Apache Spark: Jumlah inti vs jumlah pelaksana

193

Saya mencoba memahami hubungan jumlah core dan jumlah eksekutor ketika menjalankan pekerjaan Spark di BENANG.

Lingkungan pengujian adalah sebagai berikut:

  • Jumlah node data: 3
  • Spesifikasi mesin simpul data:
    • CPU: Core i7-4790 (# core: 4, # utas: 8)
    • RAM: 32GB (8GB x 4)
    • HDD: 8TB (2TB x 4)
  • Jaringan: 1Gb

  • Versi percikan: 1.0.0

  • Versi Hadoop: 2.4.0 (Hortonworks HDP 2.1)

  • Alur pekerjaan percikan: sc.textFile -> filter -> map -> filter -> mapToPair -> kurangiByKey -> map -> saveAsTextFile

  • Memasukan data

    • Ketik: file teks tunggal
    • Ukuran: 165GB
    • Jumlah baris: 454.568.833
  • Keluaran

    • Jumlah baris setelah filter kedua: 310.640.717
    • Jumlah baris file hasil: 99.848.268
    • Ukuran file hasil: 41GB

Pekerjaan dijalankan dengan konfigurasi berikut:

  1. --master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3 (pelaksana per simpul data, gunakan sebanyak inti)

  2. --master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3 (# core berkurang)

  3. --master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12 (kurang inti, lebih banyak eksekutor)

Waktu yang berlalu:

  1. 50 mnt 15 dtk

  2. 55 mnt 48 dtk

  3. 31 mnt 23 dtk

Yang mengejutkan saya, (3) jauh lebih cepat.
Saya berpikir bahwa (1) akan lebih cepat, karena akan ada lebih sedikit komunikasi antar-pelaksana saat pengocokan.
Meskipun # core dari (1) lebih kecil dari (3), #of core bukanlah faktor kunci karena 2) berkinerja baik.

(Berikut ini ditambahkan setelah jawaban pwilmot.)

Sebagai informasi, tangkapan layar monitor kinerja adalah sebagai berikut:

  • Ringkasan simpul data ganglia untuk (1) - pekerjaan dimulai pada 04:37.

Ringkasan simpul data ganglia untuk (1)

  • Ringkasan simpul data ganglia untuk (3) - pekerjaan dimulai pada 19:47. Harap abaikan grafik sebelum waktu itu.

Ringkasan simpul data ganglia untuk (3)

Grafik secara kasar dibagi menjadi 2 bagian:

  • Pertama: dari mulai hingga mengurangiByKey: CPU intensif, tidak ada aktivitas jaringan
  • Kedua: setelah mengurangiByKey: CPU lebih rendah, jaringan I / O selesai.

Seperti yang ditunjukkan grafik, (1) dapat menggunakan daya CPU sebanyak yang diberikan. Jadi, mungkin bukan masalah jumlah utas.

Bagaimana cara menjelaskan hasil ini?

zeodtr
sumber
2
Sekarang saya mencurigai GC ... Faktanya, di Spark UI total waktu yang dihabiskan untuk GC lebih lama 1) daripada 2).
zeodtr
Mengapa Anda tidak mencoba 3) dengan 19G? Mungkinkah membatasi pekerja pada 4G mengurangi efek NUMA yang dimiliki beberapa ppl? yaitu 4G Anda terletak di salah satu dari 2 core yang dialokasikan untuk alur kerja Anda dan karenanya ada sedikit perlambatan i / o, yang mengarah ke kinerja keseluruhan yang lebih baik. Kalau tidak, saya pikir pertanyaan utama adalah: berapa banyak inti / utas dapat menggunakan satu eksekutor tunggal pada pekerja? (Seseorang hanya dapat menentukan jumlah inti untuk seorang pekerja, bukan pada rincian pelaksananya)
Bacon
4
Btw saya baru saja memeriksa kode di core / src / main / scala / org / apache / spark / deploy / pekerja / ExecutorRunner.scala dan tampaknya 1 pelaksana = 1 utas pekerja.
Bacon
agak terlambat tapi di sini ada posting di cloudera tentang topik ini: blog.cloudera.com/blog/2015/03/…
Orelus
1
Ngomong-ngomong, saya menemukan informasi ini di dek slide cloudera slideshare.net/cloudera/… , yang menjelaskan sedikit tentang pengambilan keputusan dalam eksekutor, core dan memori
Manish Sahni

