Spark - load CSV file as DataFrame?







I would like to read a CSV file in Spark, convert it to a DataFrame, and store it in HDFS with df.registerTempTable("table_name").




I have tried:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")


java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



What is the right command to load a CSV file as a DataFrame in Apache Spark?





check this link for doing it in Spark 2.0
– mrsrinivas
Oct 23 '16 at 5:25




8 Answers



spark-csv functionality is part of core Spark (as of Spark 2.x) and doesn't require a separate library.
So you could just do, for example:


df = spark.read.format("csv").option("header", "true").load("csvfile.csv")





In my case there is no function named csvFile() inside the SQLContext class.
– Fahad Siddiqui
Jul 13 '15 at 5:40







yes @Fahad, it comes from CsvContext as mentioned.
– Shyamendra Solanki
Jul 13 '15 at 5:44
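For reference, a hedged sketch of that older spark-csv 1.x implicit API (assuming the com.databricks:spark-csv package is on the classpath; the exact method signature may vary by version):

// Sketch only: importing the spark-csv package object brings in the implicit
// CsvContext class, which adds csvFile to SQLContext.
import com.databricks.spark.csv._

val df = sqlContext.csvFile("hdfs:///csv/file/dir/file.csv")
df.registerTempTable("table_name")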



Parse CSV as DataFrame/Dataset with Spark 2.x



First, initialize a SparkSession object; by default, it is available in the shells as spark.




val spark = org.apache.spark.sql.SparkSession.builder
  .master("local")
  .appName("Spark CSV Reader")
  .getOrCreate()



Use any one of the following ways to load CSV as DataFrame/Dataset:




val df = spark.read
.format("csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")


val df = spark.sql("SELECT * FROM csv.`csv/file/path/in/hdfs`")



Dependencies:


"org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,



Spark version < 2.0


val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");



Dependencies:


"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,





Does this session require Hive? I am getting Hive errors.
– Puneet
Nov 15 '16 at 19:28





No need. Only spark-core_2.11 and spark-sql_2.11 of version 2.0.1 are fine. If possible, add the error message.
– mrsrinivas
Nov 16 '16 at 3:20







Can we convert a pipe-delimited file to a DataFrame?
– Omkar Puttagunta
Mar 23 '17 at 14:30






@OmkarPuttagunta: Yes, of course! Try something like this: spark.read.format("csv").option("delimiter", "|") ...
– mrsrinivas
Mar 23 '17 at 14:36



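Spelled out a little more, a minimal sketch of reading a pipe-delimited file (the path here is just a placeholder):

// Read a pipe-delimited file; the "delimiter" option (alias "sep") selects the separator.
val pipeDf = spark.read
  .format("csv")
  .option("delimiter", "|")
  .option("header", "true")
  .load("hdfs:///csv/file/dir/pipe_file.csv")

pipeDf.show(5)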





The other option for the programmatic way is to leave off the .format("csv") and replace .load(... with .csv(.... The option method belongs to the DataFrameReader class as returned by the read method, whereas the load and csv methods return a DataFrame, so options can't be tagged on after they are called. This answer is pretty thorough, but you should link to the documentation so people can see all the other CSV options available: spark.apache.org/docs/latest/api/scala/…
– Davos
Apr 17 at 13:20
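To illustrate that comment, the two forms below should be equivalent; the options are set on the DataFrameReader before the terminal load/csv call (paths are placeholders):

// Options go on the DataFrameReader returned by spark.read;
// the terminal call (load or csv) then produces the DataFrame.
val viaLoad = spark.read
  .format("csv")
  .option("header", "true")
  .load("hdfs:///csv/file/dir/file.csv")

val viaCsv = spark.read
  .option("header", "true")
  .csv("hdfs:///csv/file/dir/file.csv")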





With Spark 2.0, the following is how you can read CSV:


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
  .config(conf = conf)
  .appName("spark session example")
  .getOrCreate()

val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header", "true").csv(path)





Is there a difference between spark.read.csv(path) and spark.read.format("csv").load(path)?
– Eric
Jun 5 at 14:49





This is for those whose Hadoop is 2.6 and Spark is 1.6, without the "databricks" package.


import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))

val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val", IntegerType, true))

val df = sqlContext.createDataFrame(rdd, schema)



In Java 1.8, this code snippet works perfectly for reading CSV files.



pom.xml


<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.11</artifactId>
<version>1.4.0</version>
</dependency>



Java


import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create Spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession.read()
    .format("com.databricks.spark.csv")
    .option("header", true)
    .option("inferSchema", true)
    .load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();





While this may be useful to someone, the question has a Scala tag.
– cricket_007
Oct 30 '16 at 6:33



Penny's Spark 2 example is the way to do it in Spark 2. There's one more trick: have the schema generated for you by doing an initial scan of the data, by setting the option inferSchema to true.





Here, then, assuming that spark is a Spark session you have set up, is the operation to load the CSV index file of all the Landsat images which Amazon hosts on S3.




/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

val csvdata = spark.read.options(Map(
"header" -> "true",
"ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
"inferSchema" -> "true",
"mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")



The bad news is: this triggers a scan through the file; for something large like this 20+MB zipped CSV file, that can take 30s over a long haul connection. Bear that in mind: you are better off manually coding up the schema once you've got it coming in.



(code snippet Apache Software License 2.0 licensed to avoid all ambiguity; something I've done as a demo/integration test of S3 integration)
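For example, a minimal sketch of supplying an explicit schema so the reader can skip the inference scan (the column names and types below are illustrative, not the full scene-list schema, so treat this as a template):

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType, TimestampType}

// Illustrative columns only; in practice, capture df.schema from a single
// inferSchema run once and hard-code that.
val sceneSchema = StructType(Seq(
  StructField("entityId", StringType, nullable = true),
  StructField("acquisitionDate", TimestampType, nullable = true),
  StructField("cloudCover", DoubleType, nullable = true)
))

val csvdata = spark.read
  .option("header", "true")
  .schema(sceneSchema)
  .csv("s3a://landsat-pds/scene_list.gz")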





I hadn't seen this csv method or passing a map to options. Agreed, you're always better off providing an explicit schema; inferSchema is fine for quick-and-dirty work (aka data science) but terrible for ETL.
– Davos
Feb 15 at 7:15



The default file format is Parquet with spark.read, while the file being read is CSV; that is why you are getting the exception. Specify the CSV format with the API you are trying to use.
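To make the contrast concrete (a minimal sketch using the path from the question):

// Fails with the "not a Parquet file" error above: no format is specified,
// so the default Parquet data source is used.
val wrong = spark.read.load("hdfs:///csv/file/dir/file.csv")

// Works: the CSV source is selected explicitly.
val right = spark.read.format("csv").option("header", "true").load("hdfs:///csv/file/dir/file.csv")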



There are a lot of challenges to parsing a CSV file, and they keep adding up as the file size grows or when the column values contain non-English, escape, separator, or other characters that can cause parsing errors.



The magic, then, is in the options that are used. The ones that worked for me, and that I hope cover most of the edge cases, are in the code below:


### Create a Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

### Note the options that are used. You may have to tweak these in case of error
### (html_csv_file_path is the path to your CSV file)
html_df = spark.read.csv(html_csv_file_path,
                         header=True,
                         multiLine=True,
                         ignoreLeadingWhiteSpace=True,
                         ignoreTrailingWhiteSpace=True,
                         encoding="UTF-8",
                         sep=',',
                         quote='"',
                         escape='"',
                         maxColumns=2,
                         inferSchema=True)



Hope that helps. For more, refer to: Using PySpark 2 to read CSV having HTML source code



Note: The code above is from the Spark 2 API, where the CSV file reading API comes bundled with the built-in packages of the Spark installation.



Note: PySpark is a Python wrapper for Spark and shares the same API as Scala/Java.
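Since the question is tagged Scala, here is a rough Scala equivalent of the read above under the same assumptions (htmlCsvFilePath is a placeholder for your file path):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

val htmlCsvFilePath = "/path/to/your/file.csv" // placeholder path

val htmlDf = spark.read
  .option("header", "true")
  .option("multiLine", "true")
  .option("ignoreLeadingWhiteSpace", "true")
  .option("ignoreTrailingWhiteSpace", "true")
  .option("encoding", "UTF-8")
  .option("sep", ",")
  .option("quote", "\"")
  .option("escape", "\"")
  .option("maxColumns", "2")
  .option("inferSchema", "true")
  .csv(htmlCsvFilePath)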






