Saya ingin membaca banyak file teks dari lokasi HDFS dan melakukan pemetaan pada iterasi menggunakan percikan.
JavaRDD<String> records = ctx.textFile(args[1], 1);
mampu membaca hanya satu file pada satu waktu.
Saya ingin membaca lebih dari satu file dan memprosesnya sebagai RDD tunggal. Bagaimana?
apache-spark
pengguna3705662
sumber
sumber
Path
opsi yang sama berlaku.sc.wholeTextFiles
berguna untuk data yang tidak dibatasi garissc.textFile(multipleCommaSeparatedDirs,320)
itu mengarah pada19430
total tugas alih-alih320
... berperilaku sepertiunion
yang juga mengarah pada jumlah tugas gila dari paralelisme yang sangat rendahwholeTextFiles
. Apa kasus penggunaan Anda? Saya bisa memikirkan solusinya asalkan Anda menggunakan jumlah partisi yang sama dengan file ...Gunakan
union
sebagai berikut:Maka itu
bigRdd
adalah RDD dengan semua file.sumber
Anda dapat menggunakan satu panggilan textFile untuk membaca banyak file. Scala:
sumber
sc.textFile(files.mkString(","))
Anda bisa menggunakan ini
Pertama, Anda bisa mendapatkan Buffer / Daftar S3 Paths:
Sekarang Lewati objek Daftar ini ke potongan kode berikut, catatan: sc adalah objek SQLContext
Sekarang Anda mendapatkan RDD Unified akhir yaitu df
Opsional, Dan Anda juga dapat mempartisi ulang dalam BigRDD tunggal
Partisi ulang selalu berfungsi: D
sumber
Di PySpark, saya telah menemukan cara tambahan yang berguna untuk mem-parsing file. Mungkin ada padanan dalam Scala, tapi saya tidak cukup nyaman dengan terjemahan yang berfungsi. Ini adalah, pada dasarnya, panggilan textFile dengan penambahan label (dalam contoh di bawah ini kunci = nama file, nilai = 1 baris dari file).
TextFile "berlabel"
memasukkan:
output: array dengan setiap entri yang berisi tuple menggunakan nama file-sebagai-kunci dan dengan nilai = setiap baris file. (Secara teknis, menggunakan metode ini Anda juga dapat menggunakan kunci yang berbeda di samping nama file sebenarnya - mungkin representasi hashing untuk menghemat memori). yaitu.
Anda juga dapat menggabungkan kembali baik sebagai daftar baris:
Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
Atau gabungkan kembali seluruh file kembali ke string tunggal (dalam contoh ini hasilnya sama dengan apa yang Anda dapatkan dari wholeTextFiles, tetapi dengan string "file:" yang dilepas dari filepathing.):
Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()
sumber
Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)
saya mendapat kesalahan yaituTypeError: 'PipelinedRDD' object is not iterable
. Pemahaman saya adalah bahwa, baris itu menciptakan RDD yang tidak berubah, jadi saya bertanya-tanya bagaimana Anda bisa menambahkannya ke variabel lain?kamu bisa memakai
di sini Anda akan mendapatkan jalur file Anda dan konten file itu. sehingga Anda dapat melakukan tindakan apa pun dari seluruh file pada waktu yang menghemat overhead
sumber
Semua jawaban benar
sc.textFile
Saya hanya ingin tahu mengapa tidak
wholeTextFiles
Misalnya, dalam hal ini ...satu batasannya adalah, kita harus memuat file kecil jika tidak kinerja akan buruk dan dapat menyebabkan OOM.
Catatan :
Referensi lebih lanjut untuk dikunjungi
sumber
sc.wholeTextFiles(folder).flatMap...
Ada solusi bersih lurus ke depan yang tersedia. Gunakan metode wholeTextFiles (). Ini akan mengambil direktori dan membentuk pasangan nilai kunci. RDD yang dikembalikan akan menjadi pasangan RDD. Temukan deskripsi di bawah ini dari Spark docs :
sumber
COBALAH Antarmuka INI digunakan untuk menulis DataFrame ke sistem penyimpanan eksternal (mis. Sistem file, penyimpanan nilai kunci, dll). Gunakan DataFrame.write () untuk mengakses ini.
Baru dalam versi 1.4.
csv (jalur, mode = Tidak Ada, kompresi = Tidak ada, sep = Tidak ada, kutipan = Tidak ada, melarikan diri = Tidak ada, header = Tidak ada, nullValue = Tidak ada, escapeQuotes = Tidak ada, quoteAll = Tidak ada, dateFormat = Tidak ada, timestampFormat = Tidak Ada) Menyimpan konten DataFrame dalam format CSV di jalur yang ditentukan.
Parameter: jalur - jalur dalam mode sistem file apa pun yang didukung Hadoop - menentukan perilaku operasi penyimpanan saat data sudah ada.
append: Tambahkan konten DataFrame ini ke data yang ada. menimpa: Menimpa data yang ada. abaikan: Abaikan operasi ini secara diam-diam jika data sudah ada. error (case default): Melempar pengecualian jika data sudah ada. kompresi - kompresi codec untuk digunakan saat menyimpan ke file. Ini bisa menjadi salah satu dari nama pendek yang tidak peka terhadap huruf yang diketahui (tidak ada, bzip2, gzip, lz4, tajam dan mengempis). sep - menetapkan karakter tunggal sebagai pemisah untuk setiap bidang dan nilai. Jika Tidak ada yang diset, ini menggunakan nilai default,,. kutipan - mengatur karakter tunggal yang digunakan untuk keluar dari nilai yang dikutip di mana pemisah dapat menjadi bagian dari nilai. Jika Tidak ada yang ditetapkan, ini menggunakan nilai default, ". Jika Anda ingin mematikan kutipan, Anda perlu mengatur string kosong. Escape - mengatur karakter tunggal yang digunakan untuk keluar dari kutipan di dalam nilai yang sudah dikutip. Jika Tidak ada yang ditetapkan , menggunakan nilai default, \ escapeQuotes - Bendera yang menunjukkan apakah nilai yang mengandung tanda kutip harus selalu dilampirkan dalam tanda kutip. Jika Tidak Ada yang disetel, ini menggunakan nilai default true, keluar dari semua nilai yang mengandung karakter kutipan. quoteAll - Bendera yang menunjukkan apakah semua nilai harus selalu dilampirkan dalam tanda kutip. Jika Tidak ada yang ditetapkan, ini menggunakan nilai default false, hanya melarikan diri nilai yang mengandung karakter kutipan. header - menulis nama kolom sebagai baris pertama. Jika Tidak ada yang diset, ini menggunakan nilai default, false. nullValue - mengatur representasi string dari nilai null. Jika Tidak ada yang diset, ini menggunakan nilai default, string kosong. dateFormat - mengatur string yang menunjukkan format tanggal. Format tanggal khusus mengikuti format di java.text.SimpleDateFormat. Ini berlaku untuk tipe tanggal. Jika Tidak Ada yang diset, ini menggunakan nilai nilai default, yyyy-MM-dd. timestampFormat - mengatur string yang menunjukkan format timestamp. Format tanggal khusus mengikuti format di java.text.SimpleDateFormat. Ini berlaku untuk jenis cap waktu. Jika Tidak Ada yang diset, ini menggunakan nilai nilai default, yyyy-MM-dd'T'HH: mm: ss.SSSZZ.
sumber
sumber