Spark java.lang.OutOfMemoryError: Java heap space

228

Cluster saya: 1 master, 11 slave, setiap node memiliki memori 6 GB.

Pengaturan saya:

spark.executor.memory=4g, Dspark.akka.frameSize=512

Inilah masalahnya:

Pertama , saya membaca beberapa data (2,19 GB) dari HDFS ke RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

Kedua , lakukan sesuatu pada RDD ini:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

Terakhir , keluaran ke HDFS:

res.saveAsNewAPIHadoopFile(...)

Ketika saya menjalankan program saya itu menunjukkan:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

Terlalu banyak tugas?

PS : Semuanya baik-baik saja ketika input data sekitar 225 MB.

Bagaimana saya bisa menyelesaikan masalah ini?

hequn8128
sumber
bagaimana menjalankan spark? apakah itu dari konsol? atau skrip deploy apa yang Anda gunakan?
Tombart
Saya menggunakan sbt untuk mengkompilasi dan menjalankan aplikasi saya. paket sbt kemudian sbt jalankan. Saya mengimplementasikan program yang sama pada hadoop sebulan yang lalu, dan saya bertemu dengan masalah yang sama dari OutOfMemoryError, tetapi dalam hadoop itu dapat dengan mudah diselesaikan dengan meningkatkan nilai mapred.child.java.opts dari Xmx200m ke Xmx400m. Apakah percikan memiliki pengaturan jvm untuk tugas-tugasnya? Saya ingin tahu apakah spark.executor.memory memiliki arti yang sama seperti mapred.child.java.opts dalam hadoop. Dalam spark.executor.memory program saya telah ditetapkan untuk 4g jauh lebih besar dari Xmx400m di hadoop. Terima kasih ~
hequn8128
Apakah tiga langkah yang Anda sebutkan satu-satunya yang Anda lakukan? Berapa ukuran data yang dihasilkan oleh (data._1, desPoints) - ini harus sesuai dengan memori khususnya jika data ini kemudian dipindahkan ke tahap lain
Arnon Rotem-Gal-Oz
1
Apa konfigurasi memori untuk driver? Periksa server mana yang mendapatkan kesalahan memori. Apakah itu driver atau salah satu pelaksana.
RanP
Lihat di sini semua properti konfigurasi: spark.apache.org/docs/2.1.0/configuration.html
Naramsim

Jawaban:

364

