Mari kita asumsikan sebagai berikut bahwa hanya satu tugas Spark yang berjalan di setiap titik waktu.
Apa yang saya dapatkan sejauh ini
Inilah yang saya pahami tentang apa yang terjadi di Spark:
- Saat a
SparkContext
dibuat, setiap node pekerja memulai eksekutor. Pelaksana adalah proses terpisah (JVM), yang menghubungkan kembali ke program driver. Setiap pelaksana memiliki toples program driver. Keluar dari pengemudi, matikan pelaksana. Setiap pelaksana dapat menampung beberapa partisi. - Ketika sebuah pekerjaan dijalankan, rencana eksekusi dibuat sesuai dengan grafik garis keturunan.
- Pekerjaan eksekusi dibagi menjadi beberapa tahap, di mana tahapan yang berisi transformasi dan tindakan tetangga (dalam grafik garis keturunan) sebanyak mungkin, tetapi tidak ada pengacakan. Jadi tahapan dipisahkan oleh pengocokan.
aku mengerti itu
- Tugas adalah perintah yang dikirim dari driver ke eksekutor dengan membuat serial objek Fungsi.
- Eksekutor deserializes (dengan driver jar) perintah (tugas) dan mengeksekusinya di partisi.
tapi
Pertanyaan
Bagaimana cara membagi tahapan menjadi tugas-tugas itu?
Secara khusus:
- Apakah tugas ditentukan oleh transformasi dan tindakan atau dapatkah beberapa transformasi / tindakan menjadi tugas?
- Apakah tugas ditentukan oleh partisi (misalnya satu tugas per tahap per partisi).
- Apakah tugas ditentukan oleh node (misalnya satu tugas per tahap per node)?
Apa yang saya pikirkan (hanya jawaban parsial, meskipun benar)
Di https://0x0fff.com/spark-architecture-shuffle , pengocokan dijelaskan dengan gambar
dan saya mendapat kesan bahwa aturannya adalah
setiap tahap dibagi menjadi # tugas jumlah partisi, tanpa memperhatikan jumlah node
Untuk gambar pertama saya, saya akan mengatakan bahwa saya memiliki 3 tugas peta dan 3 tugas pengurangan.
Untuk gambar dari 0x0fff, saya akan mengatakan ada 8 tugas peta dan 3 tugas pengurangan (dengan asumsi bahwa hanya ada tiga file oranye dan tiga file hijau tua).
Pertanyaan terbuka dalam hal apa pun
Apakah itu benar? Tetapi meskipun itu benar, pertanyaan saya di atas tidak semuanya terjawab, karena masih terbuka, apakah beberapa operasi (mis. Beberapa peta) berada dalam satu tugas atau dipisahkan menjadi satu tugas per operasi.
Apa yang dikatakan orang lain
Apa tugas di Spark? Bagaimana cara pekerja Spark mengeksekusi file jar? dan Bagaimana penjadwal Apache Spark membagi file menjadi tugas? mirip, tetapi saya tidak merasa pertanyaan saya dijawab dengan jelas di sana.
sumber
Jawaban:
Anda memiliki garis besar yang cukup bagus di sini. Untuk menjawab pertanyaan Anda
task
tidak perlu diluncurkan untuk setiap partisi data untuk masing-masingstage
. Pertimbangkan bahwa setiap partisi kemungkinan besar akan berada di lokasi fisik yang berbeda - misalnya blok di HDFS atau direktori / volume untuk sistem file lokal.Perhatikan bahwa pengiriman
Stage
s didorong olehDAG Scheduler
. Ini berarti bahwa tahapan yang tidak saling bergantung dapat dikirimkan ke cluster untuk dieksekusi secara paralel: ini memaksimalkan kemampuan paralelisasi pada cluster. Jadi, jika operasi dalam aliran data kami dapat terjadi secara bersamaan, kami berharap akan melihat beberapa tahapan diluncurkan.Kita dapat melihat itu beraksi dalam contoh mainan berikut di mana kita melakukan jenis operasi berikut:
Jadi, berapa banyak tahapan yang akan kita capai?
join
yang bergantung pada dua tahap lainnyaIni program mainan itu
Dan inilah DAG dari hasilnya
Sekarang: berapa banyak tugas ? Jumlah tugas harus sama dengan
Jumlah (
Stage
*#Partitions in the stage
)sumber
sum(..)
untuk memperhitungkan variasi itu.Ini mungkin membantu Anda lebih memahami bagian yang berbeda:
sumber
Jika saya mengerti dengan benar ada 2 hal (terkait) yang membingungkan Anda:
1) Apa yang menentukan konten tugas?
2) Apa yang menentukan jumlah tugas yang akan dijalankan?
Mesin Spark "merekatkan" operasi sederhana pada rdds yang berurutan, misalnya:
jadi ketika rdd3 (malas) dihitung, spark akan menghasilkan tugas per partisi rdd1 dan setiap tugas akan mengeksekusi filter dan peta per baris untuk menghasilkan rdd3.
Jumlah tugas ditentukan oleh jumlah partisi. Setiap RDD memiliki jumlah partisi yang ditentukan. Untuk RDD sumber yang dibaca dari HDFS (misalnya menggunakan sc.textFile (...)) jumlah partisi adalah jumlah pemisahan yang dihasilkan oleh format input. Beberapa operasi pada RDD dapat menghasilkan RDD dengan jumlah partisi yang berbeda:
Contoh lainnya adalah join:
(Sebagian besar) operasi yang mengubah jumlah partisi melibatkan pengacakan, Ketika kami melakukannya misalnya:
yang sebenarnya terjadi adalah tugas pada setiap partisi rdd1 perlu menghasilkan keluaran akhir yang dapat dibaca oleh tahap berikut sehingga untuk membuat rdd2 memiliki tepat 1000 partisi (Bagaimana mereka melakukannya? Hash atau Urutkan ). Tugas di sisi ini kadang-kadang disebut sebagai "Tugas peta (samping)". Sebuah tugas yang nantinya akan berjalan di rdd2 akan bekerja pada satu partisi (dari rdd2!) Dan harus mencari cara untuk membaca / menggabungkan keluaran sisi peta yang relevan dengan partisi tersebut. Tugas di sisi ini terkadang disebut sebagai "Kurangi (tugas sampingan)".
2 pertanyaan terkait: jumlah tugas dalam sebuah tahapan adalah jumlah partisi (umum untuk rdds berurutan yang "direkatkan" bersama) dan jumlah partisi rdd dapat berubah antar tahapan (dengan menentukan jumlah partisi ke beberapa shuffle menyebabkan operasi misalnya).
Setelah eksekusi tahapan dimulai, tugasnya dapat menempati slot tugas. Jumlah slot tugas serentak adalah numExecutors * ExecutorCores. Secara umum, ini dapat ditempati oleh tugas-tugas dari tahapan yang berbeda dan tidak bergantung.
sumber