Saya memiliki aplikasi streaming percikan yang menghasilkan kumpulan data untuk setiap menit. Saya perlu menyimpan / menimpa hasil dari data yang diproses.
Ketika saya mencoba menimpa dataset org.apache.hadoop.mapred.FileAlreadyExistsException menghentikan eksekusi.
Saya mengatur properti Spark set("spark.files.overwrite","true")
, tetapi tidak berhasil.
Bagaimana cara menimpa atau menghapus file dari percikan?
apache-spark
Vijay Innamuri
sumber
sumber
set("spark.files.overwrite","true")
bekerja hanya untuk file yang ditambahkan pikirspark.addFile()
Jawaban:
UPDATE: Sarankan penggunaan
Dataframes
, plus sesuatu seperti... .write.mode(SaveMode.Overwrite) ...
.Germo yang berguna:
Untuk versi yang lebih lama coba
Di 1.1.0 Anda dapat menyetel pengaturan conf menggunakan skrip spark-submit dengan flag --conf.
PERINGATAN (versi lama): Menurut @piggybox, ada bug di Spark yang hanya akan menimpa file yang diperlukan untuk menulis
part-
file itu, file lain tidak akan dihapus.sumber
Spark 1.4
:df.write.mode(SaveMode.Overwrite).parquet(path)
df.write.mode(mode: String).parquet(path)
mode Di mana: String bisa menjadi: "overwrite", "append", "ignore", "error".sejak
df.save(path, source, mode)
tidak digunakan lagi, ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )gunakan di
df.write.format(source).mode("overwrite").save(path)
mana df.write adalah DataFrameWriter
'sumber' dapat berupa ("com.databricks.spark.avro" | "parket" | "json")
sumber
source
bisa jugacsv
Dokumentasi untuk parameter
spark.files.overwrite
mengatakan ini: "Apakah akan menimpa file yang ditambahkanSparkContext.addFile()
saat file target ada dan isinya tidak cocok dengan yang ada di sumbernya." Jadi tidak berpengaruh pada metode saveAsTextFiles.Anda dapat melakukan ini sebelum menyimpan file:
Aas dijelaskan di sini: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html
sumber
Dari dokumentasi pyspark.sql.DataFrame.save (saat ini di 1.3.1), Anda dapat menentukan
mode='overwrite'
saat menyimpan DataFrame:Saya telah memverifikasi bahwa ini bahkan akan menghapus file partisi yang tersisa. Jadi jika Anda awalnya mengatakan 10 partisi / file, tetapi kemudian menimpa folder dengan DataFrame yang hanya memiliki 6 partisi, folder yang dihasilkan akan memiliki 6 partisi / file.
Lihat dokumentasi Spark SQL untuk informasi selengkapnya tentang opsi mode.
sumber
spark.hadoop.validateOutputSpecs
akan berfungsi di semua API Spark.spark.hadoop.validateOutputSpecs
tidak berhasil untuk saya di 1.3, tapi ini berhasil.save(... , mode=
rute, Anda dapat menimpa satu set file, menambahkan yang lain, dll. dalam konteks Spark yang sama. Tidakkahspark.hadoop.validateOutputSpecs
membatasi Anda hanya pada satu mode per konteks?df.write.mode('overwrite').parquet("/output/folder/path")
berfungsi jika Anda ingin menimpa file parket menggunakan python. Ini ada di percikan 1.6.2. API mungkin berbeda di versi yang lebih barusumber
sumber
df.write.mode(SaveMode.Overwrite)
Versi fungsi simpan yang kelebihan beban ini berfungsi untuk saya:
yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Overwrite"))
Contoh di atas akan menimpa folder yang sudah ada. Savemode juga dapat menggunakan parameter ini ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):
Tambahkan : Mode Tambah berarti bahwa saat menyimpan DataFrame ke sumber data, jika data / tabel sudah ada, konten DataFrame diharapkan ditambahkan ke data yang sudah ada.
ErrorIfExists : Mode ErrorIfExists berarti bahwa ketika menyimpan DataFrame ke sumber data, jika data sudah ada, pengecualian diharapkan akan dilemparkan.
Abaikan : Mode Abaikan berarti saat menyimpan DataFrame ke sumber data, jika data sudah ada, operasi penyimpanan diharapkan tidak menyimpan konten DataFrame dan tidak mengubah data yang ada.
sumber
Jika Anda ingin menggunakan format keluaran kustom Anda sendiri, Anda akan bisa mendapatkan perilaku yang diinginkan dengan RDD juga.
Lihat kelas-kelas berikut: FileOutputFormat , FileOutputCommitter
Dalam format output file, Anda memiliki metode bernama checkOutputSpecs, yang memeriksa apakah direktori output ada. Di FileOutputCommitter Anda memiliki commitJob yang biasanya mentransfer data dari direktori sementara ke tempat terakhirnya.
Saya belum dapat memverifikasinya (akan melakukannya, segera setelah saya memiliki beberapa menit luang) tetapi secara teoritis: Jika saya memperluas FileOutputFormat dan menimpa checkOutputSpecs ke metode yang tidak membuang pengecualian pada direktori yang sudah ada, dan menyesuaikan metode commitJob dari custom output committer saya untuk melakukan logika mana yang saya inginkan (misalnya, Override beberapa file, tambahkan yang lain) daripada saya mungkin dapat mencapai perilaku yang diinginkan dengan RDD juga.
Format keluaran diteruskan ke: saveAsNewAPIHadoopFile (yang merupakan metode saveAsTextFile yang dipanggil juga untuk benar-benar menyimpan file). Dan Output committer dikonfigurasi di tingkat aplikasi.
sumber