Jawaban:

58

Untuk mudah-mudahan membuat semua ini sedikit lebih konkret, berikut ini adalah contoh kerja mengkonfigurasi aplikasi Spark untuk menggunakan sebanyak mungkin cluster: Bayangkan sebuah cluster dengan enam node yang menjalankan NodeManagers, masing-masing dilengkapi dengan 16 core dan memori 64GB . Kapasitas NodeManager, yarn.nodemanager.resource.memory-mb dan yarn.nodemanager.resource.cpu-vcores, mungkin harus diatur ke 63 * 1024 = 64512 (megabita) dan 15 masing-masing. Kami menghindari mengalokasikan 100% dari sumber daya ke wadah BENANG karena node membutuhkan beberapa sumber daya untuk menjalankan OS dan daemon Hadoop. Dalam hal ini, kami meninggalkan satu gigabyte dan inti untuk proses sistem ini. Cloudera Manager membantu dengan memperhitungkan ini dan mengkonfigurasi properti BENANG ini secara otomatis.

Dorongan pertama kemungkinan akan menggunakan --num-executors 6 --executor-core 15 --executor-memory 63G . Namun, ini adalah pendekatan yang salah karena:

63GB + overhead memori pelaksana tidak sesuai dengan kapasitas 63GB NodeManagers. Master aplikasi akan mengambil inti pada salah satu node, yang berarti bahwa tidak akan ada ruang untuk pelaksana 15-inti pada simpul itu. 15 core per pelaksana dapat menyebabkan throughput HDFS I / O yang buruk.

Pilihan yang lebih baik adalah menggunakan --num-executors 17 --executor-core 5 --executor-memory 19G . Mengapa?

Konfigurasi ini menghasilkan tiga eksekutor pada semua node kecuali untuk yang memiliki AM, yang akan memiliki dua eksekutor. --executor-memory diturunkan sebagai (63/3 eksekutor per node) = 21. 21 * 0,07 = 1,47. 21 - 1.47 ~ 19.

Penjelasannya diberikan dalam sebuah artikel di blog Cloudera, How-to: Tune Your Apache Spark Jobs (Bagian 2) .

DzOrdre
sumber
1
"Konfigurasi ini menghasilkan tiga eksekutor pada semua node kecuali satu yang memiliki AM, yang akan memiliki dua eksekutor." Apa artinya ini dengan "--executor-core 5"?
derek
Ini berarti setiap pelaksana menggunakan 5 core. Setiap node memiliki 3 eksekutor karena itu menggunakan 15 core, kecuali salah satu dari node tersebut juga akan menjalankan aplikasi master untuk pekerjaan tersebut, sehingga hanya dapat menampung 2 eksekutor yaitu 10 core yang digunakan sebagai eksekutor.
Davos
Dijelaskan dengan baik - harap dicatat bahwa ini berlaku untuk yarn.scheduler.capacity.resource-calculatordinonaktifkan, yang merupakan default. Ini karena secara default itu menjadwalkan oleh Memori dan bukan oleh CPU.
YoYo
1
Semakin banyak pelaksana dapat menyebabkan throughput HDFS I / O yang buruk. Jadi jika saya tidak menggunakan HDFS sama sekali, dalam hal ini dapatkah saya menggunakan lebih dari 5 core per pelaksana?
Darshan
Saya pikir master Aplikasi berjalan pada setiap Node. Per di atas, yang berarti hanya akan ada 1 Master Aplikasi untuk menjalankan pekerjaan. Apakah itu benar?
Roshan Fernando
15

Saat Anda menjalankan aplikasi percikan Anda di atas HDFS, menurut Sandy Ryza

Saya telah memperhatikan bahwa klien HDFS memiliki masalah dengan banyak utas bersamaan. Tebakan kasar adalah bahwa paling banyak lima tugas per pelaksana dapat mencapai throughput penulisan penuh, jadi ada baiknya untuk menjaga jumlah inti per pelaksana di bawah angka itu.

Jadi saya percaya bahwa konfigurasi pertama Anda lebih lambat dari yang ketiga adalah karena throughput HDFS I / O yang buruk

tgbaggio
sumber
11

