Issues in loading multiple csv in spark with scala







I am using Spark 2.3 with Scala and trying to load multiple CSV files from a directory. The issue is that the files are loaded but some of their columns are missing.



I have the following sample files:



test1.csv


Col1,Col2,Col3,Col4,Col5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5



test2.csv


Col1,Col2,Col3,Col4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4



test3.csv


Col1,Col2,Col3,Col4,Col6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6



test4.csv


Col1,Col2,Col5,Col4,Col3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3



What I want to do is load all these files into a dataframe with all the columns from the 4 files, but when I try to load the files with the following code


val dft = spark.read.format("csv").option("header", "true").load("path/to/directory/*.csv")



it loads the CSVs but misses some columns from them.



Here is the output of dft.show():


+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col6|
+----+----+----+----+----+
| aaa| 2| 3| 4| 6|
| aaa| 2| 3| 4| 6|
| aaa| 2| 3| 4| 6|
| aaa| 2| 3| 4| 6|
| aaa| 2| 3| 4| 6|
| aaa| 2| 3| 4| 6|
| aaa| 2| 3| 4| 6|
| aaa| 2| 3| 4| 6|
| aaa| 2| 3| 4| 6|
| aaa| 2| 5| 4| 3|
| aaa| 2| 5| 4| 3|
| aaa| 2| 5| 4| 3|
| aaa| 2| 5| 4| 3|
| aaa| 2| 5| 4| 3|
| aaa| 2| 5| 4| 3|
| aaa| 2| 5| 4| 3|
| aaa| 2| 5| 4| 3|
| aaa| 2| 5| 4| 3|
| aaa| 2| 3| 4| 5|
| aaa| 2| 3| 4| 5|
+----+----+----+----+----+



I want it to be like this:


+----+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col5|Col6|
+----+----+----+----+----+----+
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
| aaa| 2| 3| 4| 5| 6|
+----+----+----+----+----+----+



Please guide me: what is wrong with my code?
Or is there a more efficient way to do it?



Thanks





Something related to the same issue: stackoverflow.com/questions/48999381/…
– shahidammer
Aug 6 at 7:00





Spark's CSV reader does not support missing columns. You have to find another way. Can you tell me how many files you have and how big they are? Also, what do you expect when the column is not there?
– Oli
Aug 6 at 9:18





2 Answers



If each individual file is not too big, you could use wholeTextFiles and parse the files yourself as follows:




import org.apache.spark.sql.Row

val columns = (1 to 6).map("Col" + _)

val rdd = sc.wholeTextFiles("path_to_files/*")
  .map(_._2.split("\n"))
  .flatMap { x =>
    // We consider the first line as the header
    val cols = x.head.split(",")
    // Then we flatten the remaining lines and shape each of them
    // as a list of tuples (ColumnName, content).
    x.tail
      .map(_.split(","))
      .map(row => row.indices.map(i => cols(i) -> row(i)))
  }
  .map(_.toMap)
  // Here we take the list of all the columns and map each of them to
  // its value if it exists, null otherwise.
  .map(map => columns.map(name => map.getOrElse(name, null)))
  .map(Row.fromSeq _)



This code puts each file within a single record using wholeTextFiles (this is why the files cannot be too big), uses the first line to determine which columns are present and in which order, creates a Map from column names to values, and converts it to a row with nulls where values are missing. Then the data is ready to go into a dataframe:




import org.apache.spark.sql.types.{StructType, StructField, StringType}

val schema = StructType(
  columns.map(name => StructField(name, StringType, true))
)
spark.createDataFrame(rdd, schema).show()
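

As a quick sanity check (not part of the original answer, just a hedged sketch), one could count the non-null values per column on the resulting dataframe; columns that exist only in some files should show smaller counts:


import org.apache.spark.sql.functions.{col, count}

val merged = spark.createDataFrame(rdd, schema)

// count() only counts non-null values, so each column's count shows
// how many input rows actually carried that column.
merged.select(columns.map(c => count(col(c)).alias(c)): _*).show()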





Thanks for replying, but the first portion gives an error on "columns" in this line "map(map => columns.map(name => map.getOrElse(name, null)))", i.e. "cmd22.sc:1: not found: value columns". Can you please tell me which columns it is referring to?
– M.Ahsen Taqi
Aug 6 at 10:03






Right, I should have defined columns earlier ;-) I edited the answer.
– Oli
Aug 6 at 10:05







Thanks for the edit, it solves the issue for the first portion. Now it gives an error on StructField: "not found: value StructField". Then I added an import statement for it and am getting this error: "not enough arguments for method apply: (name: String, dataType: org.apache.spark.sql.types.DataType, nullable: Boolean, metadata: org.apache.spark.sql.types.Metadata)org.apache.spark.sql.types.StructField in object StructField. Unspecified value parameter dataType. val schema = StructField("
– M.Ahsen Taqi
Aug 6 at 10:32






Here is the import statement I added to the 2nd portion of the code: "import org.apache.spark.sql.types.StructField"
– M.Ahsen Taqi
Aug 6 at 10:35





You could even put import org.apache.spark.sql.types._ or import org.apache.spark.sql.types.{StructType, StructField, StringType} because you need all three. I realized that I had made another typo: I had used a StructField instead of a StructType in the schema declaration. I corrected the answer.
– Oli
Aug 6 at 12:07





I found a solution for the problem I was trying to solve, so I thought I should share it for anyone who is trying to achieve the same output.



I used Parquet to merge files that only share some common columns.



Here is the code:


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("Exercise")
  .setMaster("local")
val sc = new SparkContext(conf)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val spark = SparkSession
  .builder()
  .appName("Spark Sql Session")
  .config("spark.some.config.option", "test")
  .getOrCreate()

// Collect the paths of all the input files
val filepath = sc.wholeTextFiles("path/to/MergeFiles/*.txt").keys
val list = filepath.collect().toList

// Write each file into its own Parquet partition (key=1, key=2, ...)
var i = 1
list.foreach { path =>
  val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("delimiter", ",")
    .option("header", "true")
    .load(path)
  df.write.parquet("data/test_tbl/key=" + i)
  i += 1
}

// Read all the partitions back with schema merging enabled
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_tbl")

mergedDF.write.format("csv").save("target/directory/for/mergedFiles")



And the following is the output of mergedDF.show():




+----+----+----+----+----+----+---+
|Col1|Col2|Col3|Col4|Col6|Col5|key|
+----+----+----+----+----+----+---+
|aaa |2 |3 |4 |6 |null|2 |
|aaa |2 |3 |4 |6 |null|2 |
|aaa |2 |3 |4 |6 |null|2 |
|aaa |2 |3 |4 |6 |null|2 |
|aaa |2 |3 |4 |6 |null|2 |
|aaa |2 |3 |4 |6 |null|2 |
|aaa |2 |3 |4 |6 |null|2 |
|aaa |2 |3 |4 |6 |null|2 |
|aaa |2 |3 |4 |6 |null|2 |
|aaa |2 |3 |4 |null|5 |4 |
|aaa |2 |3 |4 |null|5 |4 |
|aaa |2 |3 |4 |null|5 |4 |
|aaa |2 |3 |4 |null|5 |4 |
|aaa |2 |3 |4 |null|5 |4 |
|aaa |2 |3 |4 |null|5 |4 |
|aaa |2 |3 |4 |null|5 |4 |
|aaa |2 |3 |4 |null|5 |4 |
|aaa |2 |3 |4 |null|5 |4 |
|aaa |2 |3 |4 |null|5 |3 |
|aaa |2 |3 |4 |null|5 |3 |
+----+----+----+----+----+----+---+
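

A small optional follow-up, not part of the original code: if the synthetic key partition column is not wanted in the final output, it can be dropped before writing. The output path and the header option below are illustrative additions, a sketch rather than the answer's exact code:


// Drop the partition column introduced by the "key=" directories
// and write the merged data back out as CSV with a header row.
mergedDF.drop("key")
  .write
  .format("csv")
  .option("header", "true")
  .save("target/directory/for/mergedCsvWithHeader")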






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Popular posts from this blog

Firebase Auth - with Email and Password - Check user already registered

Dynamically update html content plain JS

How to determine optimal route across keyboard