Bagaimana menimpa direktori keluaran di spark

107

Saya memiliki aplikasi streaming percikan yang menghasilkan kumpulan data untuk setiap menit. Saya perlu menyimpan / menimpa hasil dari data yang diproses.

Ketika saya mencoba menimpa dataset org.apache.hadoop.mapred.FileAlreadyExistsException menghentikan eksekusi.

Saya mengatur properti Spark set("spark.files.overwrite","true"), tetapi tidak berhasil.

Bagaimana cara menimpa atau menghapus file dari percikan?

Vijay Innamuri
sumber
1
Ya itu menyebalkan bukan, saya menganggapnya sebagai regresi ke 0.9.0. Mohon terima jawaban saya :)
samthebest
set("spark.files.overwrite","true")bekerja hanya untuk file yang ditambahkan pikirspark.addFile()
aiman

Jawaban:

106

UPDATE: Sarankan penggunaan Dataframes, plus sesuatu seperti ... .write.mode(SaveMode.Overwrite) ....

Germo yang berguna:

implicit class PimpedStringRDD(rdd: RDD[String]) {
    def write(p: String)(implicit ss: SparkSession): Unit = {
      import ss.implicits._
      rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
    }
  }

Untuk versi yang lebih lama coba

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

Di 1.1.0 Anda dapat menyetel pengaturan conf menggunakan skrip spark-submit dengan flag --conf.

PERINGATAN (versi lama): Menurut @piggybox, ada bug di Spark yang hanya akan menimpa file yang diperlukan untuk menulis part-file itu, file lain tidak akan dihapus.

samthebest
sumber
29
Untuk Spark 1.4:df.write.mode(SaveMode.Overwrite).parquet(path)
Ha Pham
Untuk Spark SQL, Anda memiliki opsi untuk menentukan SaveMode untuk Core Spark Anda tidak memiliki yang seperti itu. Benar-benar ingin menggunakan fitur semacam itu untuk saveAsTextFile dan transformasi lainnya
Murtaza Kanchwala
3
Masalah tersembunyi: membandingkan solusi @ pzecevic untuk menghapus seluruh folder melalui HDFS, dalam pendekatan ini Spark hanya akan menimpa file bagian dengan nama file yang sama di folder output. Ini berfungsi sebagian besar waktu, tetapi jika ada sesuatu yang lain seperti file bagian tambahan dari pekerjaan Spark / Hadoop lain di folder, ini tidak akan menimpa file-file ini.
piggybox
6
Anda juga dapat menggunakan df.write.mode(mode: String).parquet(path)mode Di mana: String bisa menjadi: "overwrite", "append", "ignore", "error".
gandum hitam
1
@avocado Yup berpikir begitu, Spark API semakin buruk di setiap rilis: P
samthebest
27

Dokumentasi untuk parameter spark.files.overwritemengatakan ini: "Apakah akan menimpa file yang ditambahkan SparkContext.addFile()saat file target ada dan isinya tidak cocok dengan yang ada di sumbernya." Jadi tidak berpengaruh pada metode saveAsTextFiles.

Anda dapat melakukan ini sebelum menyimpan file:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Aas dijelaskan di sini: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html

pzecevic.dll
sumber
29
bagaimana dengan pyspark?
javadba
Jawaban berikutnya untuk menggunakan 'write.mode (SaveMode.Overwrite)' adalah caranya
YaOg
hdfs dapat menghapus file baru yang masuk karena masih menghapus yang lama.
Jake
25

Dari dokumentasi pyspark.sql.DataFrame.save (saat ini di 1.3.1), Anda dapat menentukan mode='overwrite'saat menyimpan DataFrame:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

Saya telah memverifikasi bahwa ini bahkan akan menghapus file partisi yang tersisa. Jadi jika Anda awalnya mengatakan 10 partisi / file, tetapi kemudian menimpa folder dengan DataFrame yang hanya memiliki 6 partisi, folder yang dihasilkan akan memiliki 6 partisi / file.

Lihat dokumentasi Spark SQL untuk informasi selengkapnya tentang opsi mode.