Saya punya beberapa saran:

  • Jika node Anda dikonfigurasi untuk memiliki maksimum 6g untuk Spark (dan meninggalkan sedikit untuk proses lain), maka gunakan 6g daripada 4g , spark.executor.memory=6g. Pastikan Anda menggunakan memori sebanyak mungkin dengan memeriksa UI (ini akan mengatakan berapa banyak yang Anda gunakan)
  • Coba gunakan lebih banyak partisi, Anda harus memiliki 2-4 per CPU. Meningkatkan jumlah partisi sering kali merupakan cara termudah untuk membuat program lebih stabil (dan seringkali lebih cepat). Untuk sejumlah besar data, Anda mungkin perlu lebih dari 4 per CPU, saya harus menggunakan 8000 partisi dalam beberapa kasus!
  • Kurangi fraksi memori yang disediakan untuk caching , gunakan spark.storage.memoryFraction. Jika Anda tidak menggunakan cache()atau persistdalam kode Anda, ini mungkin juga 0. Secara default adalah 0,6, yang berarti Anda hanya mendapatkan 0,4 * 4g memori untuk tumpukan Anda. IME mengurangi mem frac sering membuat OOM pergi. UPDATE: Dari spark 1.6 tampaknya kita tidak perlu lagi bermain dengan nilai-nilai ini, spark akan menentukannya secara otomatis.
  • Mirip dengan di atas tetapi mengocok fraksi memori . Jika pekerjaan Anda tidak memerlukan banyak memori shuffle, kemudian atur ke nilai yang lebih rendah (ini dapat menyebabkan shuffles Anda tumpah ke disk yang dapat berdampak buruk pada kecepatan). Kadang-kadang ketika itu adalah operasi acak yang perlu Anda lakukan sebaliknya yaitu mengaturnya menjadi sesuatu yang besar, seperti 0,8, atau pastikan Anda membiarkan shuffles Anda tumpah ke disk (itu adalah default sejak 1.0.0).
  • Berhati- hatilah dengan kebocoran memori , ini sering disebabkan oleh tidak sengaja menutup objek yang tidak Anda butuhkan di lambda Anda. Cara untuk mendiagnosis adalah dengan melihat "tugas berseri sebagai XXX byte" dalam log, jika XXX lebih besar dari beberapa k atau lebih dari satu MB, Anda mungkin memiliki kebocoran memori. Lihat https://stackoverflow.com/a/25270600/1586965
  • Terkait dengan di atas; gunakan variabel broadcast jika Anda benar-benar membutuhkan objek besar.
  • Jika Anda melakukan caching RDD besar dan dapat mengorbankan beberapa waktu akses, pertimbangkan untuk membuat serial RDD http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage . Atau bahkan caching di disk (yang terkadang tidak terlalu buruk jika menggunakan SSD).
  • ( Lanjutan ) Terkait dengan di atas, hindari Stringdan struktur bersarang banyak (seperti Mapdan kelas kasus bersarang). Jika memungkinkan cobalah untuk hanya menggunakan tipe primitif dan indeks semua non-primitif terutama jika Anda mengharapkan banyak duplikat. Pilih WrappedArraylebih dari struktur bersarang bila memungkinkan. Atau bahkan meluncurkan serialisasi Anda sendiri - ANDA akan memiliki informasi paling banyak tentang cara efisien mengembalikan data Anda ke dalam byte, GUNAKAN !
  • ( bit hacky ) Sekali lagi ketika melakukan caching, pertimbangkan untuk menggunakan Datasetcache struktur Anda karena akan menggunakan serialisasi yang lebih efisien. Ini harus dianggap sebagai retasan jika dibandingkan dengan poin sebelumnya. Membangun pengetahuan domain Anda ke dalam algo / serialisasi Anda dapat meminimalkan memori / cache-ruang sebesar 100x atau 1000x, sedangkan semua Datasetkemungkinan akan memberi adalah 2x - 5x dalam memori dan 10x dikompresi (parket) pada disk.

http://spark.apache.org/docs/1.2.1/configuration.html

EDIT: (Jadi saya bisa google sendiri lebih mudah) Berikut ini juga menunjukkan masalah ini:

java.lang.OutOfMemoryError : GC overhead limit exceeded
samthebest
sumber
Terima kasih atas saran Anda ~ Jika saya menetapkan spark.executor.memory = 6g, percikan akan memiliki masalah: "periksa UI cluster Anda untuk memastikan bahwa pekerja terdaftar dan memiliki memori yang cukup". Menyetel spark.storage.memoryFraction ke 0,1 tidak dapat menyelesaikan masalah dengan baik. Mungkin masalahnya terletak pada kode saya. Terima kasih!
hequn8128
2
@samthebest Ini adalah jawaban yang fantastis. Saya sangat menghargai bantuan pencatatan untuk menemukan kebocoran memori.
Myles Baker
1
Hai @samthebest bagaimana Anda menentukan 8000 partisi? Karena saya menggunakan Spark sql, saya hanya dapat menentukan partisi menggunakan spark.sql.shuffle.partitions, nilai default adalah 200 jika saya mengaturnya menjadi lebih banyak saya mencoba mengaturnya menjadi 1000 tetapi tidak membantu mendapatkan OOM apakah Anda tahu apa yang harus optimal nilai partisi Saya memiliki data miring 1 TB untuk diproses dan melibatkan kelompok menurut permintaan sarang. Tolong dibimbing
Umesh K
2
Hai @ user449355 tolong bisakah Anda mengajukan pertanyaan baru? Karena takut memulai utas komentar yang panjang :) Jika Anda mengalami masalah, kemungkinan orang lain melakukannya, dan sebuah pertanyaan akan membuatnya lebih mudah ditemukan untuk semua.
samthebest
1
Untuk poin pertama Anda, @samthebest, Anda tidak boleh menggunakan SEMUA memori untuk spark.executor.memorykarena Anda pasti membutuhkan sejumlah memori untuk overhead I / O. Jika Anda menggunakan semua itu, itu akan memperlambat program Anda. Pengecualian untuk ini mungkin Unix, dalam hal ini Anda memiliki ruang swap.
Hunle
58

