(Mengapa) kita perlu memanggil cache atau tetap menggunakan RDD

171

Ketika dataset didistribusikan (RDD) yang tangguh dibuat dari file teks atau koleksi (atau dari RDD lain), apakah kita perlu memanggil "cache" atau "bertahan" secara eksplisit untuk menyimpan data RDD ke dalam memori? Atau apakah data RDD disimpan secara terdistribusi dalam memori secara default?

val textFile = sc.textFile("/user/emp.txt")

Sesuai pemahaman saya, setelah langkah di atas, textFile adalah RDD dan tersedia di semua / sebagian memori node.

Jika demikian, mengapa kita perlu memanggil "cache" atau "bertahan" di textFile RDD?

Ramana
sumber

Jawaban:

300

Sebagian besar operasi RDD malas. Pikirkan RDD sebagai deskripsi dari serangkaian operasi. RDD bukan data. Jadi baris ini:

val textFile = sc.textFile("/user/emp.txt")

Tidak melakukan apa-apa. Ini menciptakan RDD yang mengatakan "kita perlu memuat file ini". File tidak dimuat pada saat ini.

Operasi RDD yang membutuhkan mengamati isi data tidak bisa malas. (Ini disebut tindakan .) Contohnya adalah RDD.count- untuk memberi tahu Anda jumlah baris dalam file, file tersebut harus dibaca. Jadi jika Anda menulis textFile.count, pada titik ini file akan dibaca, baris akan dihitung, dan jumlah akan dikembalikan.

Bagaimana jika Anda menelepon textFile.countlagi? Hal yang sama: file akan dibaca dan dihitung lagi. Tidak ada yang disimpan. RDD bukan data.

Jadi apa fungsinya RDD.cache? Jika Anda menambahkan textFile.cachekode di atas:

val textFile = sc.textFile("/user/emp.txt")
textFile.cache

Tidak melakukan apa-apa. RDD.cachejuga merupakan operasi yang malas. File masih belum dibaca. Tapi sekarang RDD mengatakan "baca file ini dan kemudian cache isinya". Jika Anda menjalankan textFile.countpertama kali, file akan dimuat, di-cache, dan dihitung. Jika Anda memanggil textFile.countkedua kalinya, operasi akan menggunakan cache. Itu hanya akan mengambil data dari cache dan menghitung garis.

Perilaku cache tergantung pada memori yang tersedia. Jika file tersebut tidak sesuai dengan memori, misalnya, maka textFile.countakan kembali ke perilaku biasa dan membaca kembali file tersebut.

Daniel Darabos
sumber
4
Hai daniel, - ketika Anda memanggil cache, apakah ini berarti bahwa RDD tidak dimuat ulang dari sumber (misalnya file teks) - bagaimana Anda bisa yakin bahwa data dari file teks paling baru ketika di-cache? (Apakah percikan mengetahui hal ini atau apakah ini merupakan operasi manual untuk menghapus daftar () secara berkala untuk memastikan data sumber akan dikomputasi kembali nanti dalam garis keturunan?)
andrew.butkus
juga - jika Anda harus menghapus daftar secara berkala, - jika Anda memiliki rdd yang di-cache, bergantung pada RDD lain yang di-cache, haruskah Anda membatalkan kedua RDD untuk melihat hasil rekomputasi?
andrew.butkus
21
Spark hanya menganggap file tidak akan pernah berubah. Itu membaca file pada titik waktu yang sewenang-wenang dan dapat membaca kembali bagian-bagian yang diperlukan nanti. (Misalnya jika sepotong data didorong keluar dari cache.) Jadi, lebih baik Anda menyimpan file Anda tidak berubah! Cukup buat file baru dengan nama baru saat Anda memiliki data baru, lalu muat sebagai RDD baru. Jika Anda terus mendapatkan data baru, lihat Spark Streaming.
Daniel Darabos
10
Iya. RDD tidak dapat diubah, sehingga setiap RDD menganggap dependensinya juga tidak berubah. Spark Streaming memungkinkan Anda mengatur pohon seperti itu yang beroperasi pada aliran perubahan. Tetapi solusi yang lebih sederhana adalah dengan membangun pohon dalam fungsi yang mengambil nama file sebagai parameternya. Kemudian panggil saja fungsi untuk file baru dan poof, Anda sudah mendapatkan pohon perhitungan baru.
Daniel Darabos
1
@Humoyun: Pada tab Penyimpanan di UI UI, Anda dapat melihat seberapa banyak setiap RDD di-cache. Data mungkin sangat besar sehingga hanya 40% yang sesuai dengan total memori yang Anda miliki untuk caching. Salah satu opsi dalam hal ini adalah menggunakan perisistdan memilih opsi penyimpanan yang memungkinkan menumpahkan data cache ke disk.
Daniel Darabos
197

