Spark - memuat file CSV sebagai DataFrame?

147

Saya ingin membaca CSV dalam percikan dan mengubahnya sebagai DataFrame dan menyimpannya dalam HDFS dengan df.registerTempTable("table_name")

Saya telah mencoba:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

Kesalahan yang saya dapatkan:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Apa perintah yang tepat untuk memuat file CSV sebagai DataFrame di Apache Spark?

Donbeo
sumber
periksa tautan
mrsrinivas

Jawaban:

190

spark-csv adalah bagian dari fungsionalitas Spark inti dan tidak memerlukan pustaka terpisah. Jadi Anda bisa melakukannya sebagai contoh

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

Dalam scala, (ini berfungsi untuk semua format-in delimiter yang menyebutkan "," untuk csv, "\ t" untuk tsv dll)

val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")

Shyamendra Solanki
sumber
169

Parsing CSV dan muat sebagai DataFrame / DataSet dengan Spark 2.x

Pertama, inisialisasi SparkSessionobjek secara default itu akan tersedia di shell sebagaispark

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local") # Change it as per your cluster
        .appName("Spark CSV Reader")
        .getOrCreate;

Gunakan salah satu cara berikut untuk memuat CSV sebagai DataFrame/DataSet

1. Lakukan dengan cara terprogram

 val df = spark.read
         .format("csv")
         .option("header", "true") //first line in file has headers
         .option("mode", "DROPMALFORMED")
         .load("hdfs:///csv/file/dir/file.csv")

Pembaruan: Menambahkan semua opsi dari sini jika tautan akan rusak di masa mendatang

  • path : lokasi file. Mirip dengan Spark dapat menerima ekspresi globbing Hadoop standar.
  • header : jika disetel ke true, baris pertama file akan digunakan untuk memberi nama kolom dan tidak akan disertakan dalam data. Semua jenis akan diasumsikan string. Nilai defaultnya salah.
  • pembatas : secara default kolom dipisahkan menggunakan, tetapi pembatas dapat diatur ke karakter apa pun
  • kutipan : secara default karakter kutipan adalah ", tetapi dapat disetel ke karakter apa pun. Pemisah di dalam tanda kutip akan diabaikan
  • escape : secara default, karakter escape adalah, tetapi dapat disetel ke karakter apa pun. Karakter kutipan yang lolos diabaikan
  • parserLib : secara default, " commons " yang dapat disetel ke " univocity " untuk menggunakan library tersebut untuk penguraian CSV.
  • mode : menentukan mode parsing. Secara default ini PERMISSIVE. Nilai yang memungkinkan adalah:
    • PERMISSIVE : mencoba mengurai semua baris: null dimasukkan untuk token yang hilang dan token tambahan diabaikan.
    • DROPMALFORMED : menghapus baris yang memiliki token lebih sedikit atau lebih dari yang diharapkan atau token yang tidak cocok dengan skema
    • FAILFAST : dibatalkan dengan RuntimeException jika menemukan baris charset yang salah format: default-nya ke 'UTF-8' tetapi dapat disetel ke nama charset valid lainnya
  • inferSchema : secara otomatis menyimpulkan jenis kolom. Ini membutuhkan satu lompatan ekstra atas data dan salah dengan komentar default: lewati baris yang dimulai dengan karakter ini. Default-nya adalah "#". Nonaktifkan komentar dengan menyetelnya ke null.
  • nullValue : menentukan string yang menunjukkan nilai null, bidang apa pun yang cocok dengan string ini akan disetel sebagai null di DataFrame
  • dateFormat : menentukan string yang menunjukkan format tanggal yang akan digunakan saat membaca tanggal atau cap waktu. Format tanggal kustom mengikuti format di java.text.SimpleDateFormat. Ini berlaku untuk DateType dan TimestampType. Secara default, ini adalah null yang berarti mencoba mengurai waktu dan tanggal dengan java.sql.Timestamp.valueOf () dan java.sql.Date.valueOf ().

2. Anda juga dapat melakukan cara SQL ini

 val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

Dependensi :

 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
 "org.apache.spark" % "spark-sql_2.11" % 2.0.0,

Versi Spark <2.0

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

