Tulis satu file CSV menggunakan spark-csv

Jawaban:

168

Ini membuat folder dengan banyak file, karena setiap partisi disimpan secara individual. Jika Anda memerlukan satu file output (masih dalam folder), Anda dapat repartition(lebih disukai jika data upstream besar, tetapi memerlukan pengacakan):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

atau coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

bingkai data sebelum menyimpan:

Semua data akan ditulis ke mydata.csv/part-00000. Sebelum Anda menggunakan opsi ini, pastikan Anda memahami apa yang terjadi dan berapa biaya mentransfer semua data ke satu pekerja . Jika Anda menggunakan sistem file terdistribusi dengan replikasi, data akan ditransfer beberapa kali - pertama diambil ke satu pekerja dan kemudian didistribusikan melalui node penyimpanan.

Atau Anda dapat membiarkan kode Anda apa adanya dan menggunakan alat tujuan umum seperti catatau HDFSgetmerge untuk menggabungkan semua bagian setelahnya.

nol323
sumber
6
Anda juga dapat menggunakan coalesce: df.coalesce (1) .write.format ("com.databricks.spark.csv") .option ("header", "true") .save ("mydata.csv")
ravi
spark 1.6 melontarkan kesalahan saat kami menyetelnya .coalesce(1)mengatakan beberapa FileNotFoundException di direktori _t sementara. Ini masih bug dalam percikan: issues.apache.org/jira/browse/SPARK-2984
Harsha
@Harsha Tidak mungkin. Hasil yang sederhana coalesce(1)karena sangat mahal dan biasanya tidak praktis.
zero323
Setuju @ zero323, tetapi jika Anda memiliki persyaratan khusus untuk menggabungkan ke dalam satu file, ini masih dapat dilakukan mengingat Anda memiliki sumber daya dan waktu yang cukup.
Harsha
2
@ Harsha saya tidak mengatakan tidak ada. Jika Anda menyetel GC dengan benar, ini akan berfungsi dengan baik tetapi hanya membuang-buang waktu dan kemungkinan besar akan mengganggu kinerja secara keseluruhan. Jadi secara pribadi saya tidak melihat alasan untuk repot-repot terutama karena sangat mudah untuk menggabungkan file di luar Spark tanpa mengkhawatirkan penggunaan memori sama sekali.
nol323
36

Jika Anda menjalankan Spark dengan HDFS, saya telah memecahkan masalah dengan menulis file csv secara normal dan memanfaatkan HDFS untuk melakukan penggabungan. Saya melakukan itu di Spark (1.6) secara langsung:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

Tidak dapat mengingat di mana saya mempelajari trik ini, tetapi mungkin berhasil untuk Anda.

Minkymorgan
sumber
Saya belum mencobanya - dan curiga itu mungkin tidak langsung.
Minkymorgan
1
Terima kasih. Saya telah menambahkan jawaban yang berfungsi di Databricks
Josiah Yoder
@Minkymorgan Saya memiliki masalah yang sama tetapi tidak dapat melakukannya dengan benar .. Bisakah Anda melihat pertanyaan ini stackoverflow.com/questions/46812388/…
SUDARSHAN
4
@SUDARSHAN Fungsi saya di atas bekerja dengan data yang tidak terkompresi. Dalam contoh Anda, saya pikir Anda menggunakan kompresi gzip saat Anda menulis file - dan kemudian setelah - mencoba menggabungkan ini bersama-sama yang gagal. Itu tidak akan berhasil, karena Anda tidak dapat menggabungkan file gzip menjadi satu. Gzip bukanlah algoritma Kompresi yang Dapat Dipisahkan, jadi tentunya bukan "dapat digabung". Anda dapat menguji kompresi "snappy" atau "bz2" - tetapi menurut firasat ini akan gagal juga saat digabungkan. Mungkin yang terbaik adalah menghapus kompresi, menggabungkan file mentah, lalu mengompres menggunakan codec yang dapat dipisahkan.
Minkymorgan
dan bagaimana jika saya ingin mempertahankan tajuk? itu menggandakan untuk setiap bagian file
Normal
32

Saya mungkin sedikit terlambat untuk permainan di sini, tetapi menggunakan coalesce(1)ataurepartition(1) mungkin bekerja untuk kumpulan data kecil, tetapi kumpulan data yang besar semuanya akan dilemparkan ke satu partisi pada satu node. Hal ini cenderung menimbulkan kesalahan OOM, atau paling banter, memproses dengan lambat.

Saya sangat menyarankan agar Anda menggunakan FileUtil.copyMerge() fungsi dari Hadoop API. Ini akan menggabungkan output menjadi satu file.