Saya belum bermain dengan pengaturan ini sendiri jadi ini hanya spekulasi tetapi jika kita menganggap masalah ini sebagai inti dan utas normal dalam sistem terdistribusi maka di kluster Anda, Anda dapat menggunakan hingga 12 inti (mesin 4 * 3) dan 24 utas (8 * 3 mesin). Dalam dua contoh pertama Anda, Anda memberikan pekerjaan Anda sejumlah inti (ruang komputasi potensial) tetapi jumlah utas (pekerjaan) untuk dijalankan pada inti tersebut sangat terbatas sehingga Anda tidak dapat menggunakan banyak daya pemrosesan yang dialokasikan dan dengan demikian pekerjaan lebih lambat meskipun ada lebih banyak sumber daya komputasi yang dialokasikan.

Anda menyebutkan bahwa kekhawatiran Anda adalah pada langkah acak - sementara itu bagus untuk membatasi overhead pada langkah acak itu umumnya jauh lebih penting untuk memanfaatkan paralelisasi cluster. Pikirkan tentang kasus ekstrim - program berulir tunggal dengan nol acak.

pwilmot
sumber
Terima kasih atas jawaban Anda. Tetapi saya menduga bahwa jumlah utas bukanlah masalah utama. Saya telah menambahkan tangkapan layar pemantauan. Seperti yang ditunjukkan grafik, 1) dapat menggunakan daya CPU sebanyak yang diberikan.
zeodtr
1
@zeodtr pwilmot benar - Anda perlu 2-4 tugas MINIMUM untuk memanfaatkan potensi penuh dari inti Anda. Begini - saya biasanya menggunakan setidaknya 1000 partisi untuk 80 core cluster saya.
samthebest
@ samthebest Yang ingin saya ketahui adalah alasan perbedaan kinerja antara 1) dan 3). Ketika saya menonton Spark UI, keduanya menjalankan 21 tugas secara paralel di bagian 2. (mengapa 21 bukannya 24 dalam kasus 3) tidak diketahui untuk saat ini) Tapi, tugas untuk 3) hanya berjalan lebih cepat.
zeodtr
10

Jawaban singkat : Saya pikir tgbaggio benar. Anda menekan batas throughput HDFS pada eksekutor Anda.

Saya pikir jawabannya di sini mungkin sedikit lebih sederhana daripada beberapa rekomendasi di sini.

Petunjuk bagi saya adalah dalam grafik jaringan cluster. Untuk menjalankan 1 pemanfaatannya stabil pada ~ 50 M byte / s. Untuk menjalankan 3 utilisasi stabil digandakan, sekitar 100 M byte / s.

Dari posting blog cloudera yang dibagikan oleh DzOrd , Anda dapat melihat kutipan penting ini:

Saya telah memperhatikan bahwa klien HDFS memiliki masalah dengan banyak utas bersamaan. Tebakan kasarnya adalah bahwa paling banyak lima tugas per pelaksana dapat mencapai throughput penulisan penuh, jadi ada baiknya untuk menjaga jumlah inti per pelaksana di bawah angka itu.

Jadi, mari kita lakukan beberapa perhitungan untuk melihat kinerja apa yang kita harapkan jika itu benar.


Jalankan 1: 19 GB, 7 core, 3 eksekutor

  • 3 pelaksana x 7 utas = 21 utas
  • dengan 7 core per pelaksana, kami mengharapkan IO terbatas untuk HDFS (maksimal ~ 5 core)
  • throughput efektif ~ = 3 pelaksana x 5 utas = 15 utas

Jalankan 3: 4 GB, 2 core, 12 pelaksana

  • 2 pelaksana x 12 utas = 24 utas
  • 2 core per pelaksana, jadi throughput hdfs ok
  • throughput efektif ~ = 12 pelaksana x 2 utas = 24 utas

Jika pekerjaan itu 100% dibatasi oleh konkurensi (jumlah utas). Kami perkirakan runtime akan berkorelasi terbalik sempurna dengan jumlah utas.

ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62

Jadi ratio_num_threads ~= inv_ratio_runtime, dan sepertinya jaringan kami terbatas.

Efek yang sama ini menjelaskan perbedaan antara Run 1 dan Run 2.


Jalankan 2: 19 GB, 4 core, 3 eksekutor

  • 3 pelaksana x 4 utas = 12 utas
  • dengan 4 core per pelaksana, ok IO ke HDFS
  • throughput efektif ~ = 3 pelaksana x 4 utas = 12 utas