Saya pikir pertanyaannya akan dirumuskan sebagai:

Kapan kita perlu memanggil cache atau tetap menggunakan RDD?

Proses percikan bersifat malas, yaitu, tidak ada yang akan terjadi sampai diperlukan. Untuk menjawab pertanyaan dengan cepat, setelah val textFile = sc.textFile("/user/emp.txt")dikeluarkan, tidak ada yang terjadi pada data, hanya yang HadoopRDDdibangun, menggunakan file sebagai sumber.

Katakanlah kita mengubah data itu sedikit:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

Sekali lagi, tidak ada yang terjadi pada data. Sekarang ada RDD baru wordsRDDyang berisi referensi ke testFiledan fungsi untuk diterapkan ketika dibutuhkan.

Hanya ketika suatu tindakan dipanggil pada RDD, seperti wordsRDD.count, rantai RDD, yang disebut garis keturunan akan dieksekusi. Artinya, data, yang dipecah dalam partisi, akan dimuat oleh pelaksana cluster Spark, flatMapfungsi akan diterapkan dan hasilnya akan dihitung.

Pada garis keturunan linier, seperti yang ada dalam contoh ini, cache()tidak diperlukan. Data akan dimuat ke pelaksana, semua transformasi akan diterapkan dan akhirnya countakan dihitung, semua dalam memori - jika data cocok dalam memori.

cacheberguna ketika garis silsilah RDD keluar. Katakanlah Anda ingin memfilter kata-kata dari contoh sebelumnya ke dalam hitungan untuk kata-kata positif dan negatif. Anda bisa melakukan ini seperti itu:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Di sini, setiap cabang mengeluarkan ulang data. Menambahkan cachepernyataan eksplisit akan memastikan bahwa pemrosesan yang dilakukan sebelumnya dipertahankan dan digunakan kembali. Pekerjaan akan terlihat seperti ini:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Untuk alasan itu, cachedikatakan 'break the lineage' karena menciptakan pos pemeriksaan yang dapat digunakan kembali untuk diproses lebih lanjut.

Aturan praktis: Gunakan cacheketika garis silsilah RDD Anda bercabang atau ketika RDD digunakan beberapa kali seperti dalam satu lingkaran.

maasg
sumber
1
Luar biasa. Terima kasih. Satu lagi pertanyaan terkait. Saat kita cache atau bertahan, data akan disimpan dalam memori pelaksana atau memori simpul pekerja. Jika itu adalah memori pelaksana, Bagaimana Spark mengidentifikasi pelaksana mana yang memiliki data.
Ramana
1
@RamanaUppala, memori pelaksana digunakan. Fraksi memori pelaksana yang digunakan untuk caching dikendalikan oleh konfigurasi spark.storage.memoryFraction. Mengenai pelaksana mana yang memiliki data mana, RDD akan melacak partisinya yang didistribusikan pada pelaksana.
maasg
5
@maasg Perbaiki saya jika saya salah tetapi tidak cachejuga tidak persist dapat memutus garis silsilah .
nol323
Di mana kata-kataRDD disimpan jika kita belum memiliki pernyataan .cache () pada contoh di atas?
sun_dare
bagaimana jika sebelum dua hitungan, kita menyatukan dua cabang kembali ke satu rdd dan menghitung? dalam hal ini, apakah cache bermanfaat?
Xiawei Zhang
30

