Mendapatkan perilaku aneh saat memanggil fungsi di luar penutupan:
- ketika fungsi dalam suatu objek semuanya berfungsi
- ketika fungsi di kelas dapatkan:
Tugas tidak serializable: java.io.NotSerializableException: pengujian
Masalahnya adalah saya perlu kode saya di kelas dan bukan objek. Tahu mengapa hal ini terjadi? Apakah objek Scala diserialisasi (default?)?
Ini adalah contoh kode kerja:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
Ini adalah contoh tidak berfungsi:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
scala
serialization
apache-spark
typesafe
Nimrod007
sumber
sumber
Jawaban:
RDD memperluas antarmuka Serialisable , jadi ini bukan yang menyebabkan tugas Anda gagal. Sekarang ini tidak berarti bahwa Anda dapat membuat serial
RDD
dengan Spark dan menghindariNotSerializableException
Spark adalah mesin komputasi terdistribusi dan abstraksi utamanya adalah dataset didistribusikan tangguh ( RDD ), yang dapat dilihat sebagai koleksi terdistribusi. Pada dasarnya, elemen-elemen RDD dipartisi di seluruh node cluster, tetapi Spark memisahkannya dari pengguna, membiarkan pengguna berinteraksi dengan RDD (koleksi) seolah-olah itu adalah yang lokal.
Tidak masuk ke terlalu banyak rincian, tetapi ketika Anda menjalankan transformasi yang berbeda pada RDD (
map
,flatMap
,filter
dan lain-lain), kode transformasi Anda (penutupan) adalah:Anda tentu saja dapat menjalankan ini secara lokal (seperti pada contoh Anda), tetapi semua fase tersebut (terlepas dari pengiriman melalui jaringan) masih terjadi. [Ini memungkinkan Anda menangkap bug apa pun bahkan sebelum menyebarkan ke produksi]
Apa yang terjadi dalam kasus kedua Anda adalah bahwa Anda memanggil metode, yang didefinisikan dalam kelas
testing
dari dalam fungsi peta. Spark melihat itu dan karena metode tidak dapat diserialisasi sendiri, Spark mencoba untuk membuat serial seluruhtesting
kelas, sehingga kode masih akan berfungsi ketika dieksekusi di JVM lain. Anda memiliki dua kemungkinan:Entah Anda membuat pengujian kelas secara serial, sehingga seluruh kelas dapat diserialisasi dengan Spark:
atau Anda membuat
someFunc
fungsi alih-alih metode (fungsi adalah objek di Scala), sehingga Spark akan dapat membuat cerita bersambung:Serupa, tetapi bukan masalah yang sama dengan serialisasi kelas yang dapat menarik bagi Anda dan Anda dapat membacanya di presentasi Spark Summit 2013 ini .
Sebagai catatan, Anda dapat menulis ulang
rddList.map(someFunc(_))
untukrddList.map(someFunc)
, mereka adalah persis sama. Biasanya, yang kedua lebih disukai karena kurang verbose dan lebih bersih untuk dibaca.EDIT (2015-03-15): SPARK-5307 memperkenalkan SerializationDebugger dan Spark 1.3.0 adalah versi pertama yang menggunakannya. Ini menambahkan jalur serialisasi ke NotSerializableException . Ketika NotSerializableException ditemui, debugger mengunjungi grafik objek untuk menemukan jalur menuju objek yang tidak dapat diserialisasi, dan menyusun informasi untuk membantu pengguna menemukan objek.
Dalam kasus OP, inilah yang akan dicetak ke stdout:
sumber
val test = new Test with Serializable
Jawaban Grega sangat bagus dalam menjelaskan mengapa kode asli tidak bekerja dan dua cara untuk memperbaiki masalah ini. Namun, solusi ini tidak terlalu fleksibel; pertimbangkan kasus di mana penutupan Anda mencakup pemanggilan metode pada non-
Serializable
kelas yang tidak dapat Anda kendalikan. Anda tidak bisa menambahkanSerializable
tag ke kelas ini atau mengubah implementasi yang mendasarinya untuk mengubah metode menjadi fungsi.Nilesh menyajikan solusi yang bagus untuk ini, tetapi solusinya dapat dibuat lebih ringkas dan umum:
Fungsi-serializer ini kemudian dapat digunakan untuk secara otomatis membungkus penutupan dan panggilan metode:
Teknik ini juga memiliki manfaat karena tidak memerlukan dependensi Hiu tambahan untuk mengakses
KryoSerializationWrapper
, karena Chill Twitter sudah ditarik oleh inti Sparksumber
Pembicaraan lengkap sepenuhnya menjelaskan masalah, yang mengusulkan cara pergeseran paradigma yang bagus untuk menghindari masalah serialisasi ini: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- kebocoran-no-ws.md
Jawaban pilihan teratas pada dasarnya menyarankan untuk membuang seluruh fitur bahasa - yang tidak lagi menggunakan metode dan hanya menggunakan fungsi. Memang dalam metode pemrograman fungsional di kelas harus dihindari, tetapi mengubahnya menjadi fungsi tidak menyelesaikan masalah desain di sini (lihat tautan di atas).
Sebagai perbaikan cepat dalam situasi khusus ini, Anda bisa menggunakan
@transient
anotasi untuk mengatakannya agar tidak mencoba membuat serialisasi nilai yang menyinggung (di sini,Spark.ctx
adalah kelas khusus bukan kelas Spark yang mengikuti penamaan OP):Anda juga dapat menyusun ulang kode sehingga rddList tinggal di tempat lain, tetapi itu juga jahat.
Masa Depan Mungkin Spora
Di masa depan, Scala akan memasukkan hal-hal ini yang disebut "spora" yang seharusnya memungkinkan kita untuk mengontrol butir apa yang bisa dan tidak secara tepat ditarik oleh penutupan. Lebih jauh lagi, ini akan mengubah semua kesalahan karena tidak sengaja menarik jenis yang tidak dapat diserialkan (atau nilai yang tidak diinginkan) menjadi kesalahan kompilasi daripada sekarang yang merupakan pengecualian runtime yang mengerikan / kebocoran memori.
http://docs.scala-lang.org/sips/pending/spores.html
Kiat tentang serialisasi Kryo
Saat menggunakan kyro, buatlah sehingga diperlukan pendaftaran, ini berarti Anda mendapatkan kesalahan alih-alih kebocoran memori:
"Akhirnya, saya tahu bahwa kryo memiliki kryo.setRegistrationOptional (benar) tetapi saya mengalami kesulitan untuk mencari cara menggunakannya. Ketika opsi ini diaktifkan, kryo tampaknya masih melempar pengecualian jika saya belum mendaftar. kelas. "
Strategi untuk mendaftarkan kelas dengan kryo
Tentu saja ini hanya memberi Anda kontrol tipe-level, bukan kontrol level-nilai.
... lebih banyak ide yang akan datang.
sumber
Saya memecahkan masalah ini menggunakan pendekatan yang berbeda. Anda hanya perlu membuat cerita bersambung benda sebelum melewati penutupan, dan menghapus cerita bersambung setelahnya. Pendekatan ini hanya berfungsi, bahkan jika kelas Anda tidak Serializable, karena menggunakan Kryo di belakang layar. Yang Anda butuhkan hanyalah kari. ;)
Berikut ini contoh bagaimana saya melakukannya:
Jangan ragu untuk membuat Blah serumit yang Anda inginkan, kelas, objek pendamping, kelas bersarang, referensi ke beberapa lib pihak ke-3.
KryoSerializationWrapper merujuk pada: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
sumber
KryoSerializationWrapper
Anda akan menemukan bahwa itu membuat Spark berpikir bahwa itu memangjava.io.Serializable
- itu hanya membuat cerita bersambung objek secara internal menggunakan Kryo - lebih cepat, lebih sederhana. Dan saya tidak berpikir itu berkaitan dengan contoh statis - itu hanya menderialisasi nilai ketika value.apply () dipanggil.Saya menghadapi masalah serupa, dan apa yang saya pahami dari jawaban Grega adalah
metode doIT Anda mencoba membuat serialisasi metode SomeFunc (_) , tetapi karena metode ini tidak bisa serial, ia mencoba membuat serialisasi pengujian kelas yang lagi-lagi tidak bisa serial.
Jadi, buat kode Anda berfungsi, Anda harus mendefinisikan someFunc di dalam metode doIT . Sebagai contoh:
Dan jika ada beberapa fungsi yang muncul dalam gambar, maka semua fungsi tersebut harus tersedia untuk konteks induk.
sumber
Saya tidak sepenuhnya yakin bahwa ini berlaku untuk Scala tetapi, di Jawa, saya menyelesaikannya
NotSerializableException
dengan refactoring kode saya sehingga penutupan tidak mengakses bidang non-serializablefinal
.sumber
FileWriter
ini adalahfinal
bidang dari kelas luar, Anda tidak bisa melakukannya. TetapiFileWriter
dapat dibangun dari aString
atau aFile
, keduanyaSerializable
. Jadi refactor kode Anda untuk membangun lokalFileWriter
berdasarkan nama file dari kelas luar.FYI di Spark 2.4 banyak dari Anda mungkin akan menghadapi masalah ini. Serialisasi Kryo menjadi lebih baik tetapi dalam banyak kasus Anda tidak dapat menggunakan spark.kryo.unsafe = true atau serializer kryo naif.
Untuk perbaikan cepat, coba ubah yang berikut dalam konfigurasi Spark Anda
ATAU
Saya memodifikasi transformasi RDD kustom yang saya temui atau pribadi menulis dengan menggunakan variabel siaran eksplisit dan memanfaatkan inbuilt twitter-chill api baru, mengubah mereka dari
rdd.map(row =>
kerdd.mapPartitions(partition => {
fungsi.Contoh
Jalan Tua (tidak terlalu bagus)
Cara Alternatif (lebih baik)
Cara baru ini hanya akan memanggil variabel broadcast satu kali per partisi yang lebih baik. Anda masih perlu menggunakan Serialisasi Java jika Anda tidak mendaftar kelas.
sumber