Membandingkan jumlah utas efektif dan runtime:

ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91

Ini tidak sesempurna perbandingan terakhir, tapi kami masih melihat penurunan kinerja yang sama ketika kami kehilangan utas.

Sekarang untuk bagian terakhir: mengapa kita mendapatkan kinerja yang lebih baik dengan lebih banyak utas, khususnya. lebih banyak utas daripada jumlah CPU?

Penjelasan yang baik tentang perbedaan antara paralelisme (apa yang kita dapatkan dengan membagi data ke banyak CPU) dan konkurensi (apa yang kita dapatkan ketika kita menggunakan banyak utas untuk melakukan pekerjaan pada satu CPU) disediakan dalam pos yang luar biasa ini oleh Rob Pike: Concurrency bukan paralelisme .

Penjelasan singkatnya adalah bahwa jika pekerjaan Spark berinteraksi dengan sistem file atau jaringan CPU menghabiskan banyak waktu menunggu komunikasi dengan antarmuka tersebut dan tidak menghabiskan banyak waktu sebenarnya "melakukan pekerjaan". Dengan memberikan CPU lebih dari 1 tugas untuk dikerjakan sekaligus, mereka menghabiskan lebih sedikit waktu menunggu dan lebih banyak waktu untuk bekerja, dan Anda melihat kinerja yang lebih baik.

turtlemonvh
sumber
1
Penjelasan menarik dan meyakinkan, saya bertanya-tanya apakah bagaimana Anda menebak Anda bahwa eksekutor memiliki 5 tugas batas untuk mencapai throughput maksimum.
Dat Nguyen
Jadi nomor 5 bukanlah sesuatu yang saya temukan: Saya hanya melihat tanda-tanda kemacetan IO dan pergi mencari dari mana kemacetan itu mungkin berasal.
turtlemonvh
8

Dari sumber daya luar biasa yang tersedia di halaman paket Sparklyr RStudio :

DEFINISI SPARK :

Mungkin bermanfaat untuk memberikan beberapa definisi sederhana untuk nomenklatur Spark:

Node : Server

Worker Node : Server yang merupakan bagian dari cluster dan tersedia untuk menjalankan pekerjaan Spark

Master Node : Server yang mengoordinasikan node Worker.

Pelaksana : Semacam mesin virtual di dalam sebuah simpul. Satu Node dapat memiliki beberapa Pelaksana.

Driver Node : Node yang memulai sesi Spark. Biasanya, ini akan menjadi server tempat sparklyr berada.

Driver (Pelaksana) : Driver Node juga akan muncul di daftar Executor.

d8aninja
sumber
1

Alokasi Spark Dinamis memberi fleksibilitas dan mengalokasikan sumber daya secara dinamis. Dalam jumlah ini, eksekutif minimum dan maksimum dapat diberikan. Juga jumlah pelaksana yang harus diluncurkan pada awal aplikasi juga dapat diberikan.

Baca di bawah ini pada yang sama:

http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation

Harikrishnan Ck
sumber
1

Ada masalah kecil dalam dua konfigurasi pertama yang saya pikir. Konsep-konsep utas dan inti seperti berikut. Konsep threading adalah jika core ideal maka gunakan core itu untuk memproses data. Jadi memori tidak sepenuhnya digunakan dalam dua kasus pertama. Jika Anda ingin menandai contoh ini pilih mesin yang memiliki lebih dari 10 core pada setiap mesin. Kemudian lakukan tanda bench.

Tapi jangan memberikan lebih dari 5 core per pelaksana, akan ada neck bottle pada kinerja i / o.

Jadi mesin terbaik untuk melakukan penandaan bangku ini mungkin data node yang memiliki 10 core.

Spesifikasi mesin simpul data: CPU: Core i7-4790 (# inti: 10, # utas: 20) RAM: 32GB (8GB x 4) HDD: 8TB (2TB x 4)

loneStar
sumber
0

Saya pikir salah satu alasan utama adalah lokalitas. Ukuran file input Anda adalah 165G, blok terkait file tersebut pasti didistribusikan lebih dari beberapa DataNodes, lebih banyak pelaksana dapat menghindari salinan jaringan.

Cobalah untuk mengatur jumlah blok sama dengan pelaksana, saya pikir bisa lebih cepat.

zwb
sumber