Dependensi:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
Nyonya
sumber
apakah sesi ini membutuhkan sarang? Saya mendapatkan kesalahan sarang.
Puneet
2
Tidak dibutuhkan. Hanya spark-core_2.11dan spark-sql_2.11dari 2.0.1versi baik-baik saja. Jika memungkinkan tambahkan pesan kesalahan.
mrsrinivas
1
dapatkah kita mengonversi file yang dipisahkan tanda pipa ke kerangka data?
Omkar Puttagunta
3
@OmkarPuttagunta: Ya, tentu saja! coba beberapa hal seperti ini spark.read.format("csv").option("delimiter ", "|") ...
mrsrinivas
1
Pilihan lain untuk programmatic wayadalah meninggalkan .format("csv")dan mengganti .load(...dengan .csv(.... The optionmetode milik kelas DataFrameReader sebagai dikembalikan oleh readmetode, di mana loaddan csvmetode kembali dataframe sehingga tidak dapat memiliki pilihan ditandai setelah mereka disebut. Jawaban ini cukup menyeluruh tetapi Anda harus menautkan ke dokumentasi sehingga orang-orang dapat melihat semua opsi CSV lain yang tersedia spark.apache.org/docs/latest/api/scala/… *): org.apache.spark.sql.DataFrame
Davos
17

Ini untuk yang Hadoop 2.6 dan Spark 1.6 dan tanpa paket "databricks".

import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;

val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))

val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))

val df = sqlContext.createDataFrame(rdd, schema)
Eric Yiwei Liu
sumber
13

Dengan Spark 2.0, berikut adalah cara Anda membaca CSV

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
  .config(conf = conf)
  .appName("spark session example")
  .getOrCreate()

val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
  csv(path)
penny chan
sumber
7
Apakah ada perbedaan antara spark.read.csv(path)dan spark.read.format("csv").load(path)?
Eric
8

Di Java 1.8 Potongan kode ini berfungsi sempurna untuk membaca file CSV

POM.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>

Jawa

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

        //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
Rajeev Rathor
sumber
Meskipun ini mungkin berguna bagi seseorang. Pertanyaan tersebut memiliki tag Scala.
OneCricketeer
5

Ada banyak tantangan untuk mengurai file CSV, itu terus bertambah jika ukuran file lebih besar, jika ada karakter non-inggris / escape / separator / lain dalam nilai kolom, yang dapat menyebabkan kesalahan penguraian.

Keajaiban kemudian ada dalam opsi yang digunakan. Yang berhasil untuk saya dan harapan harus mencakup sebagian besar kasus edge ada dalam kode di bawah ini:

### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path, 
                         header=True, 
                         multiLine=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True, 
                         encoding="UTF-8",
                         sep=',',
                         quote='"', 
                         escape='"',
                         maxColumns=2,
                         inferSchema=True)

Semoga membantu. Untuk lebih lanjut lihat: Menggunakan PySpark 2 untuk membaca CSV yang memiliki kode sumber HTML

Catatan: Kode di atas berasal dari Spark 2 API, di mana API membaca file CSV dibundel dengan paket bawaan Spark yang dapat diinstal.

Catatan: PySpark adalah pembungkus Python untuk Spark dan berbagi API yang sama dengan Scala / Java.

karthiks
sumber
Terima kasih banyak, Anda menyelamatkan hidup saya: D
Khubaib Raza
4

Contoh Penny's Spark 2 adalah cara melakukannya di spark2. Ada satu trik lagi: buat tajuk itu dibuat untuk Anda dengan melakukan pemindaian awal data, dengan menyetel opsi inferSchemaketrue

Di sini, dengan asumsi bahwa sparksesi percikan yang telah Anda siapkan, adalah operasi untuk memuat dalam file indeks CSV dari semua citra Landsat yang di-host oleh amazon pada S3.

  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */

val csvdata = spark.read.options(Map(
    "header" -> "true",
    "ignoreLeadingWhiteSpace" -> "true",
    "ignoreTrailingWhiteSpace" -> "true",
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
    "inferSchema" -> "true",
    "mode" -> "FAILFAST"))
  .csv("s3a://landsat-pds/scene_list.gz")

Kabar buruknya adalah: ini memicu pemindaian melalui file; untuk sesuatu yang besar seperti file CSV zip 20 + MB ini, yang dapat memakan waktu 30 detik untuk koneksi jarak jauh. Ingatlah itu: Anda lebih baik mengkodekan skema secara manual setelah Anda mendapatkannya.

(cuplikan kode Lisensi Perangkat Lunak Apache 2.0 berlisensi untuk menghindari semua ambiguitas; sesuatu yang telah saya lakukan sebagai uji demo / integrasi integrasi S3)

