saya menggunakan https://github.com/databricks/spark-csv , saya mencoba menulis satu CSV, tetapi tidak bisa, itu membuat folder.
Membutuhkan fungsi Scala yang akan mengambil parameter seperti jalur dan nama file dan menulis file CSV itu.
sumber
saya menggunakan https://github.com/databricks/spark-csv , saya mencoba menulis satu CSV, tetapi tidak bisa, itu membuat folder.
Membutuhkan fungsi Scala yang akan mengambil parameter seperti jalur dan nama file dan menulis file CSV itu.
Ini membuat folder dengan banyak file, karena setiap partisi disimpan secara individual. Jika Anda memerlukan satu file output (masih dalam folder), Anda dapat repartition
(lebih disukai jika data upstream besar, tetapi memerlukan pengacakan):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
atau coalesce
:
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
bingkai data sebelum menyimpan:
Semua data akan ditulis ke mydata.csv/part-00000
. Sebelum Anda menggunakan opsi ini, pastikan Anda memahami apa yang terjadi dan berapa biaya mentransfer semua data ke satu pekerja . Jika Anda menggunakan sistem file terdistribusi dengan replikasi, data akan ditransfer beberapa kali - pertama diambil ke satu pekerja dan kemudian didistribusikan melalui node penyimpanan.
Atau Anda dapat membiarkan kode Anda apa adanya dan menggunakan alat tujuan umum seperti cat
atau HDFSgetmerge
untuk menggabungkan semua bagian setelahnya.
.coalesce(1)
mengatakan beberapa FileNotFoundException di direktori _t sementara. Ini masih bug dalam percikan: issues.apache.org/jira/browse/SPARK-2984coalesce(1)
karena sangat mahal dan biasanya tidak praktis.Jika Anda menjalankan Spark dengan HDFS, saya telah memecahkan masalah dengan menulis file csv secara normal dan memanfaatkan HDFS untuk melakukan penggabungan. Saya melakukan itu di Spark (1.6) secara langsung:
Tidak dapat mengingat di mana saya mempelajari trik ini, tetapi mungkin berhasil untuk Anda.
sumber
Saya mungkin sedikit terlambat untuk permainan di sini, tetapi menggunakan
coalesce(1)
ataurepartition(1)
mungkin bekerja untuk kumpulan data kecil, tetapi kumpulan data yang besar semuanya akan dilemparkan ke satu partisi pada satu node. Hal ini cenderung menimbulkan kesalahan OOM, atau paling banter, memproses dengan lambat.Saya sangat menyarankan agar Anda menggunakan
FileUtil.copyMerge()
fungsi dari Hadoop API. Ini akan menggabungkan output menjadi satu file.EDIT - Ini secara efektif membawa data ke driver daripada node pelaksana.
Coalesce()
akan baik-baik saja jika satu eksekutor memiliki lebih banyak RAM untuk digunakan daripada driver.EDIT 2 :
copyMerge()
dihapus di Hadoop 3.0. Lihat artikel stack overflow berikut untuk informasi lebih lanjut tentang cara bekerja dengan versi terbaru: Bagaimana melakukan CopyMerge di Hadoop 3.0?sumber
Jika Anda menggunakan Databricks dan dapat memasukkan semua data ke dalam RAM pada satu pekerja (dan dengan demikian dapat digunakan
.coalesce(1)
), Anda dapat menggunakan dbfs untuk menemukan dan memindahkan file CSV yang dihasilkan:Jika file Anda tidak cocok dengan RAM pada pekerja, Anda mungkin ingin mempertimbangkan saran chaotic3quilibrium untuk menggunakan FileUtils.copyMerge () . Saya belum melakukan ini, dan belum tahu apakah mungkin atau tidak, misalnya, di S3.
Jawaban ini dibangun di atas jawaban sebelumnya untuk pertanyaan ini serta tes saya sendiri dari cuplikan kode yang disediakan. Saya awalnya mempostingnya ke Databricks dan menerbitkannya kembali di sini.
Dokumentasi terbaik untuk opsi rekursif dbfs rm yang saya temukan ada di forum Databricks .
sumber
Solusi yang berfungsi untuk S3 dimodifikasi dari Minkymorgan.
Cukup lewati jalur direktori yang dipartisi sementara (dengan nama berbeda dari jalur terakhir) sebagai
srcPath
csv / txt terakhir dandestPath
tentukan jugadeleteSource
jika Anda ingin menghapus direktori asli.sumber
percikan ini
df.write()
API akan membuat beberapa file bagian dalam jalan yang diberikan ... untuk kekuatan percikan menulis hanya file digunakan bagian tunggaldf.coalesce(1).write.csv(...)
bukandf.repartition(1).write.csv(...)
sebagai menyatu adalah transformasi sempit sedangkan partisi ulang adalah transformasi lihat lebar Spark - partisi ulang () vs menyatu ()akan membuat folder di jalur file tertentu dengan satu
part-0001-...-c000.csv
penggunaan fileuntuk memiliki nama file yang ramah pengguna
sumber
df.toPandas().to_csv(path)
ini akan menulis csv tunggal dengan nama file pilihan Andapartisi ulang / penggabungan ke 1 partisi sebelum Anda menyimpan (Anda masih mendapatkan folder tetapi akan memiliki satu file bagian di dalamnya)
sumber
kamu bisa memakai
rdd.coalesce(1, true).saveAsTextFile(path)
itu akan menyimpan data sebagai file tunggal di path / part-00000
sumber
Saya menyelesaikan menggunakan pendekatan di bawah ini (ganti nama file hdfs): -
Langkah 1: - (Crate Data Frame dan tulis ke HDFS)
Langkah 2: - (Buat Konfigurasi Hadoop)
Langkah3: - (Dapatkan jalur di jalur folder hdfs)
Step4: - (Dapatkan nama file spark dari folder hdfs)
setp5: - (buat daftar yang bisa diubah scala untuk menyimpan semua nama file dan menambahkannya ke daftar)
Langkah 6: - (filter _SUCESS file order dari daftar nama file scala)
langkah 7: - (ubah daftar skala menjadi string dan tambahkan nama file yang diinginkan ke string folder hdfs lalu terapkan ganti nama)
sumber
Saya menggunakan ini dengan Python untuk mendapatkan satu file:
sumber
Jawaban ini memperluas jawaban yang diterima, memberikan lebih banyak konteks, dan memberikan cuplikan kode yang dapat Anda jalankan di Spark Shell pada mesin Anda.
Lebih banyak konteks tentang jawaban yang diterima
Jawaban yang diterima mungkin memberi Anda kesan bahwa kode sampel menghasilkan satu
mydata.csv
file dan bukan itu masalahnya. Mari kita tunjukkan:Inilah yang dikeluarkan:
NB
mydata.csv
adalah folder dalam jawaban yang diterima - ini bukan file!Cara mengeluarkan file tunggal dengan nama tertentu
Kita bisa menggunakan spark-daria untuk menulis satu
mydata.csv
file.Ini akan menampilkan file sebagai berikut:
Jalur S3
Anda harus melewati jalur s3a ke
DariaWriters.writeSingleFile
untuk menggunakan metode ini di S3:Lihat sini untuk info lebih lanjut.
Menghindari copyMerge
copyMerge telah dihapus dari Hadoop 3.
DariaWriters.writeSingleFile
Penerapannya menggunakanfs.rename
, seperti yang dijelaskan di sini . Spark 3 masih menggunakan Hadoop 2 , jadi implementasi copyMerge akan berfungsi pada tahun 2020. Saya tidak yakin kapan Spark akan meningkatkan ke Hadoop 3, tetapi lebih baik hindari pendekatan copyMerge yang akan menyebabkan kode Anda rusak saat Spark meningkatkan Hadoop.Kode sumber
Cari
DariaWriters
objek dalam kode sumber spark-daria jika Anda ingin memeriksa implementasinya.Implementasi PySpark
Lebih mudah untuk menulis satu file dengan PySpark karena Anda dapat mengonversi DataFrame menjadi Pandas DataFrame yang ditulis sebagai file tunggal secara default.
Batasan
The
DariaWriters.writeSingleFile
Scala pendekatan dandf.toPandas()
Python pendekatan hanya bekerja untuk dataset kecil. Set data yang sangat besar tidak dapat ditulis sebagai file tunggal. Menulis data sebagai satu file tidak optimal dari perspektif kinerja karena data tidak dapat ditulis secara paralel.sumber
Dengan menggunakan Listbuffer kita dapat menyimpan data menjadi satu file:
sumber
Ada satu cara lagi untuk menggunakan Java
sumber