dnlbrky.dll
sumber
2
Benar dan bermanfaat, terima kasih, tetapi solusi khusus DataFrame - spark.hadoop.validateOutputSpecsakan berfungsi di semua API Spark.
Samthebest
Untuk beberapa alasan, spark.hadoop.validateOutputSpecstidak berhasil untuk saya di 1.3, tapi ini berhasil.
Eric Walker
1
@samthebest Dengan save(... , mode=rute, Anda dapat menimpa satu set file, menambahkan yang lain, dll. dalam konteks Spark yang sama. Tidakkah spark.hadoop.validateOutputSpecsmembatasi Anda hanya pada satu mode per konteks?
dnlbrky
1
@dnlbrky OP tidak meminta untuk menambahkan. Seperti yang saya katakan, benar, berguna, tetapi tidak perlu. Jika OP bertanya "bagaimana cara menambahkan" maka berbagai macam jawaban bisa diberikan. Tapi jangan membahasnya. Juga saya sarankan Anda mempertimbangkan untuk menggunakan versi Scala dari DataFrames karena memiliki jenis keamanan dan pemeriksaan lebih lanjut - misalnya jika Anda memiliki kesalahan ketik dalam "timpa" Anda tidak akan mengetahuinya sampai DAG itu dievaluasi - yang dalam pekerjaan Big Data dapat menjadi 2 jam kemudian !! Jika Anda menggunakan versi Scala, kompilator akan memeriksa semuanya di depan! Cukup keren, dan sangat penting untuk Big Data.
Samthebest
15

df.write.mode('overwrite').parquet("/output/folder/path")berfungsi jika Anda ingin menimpa file parket menggunakan python. Ini ada di percikan 1.6.2. API mungkin berbeda di versi yang lebih baru

akn
sumber
Ya ini berfungsi dengan baik untuk kebutuhan saya (Databricks)
Nick.McDermaid
4
  val jobName = "WordCount";
  //overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
  val conf = new 
  SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
  val sc = new SparkContext(conf)
vaquar khan
sumber
Hanya untuk Spark 1, dalam penggunaan versi terbarudf.write.mode(SaveMode.Overwrite)
ChikuMiku
3

Versi fungsi simpan yang kelebihan beban ini berfungsi untuk saya:

yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Overwrite"))

Contoh di atas akan menimpa folder yang sudah ada. Savemode juga dapat menggunakan parameter ini ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Tambahkan : Mode Tambah berarti bahwa saat menyimpan DataFrame ke sumber data, jika data / tabel sudah ada, konten DataFrame diharapkan ditambahkan ke data yang sudah ada.

ErrorIfExists : Mode ErrorIfExists berarti bahwa ketika menyimpan DataFrame ke sumber data, jika data sudah ada, pengecualian diharapkan akan dilemparkan.

Abaikan : Mode Abaikan berarti saat menyimpan DataFrame ke sumber data, jika data sudah ada, operasi penyimpanan diharapkan tidak menyimpan konten DataFrame dan tidak mengubah data yang ada.

Shay
sumber
1

Jika Anda ingin menggunakan format keluaran kustom Anda sendiri, Anda akan bisa mendapatkan perilaku yang diinginkan dengan RDD juga.

Lihat kelas-kelas berikut: FileOutputFormat , FileOutputCommitter

Dalam format output file, Anda memiliki metode bernama checkOutputSpecs, yang memeriksa apakah direktori output ada. Di FileOutputCommitter Anda memiliki commitJob yang biasanya mentransfer data dari direktori sementara ke tempat terakhirnya.

Saya belum dapat memverifikasinya (akan melakukannya, segera setelah saya memiliki beberapa menit luang) tetapi secara teoritis: Jika saya memperluas FileOutputFormat dan menimpa checkOutputSpecs ke metode yang tidak membuang pengecualian pada direktori yang sudah ada, dan menyesuaikan metode commitJob dari custom output committer saya untuk melakukan logika mana yang saya inginkan (misalnya, Override beberapa file, tambahkan yang lain) daripada saya mungkin dapat mencapai perilaku yang diinginkan dengan RDD juga.

Format keluaran diteruskan ke: saveAsNewAPIHadoopFile (yang merupakan metode saveAsTextFile yang dipanggil juga untuk benar-benar menyimpan file). Dan Output committer dikonfigurasi di tingkat aplikasi.

Michael Kopaniov
sumber
Saya akan menghindari mendekati subclass FileOutputCommitter jika Anda dapat membantunya: itu sedikit kode yang menakutkan. Hadoop 3.0 menambahkan titik plugin di mana FileOutputFormat dapat mengambil implementasi berbeda dari superclass yang direfraktorisasi (PathOutputCommitter). S3 dari Netflix akan menulis di tempat ke pohon yang dipartisi, hanya melakukan resolusi konflik (gagal, menghapus, menambah) saat pekerjaan komit, dan hanya di partisi yang diperbarui
stevel