Untuk menambahkan kasus penggunaan ke ini yang sering tidak dibahas, saya akan mengajukan solusi ketika mengirimkan Sparkaplikasi melalui spark-submitdalam mode lokal .

Menurut gitbook Mastering Apache Spark oleh Jacek Laskowski :

Anda dapat menjalankan Spark dalam mode lokal. Dalam mode penyebaran JVM tunggal yang tidak terdistribusi ini, Spark memunculkan semua komponen eksekusi - driver, pelaksana, backend, dan master - dalam JVM yang sama. Ini adalah satu-satunya mode di mana driver digunakan untuk eksekusi.

Jadi, jika Anda mengalami OOMkesalahan dengan heap, itu sudah cukup untuk menyesuaikan driver-memorydaripada executor-memory.

Berikut ini sebuah contoh:

spark-1.6.1/bin/spark-submit
  --class "MyClass"
  --driver-memory 12g
  --master local[*] 
  target/scala-2.10/simple-project_2.10-1.0.jar 
Brian
sumber
Berapa banyak persentase yang harus kita pertimbangkan untuk memori driver dalam mode stand-alone.
Yashwanth Kambala
@ Brian, Dalam mode lokal, apakah memori driver harus lebih besar dari ukuran data input? Apakah mungkin untuk menentukan jumlah partisi untuk dataset input, sehingga pekerjaan Spark dapat menangani dataset jauh lebih besar daripada RAM yang tersedia?
fuyi
19

Anda harus mengkonfigurasi pengaturan memori offHeap seperti yang ditunjukkan di bawah ini:

val spark = SparkSession
     .builder()
     .master("local[*]")
     .config("spark.executor.memory", "70g")
     .config("spark.driver.memory", "50g")
     .config("spark.memory.offHeap.enabled",true)
     .config("spark.memory.offHeap.size","16g")   
     .appName("sampleCodeForReference")
     .getOrCreate()

Berikan memori driver dan memori pelaksana sesuai ketersediaan RAM mesin Anda. Anda dapat meningkatkan ukuran offHeap jika Anda masih menghadapi masalah OutofMemory .

pavan.vn101
sumber
Ditambahkan pengaturan offHeap membantu
kennyut
2
pengaturan memori driver dalam kode Anda tidak akan berfungsi, baca dokumentasi percikan untuk ini: Properti percikan terutama dapat dibagi menjadi dua jenis: satu terkait dengan penggunaan, seperti "spark.driver.memory", "spark.executor.instances", properti semacam ini mungkin tidak terpengaruh ketika mengatur secara terprogram melalui SparkConf dalam runtime, atau perilaku tergantung pada manajer cluster dan mode penyebaran yang Anda pilih, sehingga akan disarankan untuk mengatur melalui file konfigurasi atau opsi perintah baris percikan-kirim.
Abdulhafeth Sartawi
1
JAWABAN TERBAIK! Masalah saya adalah bahwa Spark tidak diinstal pada master node, saya hanya menggunakan PySpark untuk terhubung ke HDFS dan mendapatkan kesalahan yang sama. Penggunaan configmemecahkan masalah.
Mikhail_Sam
Saya baru saja menambahkan konfigurasi menggunakan perintah spark-submit untuk memperbaiki masalah ukuran tumpukan. Terima kasih.
Pritam Sadhukhan
16

Anda harus menambah memori driver. Dalam folder $ SPARK_HOME / conf Anda, Anda harus menemukan file spark-defaults.conf, mengedit dan mengatur spark.driver.memory 4000mtergantung pada memori pada master Anda, saya kira. Inilah yang memperbaiki masalah bagi saya dan semuanya berjalan lancar