EDIT - Ini secara efektif membawa data ke driver daripada node pelaksana.Coalesce()akan baik-baik saja jika satu eksekutor memiliki lebih banyak RAM untuk digunakan daripada driver.

EDIT 2 : copyMerge()dihapus di Hadoop 3.0. Lihat artikel stack overflow berikut untuk informasi lebih lanjut tentang cara bekerja dengan versi terbaru: Bagaimana melakukan CopyMerge di Hadoop 3.0?

etspaceman
sumber
Adakah pemikiran tentang cara mendapatkan csv dengan baris header dengan cara ini? Tidak ingin file menghasilkan header, karena itu akan menyelingi header di seluruh file, satu untuk setiap partisi.
nojo
Ada opsi yang saya gunakan di masa lalu yang didokumentasikan di sini: markhneedham.com/blog/2014/11/30/…
etspaceman
@etspink Keren. Saya masih belum memiliki cara yang baik untuk melakukan ini, sayangnya, karena saya harus dapat melakukan ini di Java (atau Spark, tetapi dengan cara yang tidak menghabiskan banyak memori dan dapat bekerja dengan file besar) . Saya masih tidak percaya mereka menghapus panggilan API ini ... ini adalah penggunaan yang sangat umum meskipun tidak benar-benar digunakan oleh aplikasi lain di ekosistem Hadoop.
woot
20

Jika Anda menggunakan Databricks dan dapat memasukkan semua data ke dalam RAM pada satu pekerja (dan dengan demikian dapat digunakan .coalesce(1)), Anda dapat menggunakan dbfs untuk menemukan dan memindahkan file CSV yang dihasilkan:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

Jika file Anda tidak cocok dengan RAM pada pekerja, Anda mungkin ingin mempertimbangkan saran chaotic3quilibrium untuk menggunakan FileUtils.copyMerge () . Saya belum melakukan ini, dan belum tahu apakah mungkin atau tidak, misalnya, di S3.

Jawaban ini dibangun di atas jawaban sebelumnya untuk pertanyaan ini serta tes saya sendiri dari cuplikan kode yang disediakan. Saya awalnya mempostingnya ke Databricks dan menerbitkannya kembali di sini.

Dokumentasi terbaik untuk opsi rekursif dbfs rm yang saya temukan ada di forum Databricks .

Josiah Yoder
sumber
3

Solusi yang berfungsi untuk S3 dimodifikasi dari Minkymorgan.

Cukup lewati jalur direktori yang dipartisi sementara (dengan nama berbeda dari jalur terakhir) sebagai srcPathcsv / txt terakhir dan destPath tentukan juga deleteSourcejika Anda ingin menghapus direktori asli.

