Jalan buntu ketika banyak pekerjaan percikan dijadwalkan secara bersamaan

17

Menggunakan percikan 2.4.4 yang berjalan dalam mode gugus BENANG dengan penjadwal FIFO percikan.

Saya mengirimkan beberapa operasi dataframe percikan (yaitu menulis data ke S3) menggunakan pelaksana kumpulan thread dengan sejumlah variabel utas. Ini berfungsi dengan baik jika saya memiliki ~ 10 utas, tetapi jika saya menggunakan ratusan utas, tampaknya ada jalan buntu, dengan tidak ada pekerjaan yang dijadwalkan menurut Spark UI.

Faktor-faktor apa yang mengontrol berapa banyak pekerjaan yang dapat dijadwalkan secara bersamaan? Sumber daya driver (mis. Memori / inti)? Beberapa pengaturan konfigurasi percikan lainnya?

EDIT:

Berikut sinopsis singkat kode saya

ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);

Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);

List<Future<Void>> futures = listOfSeveralHundredThings
  .stream()
  .map(aThing -> ecs.submit(() -> {
    df
      .filter(col("some_column").equalTo(aThing))
      .write()
      .format("org.apache.hudi")
      .options(writeOptions)
      .save(outputPathFor(aThing));
    return null;
  }))
  .collect(Collectors.toList());

IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();

Pada titik tertentu, seiring nThreadskenaikan, percikan tampaknya tidak lagi menjadwalkan pekerjaan apa pun sebagaimana dibuktikan oleh:

  • ecs.poll(...) akhirnya kehabisan waktu
  • Tab Spark UI jobs tidak menunjukkan pekerjaan aktif
  • Tab pelaksana Spark UI tidak menampilkan tugas aktif untuk pelaksana apa pun
  • Tab SQL UI Spark menampilkan nThreadspermintaan menjalankan tanpa ID pekerjaan yang sedang berjalan

Lingkungan eksekusi saya adalah

  • AWS EMR 5.28.1
  • Percikan 2.4.4
  • Master node = m5.4xlarge
  • Node inti = 3x rd5.24xlarge
  • spark.driver.cores=24
  • spark.driver.memory=32g
  • spark.executor.memory=21g
  • spark.scheduler.mode=FIFO
Scott
sumber
Apakah ada bagian khusus yang membahas ini? Saya telah membaca dokumen itu beberapa kali dalam beberapa hari terakhir dan belum menemukan jawaban yang saya cari.
Scott
2
Bisakah Anda tunjukkan kode yang Anda gunakan untuk mengirimkan pekerjaan Spark melalui pelaksana kumpulan thread? Sepertinya kebuntuan terjadi sebelum pekerjaan Spark diajukan.
Salim
1
Bisakah Anda memposting kode Anda? Harap berikan perincian tentang env Anda: CPU, RAM; juga bagaimana Anda membuat utas: secara bersamaan atau dalam kelompok kecil 10?
Saheed
Maaf, bagaimana maksud Anda pekerjaan tidak dijadwalkan? Mereka tidak muncul di Spark UI, atau mereka muncul di daftar pekerjaan, tetapi tugas tidak dieksekusi? Either way, jika Anda mencurigai kebuntuan, silakan jalankan jstack -luntuk mendapatkan dump thread dengan info penguncian.
Daniel Darabos

Jawaban:

0

Jika mungkin tuliskan output dari pekerjaan ke AWS Elastic MapReduce hdfs (untuk memanfaatkan perubahan nama yang hampir seketika dan IO file yang lebih baik dari hdfs lokal) dan tambahkan langkah dstcp untuk memindahkan file ke S3, untuk menyelamatkan diri Anda dari semua masalah dalam menangani jeroan dari sebuah toko objek yang mencoba menjadi sistem file. Juga menulis ke hdfs lokal akan memungkinkan Anda untuk mengaktifkan spekulasi untuk mengontrol tugas-tugas pelarian tanpa jatuh ke dalam jebakan kebuntuan yang terkait dengan DirectOutputCommiter.

Jika Anda harus menggunakan S3 sebagai direktori keluaran, pastikan konfigurasi Spark berikut sudah disetel

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false

Catatan: DirectParquetOutputCommitter dihapus dari Spark 2.0 karena kemungkinan kehilangan data. Sayangnya sampai kami telah meningkatkan konsistensi dari S3a, kami harus bekerja dengan solusinya. Semuanya membaik dengan Hadoop 2.8

Hindari keynames dalam urutan leksikografis. Satu dapat menggunakan hashing / awalan acak atau membalikkan tanggal-waktu untuk berkeliling. Triknya adalah memberi nama kunci Anda secara hierarkis, meletakkan hal-hal paling umum yang Anda saring di sebelah kiri kunci Anda. Dan tidak pernah memiliki garis bawah dalam nama bucket karena masalah DNS.

Mengaktifkan fs.s3a.fast.upload uploadbagian-bagian dari satu file ke Amazon S3 secara paralel

Rujuk artikel ini untuk lebih detail-

Mengatur spark.speculation di Spark 2.1.0 saat menulis ke s3

https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98

Devesh mehta
sumber
AWS memiliki committer sendiri docs.aws.amazon.com/emr/latest/ReleaseGuide/…
mazaneicha
0

IMO Anda kemungkinan mendekati masalah ini dengan salah. Kecuali jika Anda dapat menjamin bahwa jumlah tugas per pekerjaan sangat rendah, Anda kemungkinan tidak akan mendapatkan banyak peningkatan kinerja dengan memparalelkan 100 pekerjaan sekaligus. Cluster Anda hanya dapat mendukung 300 tugas sekaligus, dengan asumsi Anda menggunakan paralelisme default 200, itu hanya 1,5 pekerjaan. Saya sarankan menulis ulang kode Anda untuk menutup kueri maksimum bersamaan pada 10. Saya sangat curiga bahwa Anda memiliki 300 kueri dengan hanya satu tugas beberapa ratus yang benar-benar berjalan. Sebagian besar sistem pemrosesan data OLTP sengaja memiliki tingkat permintaan konkuren yang cukup rendah dibandingkan dengan sistem RDS yang lebih tradisional karena alasan ini.

juga

  1. Apache Hudi memiliki paralelisme default beberapa ratus FYI.
  2. Mengapa tidak Anda hanya partisi berdasarkan kolom filter Anda?
Andrew Long
sumber