kulit biru
sumber
Berapa persentase mem yang harus dibagikan, dalam keadaan berdiri sendiri
Yashwanth Kambala
14

Lihat skrip start up ukuran heap Java diatur di sana, sepertinya Anda tidak mengatur ini sebelum menjalankan Spark pekerja.

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

Anda dapat menemukan dokumentasi untuk menggunakan skrip di sini .

Tombart
sumber
Terima kasih ~ saya akan coba nanti. Dari spark ui, itu menunjukkan memori setiap pelaksana adalah 4096. Jadi pengaturan telah diaktifkan, bukan?
hequn8128
Lihat jawaban Anda saat saya menghadapi masalah serupa ( stackoverflow.com/questions/34762432/… ). Melihat tautan yang Anda berikan sepertinya menyetel Xms / Xmx sudah tidak ada lagi, dapatkah Anda memberi tahu alasannya?
Seffy
start up scriptsSayangnya, konten pada skrip yang ditautkan oleh telah berubah. Tidak ada opsi seperti ada pada 2019-12-19
David Groomes
7

Saya sangat menderita karena masalah ini, kami menggunakan alokasi sumber daya dinamis dan saya pikir ini akan memanfaatkan sumber daya kluster saya yang paling sesuai dengan aplikasi.

Tetapi kenyataannya adalah, alokasi sumber daya dinamis tidak mengatur memori driver dan menyimpannya ke nilai default yaitu 1g.

Saya telah mengatasinya dengan menetapkan spark.driver.memory ke nomor yang sesuai dengan memori pengemudi saya (untuk ram 32 GB saya atur ke 18gb)

Anda dapat mengaturnya menggunakan perintah spark submit sebagai berikut:

spark-submit --conf spark.driver.memory=18gb ....cont

Catatan yang sangat penting, properti ini tidak akan dipertimbangkan jika Anda menetapkannya dari kode, menurut dokumentasi percikan:

Properti Spark terutama dapat dibagi menjadi dua jenis: satu terkait dengan penggunaan, seperti "spark.driver.memory", "spark.executor.instances", properti jenis ini mungkin tidak terpengaruh ketika menyetel secara terprogram melalui SparkConf saat runtime, atau perilakunya tergantung pada manajer kluster dan mode penyebaran mana yang Anda pilih, sehingga disarankan untuk mengatur melalui file konfigurasi atau opsi baris perintah percikan-kirim; lain terutama terkait dengan kontrol runtime Spark, seperti "spark.task.maxFailures", properti semacam ini dapat diatur dengan cara baik.

Abdulhafeth Sartawi
sumber
2
Anda harus menggunakan --conf spark.driver.memory = 18g
merenptah
5

Secara garis besar, percikan memori JVM Pelaksana dapat dibagi menjadi dua bagian. Memori percikan dan memori pengguna. Ini dikontrol oleh properti spark.memory.fraction- nilainya antara 0 dan 1. Saat bekerja dengan gambar atau melakukan pemrosesan intensif memori dalam aplikasi percikan, pertimbangkan untuk mengurangispark.memory.fraction . Ini akan membuat lebih banyak memori tersedia untuk pekerjaan aplikasi Anda. Spark dapat tumpah, sehingga masih akan bekerja dengan berbagi memori yang lebih sedikit.

Bagian kedua dari masalah adalah pembagian kerja. Jika memungkinkan, partisi data Anda menjadi potongan yang lebih kecil. Data yang lebih kecil mungkin membutuhkan lebih sedikit memori. Tetapi jika itu tidak mungkin, Anda berkorban menghitung untuk ingatan. Biasanya satu eksekutor akan menjalankan banyak core. Memori total pelaksana harus cukup untuk menangani persyaratan memori dari semua tugas bersamaan. Jika menambah memori pelaksana bukan opsi, Anda dapat mengurangi inti per pelaksana sehingga setiap tugas mendapatkan lebih banyak memori untuk bekerja. Uji dengan 1 pelaksana inti yang memiliki memori sebesar mungkin yang dapat Anda berikan dan kemudian terus tingkatkan core sampai Anda menemukan jumlah inti terbaik.