Apakah kita perlu memanggil "cache" atau "bertahan" secara eksplisit untuk menyimpan data RDD ke dalam memori?

Ya, hanya jika dibutuhkan.

Data RDD disimpan secara terdistribusi di memori secara default?

Tidak!

Dan inilah alasannya:

  • Spark mendukung dua jenis variabel bersama: variabel siaran, yang dapat digunakan untuk menyimpan nilai dalam memori pada semua node, dan akumulator, yang merupakan variabel yang hanya ditambahkan ke, seperti penghitung dan jumlah.

  • RDD mendukung dua jenis operasi: transformasi, yang membuat dataset baru dari yang sudah ada, dan tindakan, yang mengembalikan nilai ke program driver setelah menjalankan perhitungan pada dataset. Misalnya, peta adalah transformasi yang melewati setiap elemen dataset melalui fungsi dan mengembalikan RDD baru yang mewakili hasil. Di sisi lain, reduksi adalah tindakan yang mengagregasi semua elemen RDD menggunakan beberapa fungsi dan mengembalikan hasil akhir ke program driver (meskipun ada juga pengurangan paralel olehYeyey yang mengembalikan dataset terdistribusi).

  • Semua transformasi di Spark malas, karena mereka tidak menghitung hasilnya segera. Sebagai gantinya, mereka hanya mengingat transformasi yang diterapkan pada beberapa dataset dasar (misalnya file). Transformasi hanya dihitung ketika suatu tindakan membutuhkan hasil untuk dikembalikan ke program driver. Desain ini memungkinkan Spark berjalan lebih efisien - misalnya, kita dapat menyadari bahwa dataset yang dibuat melalui peta akan digunakan dalam pengurangan dan hanya mengembalikan hasil pengurangan ke driver, daripada dataset yang dipetakan lebih besar.

  • Secara default, setiap RDD yang diubah dapat dihitung ulang setiap kali Anda menjalankan suatu tindakan di atasnya. Namun, Anda juga dapat mempertahankan RDD dalam memori menggunakan metode persist (atau cache), dalam hal ini Spark akan menjaga elemen-elemen di dalam cluster untuk akses yang lebih cepat saat berikutnya Anda menanyakannya. Ada juga dukungan untuk RDD yang ada pada disk, atau direplikasi di beberapa node.

Untuk detail lebih lanjut silakan periksa panduan pemrograman Spark .

eliasah
sumber
1
Itu tidak menjawab pertanyaan saya.
Ramana
Apa yang tidak menjawabnya?
eliasah
1
ketika data RDD disimpan dalam memori default, mengapa kita perlu memanggil Cache atau Bertahan?
Ramana
RDD tidak disimpan dalam memori secara default, jadi dengan RDD yang bertahan membuat Spark melakukan transformasi lebih cepat pada cluster
eliasah
2
Ini jawaban yang bagus, saya tidak tahu mengapa itu dibatalkan. Ini adalah jawaban top-down, menjelaskan bagaimana RDD bekerja dari konsep tingkat tinggi. Saya telah menambahkan jawaban lain yang masuk dari bawah ke atas: mulai dari "apa yang dilakukan baris ini". Mungkin lebih mudah diikuti untuk seseorang yang baru memulai dengan Spark.
Daniel Darabos
11

Di bawah ini adalah tiga situasi Anda harus men-cache RDD Anda:

menggunakan RDD berkali-kali

melakukan banyak tindakan pada RDD yang sama

untuk rantai panjang (atau sangat mahal) transformasi

rileys
sumber
7

Menambahkan alasan lain untuk menambahkan (atau menambahkan sementara) cachemetode panggilan.

untuk masalah memori debug

dengan cachemetode, percikan akan memberikan informasi debug tentang ukuran RDD. jadi pada spark terintegrasi UI, Anda akan mendapatkan info konsumsi memori RDD. dan ini terbukti sangat membantu mendiagnosis masalah memori.

zinking
sumber