Sebagian besar operasi RDD malas. Pikirkan RDD sebagai deskripsi dari serangkaian operasi. RDD bukan data. Jadi baris ini:
val textFile = sc.textFile("/user/emp.txt")
Tidak melakukan apa-apa. Ini menciptakan RDD yang mengatakan "kita perlu memuat file ini". File tidak dimuat pada saat ini.
Operasi RDD yang membutuhkan mengamati isi data tidak bisa malas. (Ini disebut tindakan .) Contohnya adalah RDD.count
- untuk memberi tahu Anda jumlah baris dalam file, file tersebut harus dibaca. Jadi jika Anda menulis textFile.count
, pada titik ini file akan dibaca, baris akan dihitung, dan jumlah akan dikembalikan.
Bagaimana jika Anda menelepon textFile.count
lagi? Hal yang sama: file akan dibaca dan dihitung lagi. Tidak ada yang disimpan. RDD bukan data.
Jadi apa fungsinya RDD.cache
? Jika Anda menambahkan textFile.cache
kode di atas:
val textFile = sc.textFile("/user/emp.txt")
textFile.cache
Tidak melakukan apa-apa. RDD.cache
juga merupakan operasi yang malas. File masih belum dibaca. Tapi sekarang RDD mengatakan "baca file ini dan kemudian cache isinya". Jika Anda menjalankan textFile.count
pertama kali, file akan dimuat, di-cache, dan dihitung. Jika Anda memanggil textFile.count
kedua kalinya, operasi akan menggunakan cache. Itu hanya akan mengambil data dari cache dan menghitung garis.
Perilaku cache tergantung pada memori yang tersedia. Jika file tersebut tidak sesuai dengan memori, misalnya, maka textFile.count
akan kembali ke perilaku biasa dan membaca kembali file tersebut.
perisist
dan memilih opsi penyimpanan yang memungkinkan menumpahkan data cache ke disk.Saya pikir pertanyaannya akan dirumuskan sebagai:
Kapan kita perlu memanggil cache atau tetap menggunakan RDD?
Proses percikan bersifat malas, yaitu, tidak ada yang akan terjadi sampai diperlukan. Untuk menjawab pertanyaan dengan cepat, setelah
val textFile = sc.textFile("/user/emp.txt")
dikeluarkan, tidak ada yang terjadi pada data, hanya yangHadoopRDD
dibangun, menggunakan file sebagai sumber.Katakanlah kita mengubah data itu sedikit:
Sekali lagi, tidak ada yang terjadi pada data. Sekarang ada RDD baru
wordsRDD
yang berisi referensi ketestFile
dan fungsi untuk diterapkan ketika dibutuhkan.Hanya ketika suatu tindakan dipanggil pada RDD, seperti
wordsRDD.count
, rantai RDD, yang disebut garis keturunan akan dieksekusi. Artinya, data, yang dipecah dalam partisi, akan dimuat oleh pelaksana cluster Spark,flatMap
fungsi akan diterapkan dan hasilnya akan dihitung.Pada garis keturunan linier, seperti yang ada dalam contoh ini,
cache()
tidak diperlukan. Data akan dimuat ke pelaksana, semua transformasi akan diterapkan dan akhirnyacount
akan dihitung, semua dalam memori - jika data cocok dalam memori.cache
berguna ketika garis silsilah RDD keluar. Katakanlah Anda ingin memfilter kata-kata dari contoh sebelumnya ke dalam hitungan untuk kata-kata positif dan negatif. Anda bisa melakukan ini seperti itu:Di sini, setiap cabang mengeluarkan ulang data. Menambahkan
cache
pernyataan eksplisit akan memastikan bahwa pemrosesan yang dilakukan sebelumnya dipertahankan dan digunakan kembali. Pekerjaan akan terlihat seperti ini:Untuk alasan itu,
cache
dikatakan 'break the lineage' karena menciptakan pos pemeriksaan yang dapat digunakan kembali untuk diproses lebih lanjut.Aturan praktis: Gunakan
cache
ketika garis silsilah RDD Anda bercabang atau ketika RDD digunakan beberapa kali seperti dalam satu lingkaran.sumber
spark.storage.memoryFraction
. Mengenai pelaksana mana yang memiliki data mana, RDD akan melacak partisinya yang didistribusikan pada pelaksana.cache
juga tidakpersist
dapat memutus garis silsilah .Apakah kita perlu memanggil "cache" atau "bertahan" secara eksplisit untuk menyimpan data RDD ke dalam memori?
Ya, hanya jika dibutuhkan.
Data RDD disimpan secara terdistribusi di memori secara default?
Tidak!
Dan inilah alasannya:
Spark mendukung dua jenis variabel bersama: variabel siaran, yang dapat digunakan untuk menyimpan nilai dalam memori pada semua node, dan akumulator, yang merupakan variabel yang hanya ditambahkan ke, seperti penghitung dan jumlah.
RDD mendukung dua jenis operasi: transformasi, yang membuat dataset baru dari yang sudah ada, dan tindakan, yang mengembalikan nilai ke program driver setelah menjalankan perhitungan pada dataset. Misalnya, peta adalah transformasi yang melewati setiap elemen dataset melalui fungsi dan mengembalikan RDD baru yang mewakili hasil. Di sisi lain, reduksi adalah tindakan yang mengagregasi semua elemen RDD menggunakan beberapa fungsi dan mengembalikan hasil akhir ke program driver (meskipun ada juga pengurangan paralel olehYeyey yang mengembalikan dataset terdistribusi).
Semua transformasi di Spark malas, karena mereka tidak menghitung hasilnya segera. Sebagai gantinya, mereka hanya mengingat transformasi yang diterapkan pada beberapa dataset dasar (misalnya file). Transformasi hanya dihitung ketika suatu tindakan membutuhkan hasil untuk dikembalikan ke program driver. Desain ini memungkinkan Spark berjalan lebih efisien - misalnya, kita dapat menyadari bahwa dataset yang dibuat melalui peta akan digunakan dalam pengurangan dan hanya mengembalikan hasil pengurangan ke driver, daripada dataset yang dipetakan lebih besar.
Secara default, setiap RDD yang diubah dapat dihitung ulang setiap kali Anda menjalankan suatu tindakan di atasnya. Namun, Anda juga dapat mempertahankan RDD dalam memori menggunakan metode persist (atau cache), dalam hal ini Spark akan menjaga elemen-elemen di dalam cluster untuk akses yang lebih cepat saat berikutnya Anda menanyakannya. Ada juga dukungan untuk RDD yang ada pada disk, atau direplikasi di beberapa node.
Untuk detail lebih lanjut silakan periksa panduan pemrograman Spark .
sumber
Di bawah ini adalah tiga situasi Anda harus men-cache RDD Anda:
sumber
Menambahkan alasan lain untuk menambahkan (atau menambahkan sementara)
cache
metode panggilan.untuk masalah memori debug
dengan
cache
metode, percikan akan memberikan informasi debug tentang ukuran RDD. jadi pada spark terintegrasi UI, Anda akan mendapatkan info konsumsi memori RDD. dan ini terbukti sangat membantu mendiagnosis masalah memori.sumber