How to catch exception for each record while reading CSV using Spark/Scala

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP



How to catch exception for each record while reading CSV using Spark/Scala



I have a CSV (no headers) that looks like this:


file_id, file_contents
1001, textString1
1002, textString2
1003, textString3



I am reading the file using Spark/Scala app like this:


val df = spark.read
.text(list: _*)
.map r =>
val str = r.getAs[String]("value")
val fileId == str.substring(0, str.indexOf(","))
val fileContents =
val content = str.substring(0, str.indexOf(","))
if (content .startsWith(""")) content .substring(1, content .length - 1) else content

(fileId, fileContents)
.toDF("fileId", "fileContents")



When i transform this dataframe, i am capturing exceptions and processing as usual. But the problem iam having is that, if there is atleast on bad record in the CSV, like the contents are malformed etc. the application is failing for the whole file. I want to change this feature and make sure the application identifies the correct records and captures the bad records in exception. Can someone please help me modify this code so that i can still process the good records in the CSV and capture the bad ones in exception. Thanks.





Why not use spark.read.csv? It would save you a lot of work. It also have some different modes that can be set of what to do with malformed rows.
– Shaido
Aug 8 at 3:19


spark.read.csv




1 Answer
1



You can read a CSV taking into account invalid rows, after that you can filter any row you consider necessary.


val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate

// input
val df = spark.read
.format("csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED") // discard invalid rows
.load("INPUT FILE")

// output
df.toDF("fileId", "fileContents")
.filter( row => row.getString(0).forall(_.isDigit) ) // eg first column is a number
.write
.format("com.databricks.spark.csv")
.option("header", "true")
.save("OUTPUT FILE")





Thank you Sebastian! This worked for me!
– qubiter
Aug 8 at 16:13






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Popular posts from this blog

Firebase Auth - with Email and Password - Check user already registered

Dynamically update html content plain JS

How to determine optimal route across keyboard