Saya ingin membaca CSV dalam percikan dan mengubahnya sebagai DataFrame dan menyimpannya dalam HDFS dengan df.registerTempTable("table_name")
Saya telah mencoba:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Kesalahan yang saya dapatkan:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Apa perintah yang tepat untuk memuat file CSV sebagai DataFrame di Apache Spark?
Jawaban:
spark-csv adalah bagian dari fungsionalitas Spark inti dan tidak memerlukan pustaka terpisah. Jadi Anda bisa melakukannya sebagai contoh
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
Dalam scala, (ini berfungsi untuk semua format-in delimiter yang menyebutkan "," untuk csv, "\ t" untuk tsv dll)
val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")
sumber
Parsing CSV dan muat sebagai DataFrame / DataSet dengan Spark 2.x
Pertama, inisialisasi
SparkSession
objek secara default itu akan tersedia di shell sebagaispark
val spark = org.apache.spark.sql.SparkSession.builder .master("local") # Change it as per your cluster .appName("Spark CSV Reader") .getOrCreate;
1. Lakukan dengan cara terprogram
val df = spark.read .format("csv") .option("header", "true") //first line in file has headers .option("mode", "DROPMALFORMED") .load("hdfs:///csv/file/dir/file.csv")
Pembaruan: Menambahkan semua opsi dari sini jika tautan akan rusak di masa mendatang
2. Anda juga dapat melakukan cara SQL ini
val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
Dependensi :
"org.apache.spark" % "spark-core_2.11" % 2.0.0, "org.apache.spark" % "spark-sql_2.11" % 2.0.0,
Versi Spark <2.0
val df = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .option("mode", "DROPMALFORMED") .load("csv/file/path");
Dependensi:
"org.apache.spark" % "spark-sql_2.10" % 1.6.0, "com.databricks" % "spark-csv_2.10" % 1.6.0, "com.univocity" % "univocity-parsers" % LATEST,
sumber
spark-core_2.11
danspark-sql_2.11
dari2.0.1
versi baik-baik saja. Jika memungkinkan tambahkan pesan kesalahan.spark.read.format("csv").option("delimiter ", "|") ...
programmatic way
adalah meninggalkan.format("csv")
dan mengganti.load(...
dengan.csv(...
. Theoption
metode milik kelas DataFrameReader sebagai dikembalikan olehread
metode, di manaload
dancsv
metode kembali dataframe sehingga tidak dapat memiliki pilihan ditandai setelah mereka disebut. Jawaban ini cukup menyeluruh tetapi Anda harus menautkan ke dokumentasi sehingga orang-orang dapat melihat semua opsi CSV lain yang tersedia spark.apache.org/docs/latest/api/scala/… *): org.apache.spark.sql.DataFrameIni untuk yang Hadoop 2.6 dan Spark 1.6 dan tanpa paket "databricks".
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}; import org.apache.spark.sql.Row; val csv = sc.textFile("/path/to/file.csv") val rows = csv.map(line => line.split(",").map(_.trim)) val header = rows.first val data = rows.filter(_(0) != header(0)) val rdd = data.map(row => Row(row(0),row(1).toInt)) val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("val", IntegerType, true)) val df = sqlContext.createDataFrame(rdd, schema)
sumber
Dengan Spark 2.0, berikut adalah cara Anda membaca CSV
val conf = new SparkConf().setMaster("local[2]").setAppName("my app") val sc = new SparkContext(conf) val sparkSession = SparkSession.builder .config(conf = conf) .appName("spark session example") .getOrCreate() val path = "/Users/xxx/Downloads/usermsg.csv" val base_df = sparkSession.read.option("header","true"). csv(path)
sumber
spark.read.csv(path)
danspark.read.format("csv").load(path)
?Di Java 1.8 Potongan kode ini berfungsi sempurna untuk membaca file CSV
POM.xml
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>2.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-csv_2.10</artifactId> <version>1.4.0</version> </dependency>
Jawa
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); // create Spark Context SparkContext context = new SparkContext(conf); // create spark Session SparkSession sparkSession = new SparkSession(context); Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); System.out.println("========== Print Schema ============"); df.printSchema(); System.out.println("========== Print Data =============="); df.show(); System.out.println("========== Print title =============="); df.select("title").show();
sumber
Ada banyak tantangan untuk mengurai file CSV, itu terus bertambah jika ukuran file lebih besar, jika ada karakter non-inggris / escape / separator / lain dalam nilai kolom, yang dapat menyebabkan kesalahan penguraian.
Keajaiban kemudian ada dalam opsi yang digunakan. Yang berhasil untuk saya dan harapan harus mencakup sebagian besar kasus edge ada dalam kode di bawah ini:
### Create a Spark Session spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate() ### Note the options that are used. You may have to tweak these in case of error html_df = spark.read.csv(html_csv_file_path, header=True, multiLine=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True, encoding="UTF-8", sep=',', quote='"', escape='"', maxColumns=2, inferSchema=True)
Semoga membantu. Untuk lebih lanjut lihat: Menggunakan PySpark 2 untuk membaca CSV yang memiliki kode sumber HTML
Catatan: Kode di atas berasal dari Spark 2 API, di mana API membaca file CSV dibundel dengan paket bawaan Spark yang dapat diinstal.
Catatan: PySpark adalah pembungkus Python untuk Spark dan berbagi API yang sama dengan Scala / Java.
sumber
Contoh Penny's Spark 2 adalah cara melakukannya di spark2. Ada satu trik lagi: buat tajuk itu dibuat untuk Anda dengan melakukan pemindaian awal data, dengan menyetel opsi
inferSchema
ketrue
Di sini, dengan asumsi bahwa
spark
sesi percikan yang telah Anda siapkan, adalah operasi untuk memuat dalam file indeks CSV dari semua citra Landsat yang di-host oleh amazon pada S3./* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ val csvdata = spark.read.options(Map( "header" -> "true", "ignoreLeadingWhiteSpace" -> "true", "ignoreTrailingWhiteSpace" -> "true", "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ", "inferSchema" -> "true", "mode" -> "FAILFAST")) .csv("s3a://landsat-pds/scene_list.gz")
Kabar buruknya adalah: ini memicu pemindaian melalui file; untuk sesuatu yang besar seperti file CSV zip 20 + MB ini, yang dapat memakan waktu 30 detik untuk koneksi jarak jauh. Ingatlah itu: Anda lebih baik mengkodekan skema secara manual setelah Anda mendapatkannya.
(cuplikan kode Lisensi Perangkat Lunak Apache 2.0 berlisensi untuk menghindari semua ambiguitas; sesuatu yang telah saya lakukan sebagai uji demo / integrasi integrasi S3)
sumber
Jika Anda membuat stoples dengan scala 2.11 dan Apache 2.0 atau lebih tinggi.
Tidak perlu membuat
sqlContext
atausparkContext
objek. SebuahSparkSession
benda saja sudah cukup untuk memenuhi semua kebutuhan.Berikut ini adalah kode saya yang berfungsi dengan baik:
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession} import org.apache.log4j.{Level, LogManager, Logger} object driver { def main(args: Array[String]) { val log = LogManager.getRootLogger log.info("**********JAR EXECUTION STARTED**********") val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate() val df = spark.read.format("csv") .option("header", "true") .option("delimiter","|") .option("inferSchema","true") .load("d:/small_projects/spark/test.pos") df.show() } }
Jika Anda menjalankan dalam cluster, ubah saja
.master("local")
ke.master("yarn")
saat mendefinisikansparkBuilder
objekSpark Doc mencakup ini: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
sumber
Tambahkan dependensi Spark berikut ke file POM:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> </dependency>
// Konfigurasi percikan:
val spark = SparkSession.builder (). master ("local"). appName ("Sample App"). getOrCreate ()
// Baca file csv:
val df = spark.read.option ("header", "true"). csv ("FILE_PATH")
// Menampilkan keluaran
df.show ()
sumber
Dengan Spark 2.4+, jika Anda ingin memuat csv dari direktori lokal, Anda dapat menggunakan 2 sesi dan memuatnya ke sarang. Sesi pertama harus dibuat dengan master () config sebagai "local [*]" dan sesi kedua dengan "yarn" dan Hive diaktifkan.
Yang di bawah ini berhasil untuk saya.
import org.apache.log4j.{Level, Logger} import org.apache.spark._ import org.apache.spark.rdd._ import org.apache.spark.sql._ object testCSV { def main(args: Array[String]) { Logger.getLogger("org").setLevel(Level.ERROR) val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate() import spark_local.implicits._ spark_local.sql("SET").show(100,false) val local_path="/tmp/data/spend_diversity.csv" // Local file val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory df_local.show(false) val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate() import spark.implicits._ spark.sql("SET").show(100,false) val df = df_local df.createOrReplaceTempView("lcsv") spark.sql(" drop table if exists work.local_csv ") spark.sql(" create table work.local_csv as select * from lcsv ") }
Ketika berlari dengan
spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar
itu berjalan dengan baik dan membuat meja di sarang.sumber
Untuk membaca dari jalur relatif pada sistem, gunakan metode System.getProperty untuk mendapatkan direktori saat ini dan selanjutnya digunakan untuk memuat file menggunakan jalur relatif.
scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv") scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path) scala> csvDf.take(3)
percikan: 2.4.4 skala: 2.11.12
sumber
Format file default adalah Parquet dengan spark.read .. dan file membaca csv itulah mengapa Anda mendapatkan pengecualian. Tentukan format csv dengan api yang Anda coba gunakan
sumber
Coba ini jika menggunakan spark 2.0+
For non-hdfs file: df = spark.read.csv("file:///csvfile.csv") For hdfs file: df = spark.read.csv("hdfs:///csvfile.csv") For hdfs file (with different delimiter than comma: df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")
Catatan: - ini berfungsi untuk semua file yang dipisahkan. Cukup gunakan opsi ("pembatas",) untuk mengubah nilai.
Semoga bermanfaat.
sumber
Dengan Spark csv bawaan, Anda dapat menyelesaikannya dengan mudah menggunakan objek SparkSession baru untuk Spark> 2.0.
val df = spark. read. option("inferSchema", "false"). option("header","true"). option("mode","DROPMALFORMED"). option("delimiter", ";"). schema(dataSchema). csv("/csv/file/dir/file.csv") df.show() df.printSchema()
Ada berbagai pilihan yang bisa Anda atur.
header
: apakah file Anda menyertakan baris header di bagian atasinferSchema
: apakah Anda ingin menyimpulkan skema secara otomatis atau tidak. Default-nya adalahtrue
. Saya selalu lebih suka memberikan skema untuk memastikan tipe data yang tepat.mode
: mode parsing, PERMISSIVE, DROPMALFORMED atau FAILFASTdelimiter
: untuk menentukan pembatas, defaultnya adalah koma (',')sumber