How to catch exception for each record while reading CSV using Spark/Scala
I have a CSV (no headers) that looks like this:
file_id, file_contents
1001, textString1
1002, textString2
1003, textString3
I am reading the file in a Spark/Scala app like this:
import spark.implicits._ // encoders needed by .map and .toDF
val df = spark.read
  .text(list: _*) // list: the input file paths
  .map { r =>
    val str = r.getAs[String]("value")
    val fileId = str.substring(0, str.indexOf(","))
    val fileContents = {
      val content = str.substring(str.indexOf(",") + 1)
      // strip surrounding double quotes, if any
      if (content.startsWith("\"")) content.substring(1, content.length - 1) else content
    }
    (fileId, fileContents)
  }
  .toDF("fileId", "fileContents")
When I transform this DataFrame, I capture exceptions and process them as usual. The problem is that if the CSV contains even one bad record (for example, malformed contents), the application fails for the whole file. I want to change this so that the application processes the good records and captures the bad records as exceptions. Can someone please help me modify this code so that I can still process the good records in the CSV and capture the bad ones? Thanks.
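The whole file fails because any exception thrown inside `map` (for example the `StringIndexOutOfBoundsException` that `substring(0, -1)` throws when a line has no comma) fails the task and, after retries, the job. A minimal pure-Scala sketch of one common workaround is to wrap the per-line parsing in `scala.util.Try` and return an `Either`, so a bad line becomes a value instead of an exception. The helper name `parseLine`, the "id must be numeric" rule, and the sample lines are assumptions for illustration, not part of the original code.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper: parses one raw line of the form `<id>,<contents>`.
// Returns Right((id, contents)) for a good record, Left(errorMessage) for a bad one.
def parseLine(line: String): Either[String, (String, String)] =
  Try {
    val comma = line.indexOf(",")
    // indexOf returns -1 when there is no comma; fail with a clear message
    // instead of letting substring throw StringIndexOutOfBoundsException
    require(comma > 0, s"no id/contents separator in: $line")
    val fileId = line.substring(0, comma).trim
    // Assumed validation rule: the id must be a non-empty run of digits
    require(fileId.nonEmpty && fileId.forall(_.isDigit), s"non-numeric id in: $line")
    val raw = line.substring(comma + 1).trim
    // Strip one pair of surrounding double quotes, if present
    val contents =
      if (raw.length >= 2 && raw.startsWith("\"") && raw.endsWith("\"")) raw.substring(1, raw.length - 1)
      else raw
    (fileId, contents)
  } match {
    case Success(record) => Right(record)
    case Failure(e)      => Left(e.getMessage)
  }

// Hypothetical sample lines: two good, two bad
val lines = Seq("1001, textString1", "1002, \"textString2\"", "garbage line", "abc, textString4")
val parsed = lines.map(parseLine)
val good = parsed.collect { case Right(record) => record }
val bad = parsed.collect { case Left(error) => error }

good.foreach(println)
bad.foreach(println)
```

Inside Spark, the same function could be called from the `.map` over `spark.read.text(...)`. Spark does not provide an encoder for `Either`, so you would first convert the result into something encodable, such as a tuple of `Option`s, and then filter good and bad rows into separate DataFrames.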
1 Answer
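You can read the file with Spark's built-in CSV reader and choose a parse mode that handles invalid rows (here `DROPMALFORMED`, which discards them); after that, you can filter out any other rows you consider invalid.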
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .appName("Spark CSV Reader")
  .getOrCreate()
// input
val df = spark.read
  .format("csv")
  .option("header", "true") // reading the headers (use "false" for a file without a header row)
  .option("mode", "DROPMALFORMED") // discard invalid rows
  .load("INPUT FILE")
// output
df.toDF("fileId", "fileContents")
  .filter(row => Option(row.getString(0)).exists(id => id.nonEmpty && id.forall(_.isDigit))) // e.g. first column is a number
  .write
  .format("csv")
  .option("header", "true")
  .save("OUTPUT FILE")
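`DROPMALFORMED` silently drops the bad rows. If you also want to capture them, as the question asks, the CSV reader's `PERMISSIVE` mode can store the raw text of each malformed line in the column named by `columnNameOfCorruptRecord`. The sketch below assumes a local session (`master("local[*]")`), a temporary sample file in place of `INPUT FILE`, and an integer `fileId` so that non-numeric ids count as malformed. Note that the CSV reader does not treat a line with too few or too many fields as corrupt; only values that fail to convert to the schema type are.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: a local session, just for trying the example out
val spark = SparkSession.builder
  .appName("Spark CSV Reader")
  .master("local[*]")
  .getOrCreate()

// Assumption: sample input written to a temp file (stands in for "INPUT FILE")
val path = Files.createTempFile("records", ".csv")
Files.write(path, "1001, textString1\n1002, textString2\noops no id\n1004, textString4\n"
  .getBytes(StandardCharsets.UTF_8))

// The extra string column receives the raw line whenever a row cannot be parsed
val schema = StructType(Seq(
  StructField("fileId", IntegerType, nullable = true),
  StructField("fileContents", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)))

val df = spark.read
  .schema(schema)
  .option("header", "false")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .option("ignoreLeadingWhiteSpace", "true") // turns " textString1" into "textString1"
  .csv(path.toString)
  .cache() // Spark 2.3+ rejects queries that reference only the corrupt-record column unless the data is cached

// Good rows: parsed cleanly, so the corrupt-record column is null
val good = df.filter(col("_corrupt_record").isNull).drop("_corrupt_record")
// Bad rows: keep only the raw text so they can be logged or written elsewhere
val bad = df.filter(col("_corrupt_record").isNotNull).select("_corrupt_record")

good.show(false)
bad.show(false)
```

From here, `good` can go through the existing transformations and `bad` can be written to a separate output for inspection, instead of one malformed line failing the whole file.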
Thank you Sebastian! This worked for me!
– qubiter
Aug 8 at 16:13