Rohit Karlupia
sumber
5

Apakah Anda membuang log master gc Anda? Jadi saya menemukan masalah yang sama dan saya menemukan SPARK_DRIVER_MEMORY hanya mengatur heap Xmx. Ukuran tumpukan awal tetap 1G dan ukuran tumpukan tidak pernah naik ke tumpukan Xmx.

Melewati "--conf" spark.driver.extraJavaOptions = -Xms20g "menyelesaikan masalah saya.

ps aux | grep java dan Anda akan melihat log ikuti: =

24501 30.7 1.7 41782944 2318184 poin / 0 Sl + 18:49 0:33 / usr / java / terbaru / bin / java -cp / opt / spark / conf /: / opt / spark / jars / * -Xmx30g -Xms20g

Yunzhao Yang
sumber
3

Lokasi untuk mengatur ukuran tumpukan memori (setidaknya di spark-1.0.0) di conf / spark-env. Variabel yang relevan adalah SPARK_EXECUTOR_MEMORY& SPARK_DRIVER_MEMORY. Lebih banyak dokumen ada dalam panduan penerapan

Juga, jangan lupa menyalin file konfigurasi ke semua node slave.

Amnon
sumber
4
Bagaimana Anda tahu yang mana yang harus disesuaikan antara SPARK_EXECUTOR_MEMORY& SPARK_DRIVER_MEMORY?
Hunle
13
yaitu kesalahan apa yang akan memberitahu Anda untuk meningkatkan SPARK_EXECUTOR_MEMORY, dan kesalahan apa yang akan memberitahu Anda untuk meningkatkan SPARK_DRIVER_MEMORY?
Hunle
2

Saya memiliki beberapa saran untuk kesalahan yang disebutkan di atas.

● Periksa memori pelaksana yang ditetapkan sebagai pelaksana mungkin harus berurusan dengan partisi yang membutuhkan lebih banyak memori daripada yang ditugaskan.

● Cobalah untuk melihat apakah lebih banyak shuffles aktif karena shuffles adalah operasi yang mahal karena melibatkan disk I / O, serialisasi data, dan I / O jaringan

● Gunakan Broadcast Joins

● Hindari menggunakan groupByKeys dan coba ganti dengan ReduceByKey

● Hindari menggunakan Java Objects besar di mana pun pengocokan terjadi

Unmesha SreeVeni
sumber
Maaf untuk membajak kueri orang lain, tetapi bagaimana cara menggunakan dikurangiByKey atas groupBy?
Somil Aseeja
1

Dari pemahaman saya tentang kode yang diberikan di atas, itu memuat file dan melakukan operasi peta dan menyimpannya kembali. Tidak ada operasi yang membutuhkan pengocokan. Juga, tidak ada operasi yang memerlukan data untuk dibawa ke pengemudi sehingga menyetel apa pun yang terkait dengan shuffle atau driver mungkin tidak berdampak. Pengemudi memang memiliki masalah ketika ada terlalu banyak tugas tapi ini hanya sampai versi 2.0.2 memicu. Mungkin ada dua hal yang salah.

  • Hanya ada satu atau beberapa pelaksana. Tambah jumlah eksekutor sehingga mereka dapat dialokasikan ke budak yang berbeda. Jika Anda menggunakan benang perlu mengubah konfigurasi num-executors atau jika Anda menggunakan spark standalone maka perlu menyetel num core per executor dan conf spark max core. Di standalone num executors = max core / core per executor.
  • Jumlah partisi sangat sedikit atau mungkin hanya satu. Jadi, jika ini rendah walaupun kita memiliki multi-core, multi-executor tidak akan banyak membantu karena paralelisasi tergantung pada jumlah partisi. Jadi tambah partisi dengan melakukan imageBundleRDD.repartition (11)
Shridhar
sumber
0

Pengaturan konfigurasi yang tepat ini membantu menyelesaikan masalah.

spark-submit --conf spark.yarn.maxAppAttempts=2 --executor-memory 10g --num-executors 50 --driver-memory 12g
swapnil shashank
sumber