Spark - Reading JSON from Partitioned Folders using Firehose

Kinesis Firehose manages the persistence of files, in this case time-series JSON, into a folder hierarchy partitioned by YYYY/MM/DD/HH (down to the hour, in 24-hour numbering). Great.
Using Spark 2.0, how can I read these nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an 'option' on the DataFrame reader?
My next goal is for this to be a streaming DataFrame, where new files persisted by Firehose into S3 naturally become part of the streaming DataFrame, using the new Structured Streaming in Spark 2.0. I know this is all experimental. I am hoping someone has used S3 as a streaming file source before, where the data is partitioned into folders as described above. Of course I would prefer to read straight off a Kinesis stream, but there is no date on this connector for 2.0, so Firehose->S3 is the interim.
NB: I am using Databricks, which mounts S3 into DBFS, but this could just as easily be EMR or another Spark provider. It would be great to see a notebook too, if one is shareable, that gives an example.
Cheers!
2 Answers
Can I read nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an option to the DataFrame reader?
Yes. Because your directory structure is regular (YYYY/MM/DD/HH), you can give the path down to the leaf nodes using wildcard characters, as below:
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()
// One wildcard per partition level: */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json
val jsonDf = spark.read.json("base/path/*/*/*/*/*.json")
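If only part of the hierarchy is needed, the glob can be narrowed so fewer folders are listed. Here is a minimal sketch in plain Scala; the `FirehoseGlob` helper and its parameters are hypothetical names introduced for illustration, and it assumes zero-padded folder names as in the YYYY/MM/DD/HH layout above.

```scala
object FirehoseGlob {
  // A fixed level is zero-padded to `width`; an unspecified level becomes "*".
  private def part(value: Option[Int], width: Int): String =
    value.map(v => ("%0" + width + "d").format(v)).getOrElse("*")

  // Builds a glob of the form base/YYYY/MM/DD/HH/*.json.
  def apply(base: String,
            year: Option[Int] = None,
            month: Option[Int] = None,
            day: Option[Int] = None,
            hour: Option[Int] = None): String = {
    val levels = Seq(part(year, 4), part(month, 2), part(day, 2), part(hour, 2))
    (Seq(base.stripSuffix("/")) ++ levels ++ Seq("*.json")).mkString("/")
  }
}

// All hours of 18 December 2016:
val oneDay = FirehoseGlob("base/path", Some(2016), Some(12), Some(18))
```

The resulting string can be passed to spark.read.json in place of the fully wildcarded path.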
Of course I would prefer to read straight off a Kinesis stream, but there is no date on this connector for 2.0, so Firehose->S3 is the interim.
There is a library for Kinesis integration with Spark Streaming, so you can read the streaming data directly and run SQL operations on it without going through S3.
groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.11
version = 2.0.0
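For an sbt build, the same coordinates could be declared as follows. This is a sketch that assumes the project's scalaVersion is a 2.11.x release, so that `%%` resolves to the `_2.11` artifact listed above.

```scala
// build.sbt (assumes scalaVersion is set to a 2.11.x release)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.0.0"
```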
Sample code with Spark Streaming and SQL
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisUtils.createStream(
  streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
  [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

kinesisStream.foreachRDD { rdd =>
  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // createStream delivers each record as Array[Byte]; decode to a JSON string,
  // then let the JSON reader turn the RDD[String] into a DataFrame
  val jsonDf = spark.read.json(rdd.map(bytes => new String(bytes, "UTF-8")))

  // Create a temporary view from the DataFrame
  jsonDf.createOrReplaceTempView("json_data_tbl")

  // With a DataFrame and a SparkSession available, most Spark SQL
  // operations can be performed here
}
Yes, that is the case if you go for the static S3 read. Can you give the second approach (processing the Kinesis stream directly) a try?
– mrsrinivas
Dec 19 '16 at 10:46
I'm not the OP and I don't have the streaming need yet; I just wanted to see this documented more clearly on Stack Overflow. I'm using the static method you documented, but changing it to scan with better S3 calls that then create either a list of files to be processed in parallel (best) or the sequence of files to pass in.
– Jayson Minard
Dec 19 '16 at 11:30
for the s3 recursive file reading, see stackoverflow.com/questions/41062705/…
– Jayson Minard
Dec 19 '16 at 11:55
Maybe this is better: forums.databricks.com/questions/480/…
– Jayson Minard
Dec 19 '16 at 11:58
Full disclosure: I work for Databricks but I do not represent them on Stack Overflow.
Using Spark 2.0, how can I read these nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an 'option' on the DataFrame reader?
DataFrameReader supports loading a sequence of paths. See the documentation for def json(paths: String*): DataFrame. You can specify the sequence explicitly, use a globbing pattern, or build it programmatically (recommended):
val inputPathSeq = Seq[String]("/mnt/myles/structured-streaming/2016/12/18/02", "/mnt/myles/structured-streaming/2016/12/18/03")
val inputPathGlob = "/mnt/myles/structured-streaming/2016/12/18/*"
val basePath = "/mnt/myles/structured-streaming/2016/12/18/0"
val inputPathList = (2 to 4).toList.map(basePath+_+"/*.json")
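For ranges that cross day or month boundaries, the path list can be generated from timestamps rather than integer arithmetic. Here is a minimal sketch using java.time; the `HourlyPaths` helper is a hypothetical name introduced here, and it assumes the folder names follow the zero-padded YYYY/MM/DD/HH layout described in the question.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object HourlyPaths {
  private val layout = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")

  // Returns one folder path per hour from `start` to `end` (inclusive),
  // after truncating both timestamps to the hour.
  def between(base: String, start: LocalDateTime, end: LocalDateTime): Seq[String] = {
    val root = base.stripSuffix("/")
    val first = start.truncatedTo(ChronoUnit.HOURS)
    val last = end.truncatedTo(ChronoUnit.HOURS)
    val hours = ChronoUnit.HOURS.between(first, last)
    (0L to hours).map(h => root + "/" + first.plusHours(h).format(layout))
  }
}

// Folders covering late 18 December through early 19 December 2016
val generatedPaths = HourlyPaths.between(
  "/mnt/myles/structured-streaming",
  LocalDateTime.of(2016, 12, 18, 22, 30),
  LocalDateTime.of(2016, 12, 19, 1, 0))
```

The resulting sequence can be passed to the static reader in the same way as inputPathSeq.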
I know this is all experimental. I am hoping someone has used S3 as a streaming file source before, where the data is partitioned into folders as described above. Of course I would prefer to read straight off a Kinesis stream, but there is no date on this connector for 2.0, so Firehose->S3 is the interim.
Since you're using DBFS, I'm going to assume the S3 buckets where data are streaming from Firehose are already mounted to DBFS. Check out Databricks documentation if you need help mounting your S3 bucket to DBFS. Once you have your input path described above, you can simply load the files into a static or streaming dataframe:
Static
val staticInputDF =
  spark
    .read
    .schema(jsonSchema)
    .json(inputPathSeq: _*)

staticInputDF.isStreaming
res: Boolean = false
Streaming
val streamingInputDF =
  spark
    .readStream                       // `readStream` instead of `read` for creating a streaming DataFrame
    .schema(jsonSchema)               // Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  // Treat a sequence of files as a stream by picking one file at a time
    .json(inputPathGlob)              // The streaming reader's json() takes a single path, so pass the glob

streamingInputDF.isStreaming
res: Boolean = true
Most of this is taken straight from Databricks documentation on Structured Streaming. There is even a notebook example you can import into Databricks directly.
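The snippets above use jsonSchema without defining it. A minimal sketch of one possible definition follows; the field names and types are hypothetical placeholders for a time-series record and need to be replaced with the fields actually present in the Firehose JSON.

```scala
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType, TimestampType}

// Hypothetical fields: substitute the real field names and types of your JSON records
val jsonSchema: StructType = StructType(Seq(
  StructField("time", TimestampType, nullable = true),
  StructField("deviceId", StringType, nullable = true),
  StructField("value", DoubleType, nullable = true)
))
```

An alternative is to read a small sample statically and reuse the inferred schema, for example spark.read.json(samplePath).schema, where samplePath is likewise a placeholder.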
If you use S3 mounted via DBFS, is it faster with nested globs than using them with the s3a file system?
– Jayson Minard
Dec 19 '16 at 10:00
I guess using the glob wouldn't necessarily maintain order, but using the file sequence would (for the streaming case, that seems important), yes?
– Jayson Minard
Dec 19 '16 at 10:01
Lastly, the stream wouldn't continue past the original list of sequence files, so having it pick up new files as they arrive and continue the streaming doesn't really work here.
– Jayson Minard
Dec 19 '16 at 10:02
Working around the slow glob listing of S3 buckets, read: forums.databricks.com/questions/480/… and stackoverflow.com/questions/41062705/…
– Jayson Minard
Dec 19 '16 at 11:59
@JaysonMinard: DBFS vs. s3a: DBFS will contain proprietary and Databricks-optimal access of S3. If you're accessing S3 through Databricks, the best experience is with DBFS.
– Myles Baker
Dec 19 '16 at 14:30
The nested subfolders answer here would be dreadfully slow. The way that it reads is to do a list on each subfolder recursively, which is about as bad as it gets for performance.
– Jayson Minard
Dec 19 '16 at 9:45