/**
* Merges multiple partitions of spark text file output into single file. 
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  {
  import org.apache.hadoop.fs.FileUtil
  import java.net.URI
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )
}
John Zhu
sumber
Implementasi copyMerge mencantumkan semua file dan mengulanginya, ini tidak aman di s3. jika Anda menulis file Anda dan kemudian mendaftarnya - ini tidak menjamin bahwa semuanya akan dicantumkan. lihat [ini | docs.aws.amazon.com/AmazonS3/latest/dev/…
LiranBo
3

percikan ini df.write()API akan membuat beberapa file bagian dalam jalan yang diberikan ... untuk kekuatan percikan menulis hanya file digunakan bagian tunggal df.coalesce(1).write.csv(...)bukan df.repartition(1).write.csv(...)sebagai menyatu adalah transformasi sempit sedangkan partisi ulang adalah transformasi lihat lebar Spark - partisi ulang () vs menyatu ()

df.coalesce(1).write.csv(filepath,header=True) 

akan membuat folder di jalur file tertentu dengan satu part-0001-...-c000.csvpenggunaan file

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 

untuk memiliki nama file yang ramah pengguna

pprasad009
sumber
sebagai alternatif jika kerangka data tidak terlalu besar (~ GB atau dapat muat dalam memori driver) Anda juga dapat menggunakan df.toPandas().to_csv(path)ini akan menulis csv tunggal dengan nama file pilihan Anda
pprasad009
1
Ugh, sangat frustasi bagaimana ini hanya bisa dilakukan dengan mengonversi ke panda. Seberapa sulit untuk menulis file tanpa UUID di dalamnya?
ijoseph
2

partisi ulang / penggabungan ke 1 partisi sebelum Anda menyimpan (Anda masih mendapatkan folder tetapi akan memiliki satu file bagian di dalamnya)

Arnon Rotem-Gal-Oz
sumber
2

kamu bisa memakai rdd.coalesce(1, true).saveAsTextFile(path)

itu akan menyimpan data sebagai file tunggal di path / part-00000

Gourav
sumber
1
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._

Saya menyelesaikan menggunakan pendekatan di bawah ini (ganti nama file hdfs): -

Langkah 1: - (Crate Data Frame dan tulis ke HDFS)

df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")

Langkah 2: - (Buat Konfigurasi Hadoop)

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

Langkah3: - (Dapatkan jalur di jalur folder hdfs)

val pathFiles = new Path("/hdfsfolder/blah/")

Step4: - (Dapatkan nama file spark dari folder hdfs)

val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)

setp5: - (buat daftar yang bisa diubah scala untuk menyimpan semua nama file dan menambahkannya ke daftar)

    var fileNamesList = scala.collection.mutable.MutableList[String]()
    while (fileNames.hasNext) {
      fileNamesList += fileNames.next().getPath.getName
    }
    println(fileNamesList)

Langkah 6: - (filter _SUCESS file order dari daftar nama file scala)

    // get files name which are not _SUCCESS
    val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")

langkah 7: - (ubah daftar skala menjadi string dan tambahkan nama file yang diinginkan ke string folder hdfs lalu terapkan ganti nama)

val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
    val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
    hdfs.rename(partFileSourcePath , desiredCsvTargetPath)
sri hari kali charan Tummala
sumber
1

Saya menggunakan ini dengan Python untuk mendapatkan satu file:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
Kees C. Bakker
sumber
1

Jawaban ini memperluas jawaban yang diterima, memberikan lebih banyak konteks, dan memberikan cuplikan kode yang dapat Anda jalankan di Spark Shell pada mesin Anda.

Lebih banyak konteks tentang jawaban yang diterima

Jawaban yang diterima mungkin memberi Anda kesan bahwa kode sampel menghasilkan satu mydata.csvfile dan bukan itu masalahnya. Mari kita tunjukkan:

val df = Seq("one", "two", "three").toDF("num")
df
  .repartition(1)
  .write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")

Inilah yang dikeluarkan:

Documents/
  tmp/
    mydata.csv/
      _SUCCESS
      part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv

NB mydata.csvadalah folder dalam jawaban yang diterima - ini bukan file!

Cara mengeluarkan file tunggal dengan nama tertentu

Kita bisa menggunakan spark-daria untuk menulis satu mydata.csvfile.

import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = sys.env("HOME") + "/Documents/better/staging",
    filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)

Ini akan menampilkan file sebagai berikut:

Documents/
  better/
    mydata.csv

Jalur S3

Anda harus melewati jalur s3a ke DariaWriters.writeSingleFile untuk menggunakan metode ini di S3:

DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = "s3a://bucket/data/src",
    filename = "s3a://bucket/data/dest/my_cool_file.csv"
)

Lihat sini untuk info lebih lanjut.

Menghindari copyMerge

copyMerge telah dihapus dari Hadoop 3. DariaWriters.writeSingleFilePenerapannya menggunakan fs.rename, seperti yang dijelaskan di sini . Spark 3 masih menggunakan Hadoop 2 , jadi implementasi copyMerge akan berfungsi pada tahun 2020. Saya tidak yakin kapan Spark akan meningkatkan ke Hadoop 3, tetapi lebih baik hindari pendekatan copyMerge yang akan menyebabkan kode Anda rusak saat Spark meningkatkan Hadoop.

Kode sumber

Cari DariaWriters objek dalam kode sumber spark-daria jika Anda ingin memeriksa implementasinya.

Implementasi PySpark

Lebih mudah untuk menulis satu file dengan PySpark karena Anda dapat mengonversi DataFrame menjadi Pandas DataFrame yang ditulis sebagai file tunggal secara default.

from pathlib import Path
home = str(Path.home())
data = [
    ("jellyfish", "JALYF"),
    ("li", "L"),
    ("luisa", "LAS"),
    (None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)

Batasan

The DariaWriters.writeSingleFileScala pendekatan dan df.toPandas()Python pendekatan hanya bekerja untuk dataset kecil. Set data yang sangat besar tidak dapat ditulis sebagai file tunggal. Menulis data sebagai satu file tidak optimal dari perspektif kinerja karena data tidak dapat ditulis secara paralel.

Powers
sumber
0

Dengan menggunakan Listbuffer kita dapat menyimpan data menjadi satu file:

import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
    val text = spark.read.textFile("filepath")
    var data = ListBuffer[String]()
    for(line:String <- text.collect()){
      data += line
    }
    val writer = new FileWriter("filepath")
    data.foreach(line => writer.write(line.toString+"\n"))
    writer.close()
siddhu salvi
sumber
-2

Ada satu cara lagi untuk menggunakan Java

import java.io._

def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
  {
     val p = new java.io.PrintWriter(f);  
     try { op(p) } 
     finally { p.close() }
  } 

printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}
Sergio Alyoshkin
sumber
nama 'benar' tidak ditentukan
Arron