stevel
sumber
Saya belum pernah melihat metode csv ini atau meneruskan peta ke opsi. Setuju selalu lebih baik memberikan skema eksplisit, inferSchema baik-baik saja untuk cepat dan kotor (alias ilmu data) tetapi buruk untuk ETL.
Davos
2

Jika Anda membuat stoples dengan scala 2.11 dan Apache 2.0 atau lebih tinggi.

Tidak perlu membuat sqlContextatau sparkContextobjek. Sebuah SparkSessionbenda saja sudah cukup untuk memenuhi semua kebutuhan.

Berikut ini adalah kode saya yang berfungsi dengan baik:

import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}

object driver {

  def main(args: Array[String]) {

    val log = LogManager.getRootLogger

    log.info("**********JAR EXECUTION STARTED**********")

    val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
    val df = spark.read.format("csv")
      .option("header", "true")
      .option("delimiter","|")
      .option("inferSchema","true")
      .load("d:/small_projects/spark/test.pos")
    df.show()
  }
}

Jika Anda menjalankan dalam cluster, ubah saja .master("local")ke .master("yarn")saat mendefinisikan sparkBuilderobjek

Spark Doc mencakup ini: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html

swapnil shashank
sumber
Ini sama dengan jawaban yang ada
mrsrinivas
1

Tambahkan dependensi Spark berikut ke file POM:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

// Konfigurasi percikan:

val spark = SparkSession.builder (). master ("local"). appName ("Sample App"). getOrCreate ()

// Baca file csv:

val df = spark.read.option ("header", "true"). csv ("FILE_PATH")

// Menampilkan keluaran

df.show ()

S_K
sumber
1

Dengan Spark 2.4+, jika Anda ingin memuat csv dari direktori lokal, Anda dapat menggunakan 2 sesi dan memuatnya ke sarang. Sesi pertama harus dibuat dengan master () config sebagai "local [*]" dan sesi kedua dengan "yarn" dan Hive diaktifkan.

Yang di bawah ini berhasil untuk saya.

import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._

object testCSV { 

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate()

    import spark_local.implicits._
    spark_local.sql("SET").show(100,false)
    val local_path="/tmp/data/spend_diversity.csv"  // Local file
    val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory
    df_local.show(false)

    val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate()

    import spark.implicits._
    spark.sql("SET").show(100,false)
    val df = df_local
    df.createOrReplaceTempView("lcsv")
    spark.sql(" drop table if exists work.local_csv ")
    spark.sql(" create table work.local_csv as select * from lcsv ")

   }

Ketika berlari dengan spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jaritu berjalan dengan baik dan membuat meja di sarang.

stack0114106
sumber
0

Untuk membaca dari jalur relatif pada sistem, gunakan metode System.getProperty untuk mendapatkan direktori saat ini dan selanjutnya digunakan untuk memuat file menggunakan jalur relatif.

scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
scala> csvDf.take(3)

percikan: 2.4.4 skala: 2.11.12

Venkat Kotra
sumber
-1

Format file default adalah Parquet dengan spark.read .. dan file membaca csv itulah mengapa Anda mendapatkan pengecualian. Tentukan format csv dengan api yang Anda coba gunakan

tazak
sumber
-1

Coba ini jika menggunakan spark 2.0+

For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")


For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")

For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")

Catatan: - ini berfungsi untuk semua file yang dipisahkan. Cukup gunakan opsi ("pembatas",) untuk mengubah nilai.

Semoga bermanfaat.

Ajay Ahuja
sumber
Ini sama dengan jawaban yang ada
mrsrinivas
-1

Dengan Spark csv bawaan, Anda dapat menyelesaikannya dengan mudah menggunakan objek SparkSession baru untuk Spark> 2.0.

val df = spark.
        read.
        option("inferSchema", "false").
        option("header","true").
        option("mode","DROPMALFORMED").
        option("delimiter", ";").
        schema(dataSchema).
        csv("/csv/file/dir/file.csv")
df.show()
df.printSchema()

Ada berbagai pilihan yang bisa Anda atur.

  • header: apakah file Anda menyertakan baris header di bagian atas
  • inferSchema: apakah Anda ingin menyimpulkan skema secara otomatis atau tidak. Default-nya adalah true. Saya selalu lebih suka memberikan skema untuk memastikan tipe data yang tepat.
  • mode: mode parsing, PERMISSIVE, DROPMALFORMED atau FAILFAST
  • delimiter: untuk menentukan pembatas, defaultnya adalah koma (',')
Piyush Patel
sumber