Tugas tidak serializable: java.io.NotSerializableException saat memanggil fungsi penutupan luar hanya pada kelas bukan objek

224

Mendapatkan perilaku aneh saat memanggil fungsi di luar penutupan:

  • ketika fungsi dalam suatu objek semuanya berfungsi
  • ketika fungsi di kelas dapatkan:

Tugas tidak serializable: java.io.NotSerializableException: pengujian

Masalahnya adalah saya perlu kode saya di kelas dan bukan objek. Tahu mengapa hal ini terjadi? Apakah objek Scala diserialisasi (default?)?

Ini adalah contoh kode kerja:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

Ini adalah contoh tidak berfungsi:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}
Nimrod007
sumber
Apa itu Spark.ctx? Tidak ada objek Spark dengan metode ctx AFAICT
javadba

Jawaban:

334

RDD memperluas antarmuka Serialisable , jadi ini bukan yang menyebabkan tugas Anda gagal. Sekarang ini tidak berarti bahwa Anda dapat membuat serial RDDdengan Spark dan menghindariNotSerializableException

Spark adalah mesin komputasi terdistribusi dan abstraksi utamanya adalah dataset didistribusikan tangguh ( RDD ), yang dapat dilihat sebagai koleksi terdistribusi. Pada dasarnya, elemen-elemen RDD dipartisi di seluruh node cluster, tetapi Spark memisahkannya dari pengguna, membiarkan pengguna berinteraksi dengan RDD (koleksi) seolah-olah itu adalah yang lokal.

Tidak masuk ke terlalu banyak rincian, tetapi ketika Anda menjalankan transformasi yang berbeda pada RDD ( map, flatMap, filterdan lain-lain), kode transformasi Anda (penutupan) adalah:

  1. serial pada node driver,
  2. dikirim ke node yang sesuai di cluster,
  3. deserialized,
  4. dan akhirnya dieksekusi pada node

Anda tentu saja dapat menjalankan ini secara lokal (seperti pada contoh Anda), tetapi semua fase tersebut (terlepas dari pengiriman melalui jaringan) masih terjadi. [Ini memungkinkan Anda menangkap bug apa pun bahkan sebelum menyebarkan ke produksi]

Apa yang terjadi dalam kasus kedua Anda adalah bahwa Anda memanggil metode, yang didefinisikan dalam kelas testingdari dalam fungsi peta. Spark melihat itu dan karena metode tidak dapat diserialisasi sendiri, Spark mencoba untuk membuat serial seluruh testing kelas, sehingga kode masih akan berfungsi ketika dieksekusi di JVM lain. Anda memiliki dua kemungkinan:

Entah Anda membuat pengujian kelas secara serial, sehingga seluruh kelas dapat diserialisasi dengan Spark:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

atau Anda membuat someFuncfungsi alih-alih metode (fungsi adalah objek di Scala), sehingga Spark akan dapat membuat cerita bersambung:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

Serupa, tetapi bukan masalah yang sama dengan serialisasi kelas yang dapat menarik bagi Anda dan Anda dapat membacanya di presentasi Spark Summit 2013 ini .

Sebagai catatan, Anda dapat menulis ulang rddList.map(someFunc(_))untuk rddList.map(someFunc), mereka adalah persis sama. Biasanya, yang kedua lebih disukai karena kurang verbose dan lebih bersih untuk dibaca.

EDIT (2015-03-15): SPARK-5307 memperkenalkan SerializationDebugger dan Spark 1.3.0 adalah versi pertama yang menggunakannya. Ini menambahkan jalur serialisasi ke NotSerializableException . Ketika NotSerializableException ditemui, debugger mengunjungi grafik objek untuk menemukan jalur menuju objek yang tidak dapat diserialisasi, dan menyusun informasi untuk membantu pengguna menemukan objek.

Dalam kasus OP, inilah yang akan dicetak ke stdout:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)
Grega Kešpret
sumber
1
Hmm, apa yang telah Anda jelaskan jelas masuk akal, dan menjelaskan mengapa seluruh kelas mendapatkan serial (sesuatu yang saya tidak sepenuhnya mengerti). Namun demikian saya akan tetap berpendapat bahwa rdd's tidak serializable (well mereka memperpanjang Serializable, tetapi itu tidak berarti mereka tidak menyebabkan NotSerializableException, coba saja). Inilah sebabnya mengapa jika Anda menempatkan mereka di luar kelas itu memperbaiki kesalahan. Saya akan mengedit jawaban saya sedikit untuk lebih tepat tentang apa yang saya maksud - yaitu mereka menyebabkan pengecualian, bukan karena mereka memperluas antarmuka.
samthebest
35
Jika Anda tidak memiliki kontrol atas kelas, Anda perlu serializable ... jika Anda menggunakan Scala, Anda bisa instantiate dengan Serializable:val test = new Test with Serializable
Mark S
4
"rddList.map (someFunc (_)) ke rddList.map (someFunc), mereka persis sama" Tidak mereka tidak persis sama, dan pada kenyataannya menggunakan yang terakhir dapat menyebabkan pengecualian serialisasi kalau yang sebelumnya tidak.
samthebest
1
@samthebest dapatkah Anda menjelaskan mengapa map (someFunc (_)) tidak akan menyebabkan pengecualian serialisasi sedangkan map (someFunc) akan?
Alon
31

Jawaban Grega sangat bagus dalam menjelaskan mengapa kode asli tidak bekerja dan dua cara untuk memperbaiki masalah ini. Namun, solusi ini tidak terlalu fleksibel; pertimbangkan kasus di mana penutupan Anda mencakup pemanggilan metode pada non- Serializablekelas yang tidak dapat Anda kendalikan. Anda tidak bisa menambahkan Serializabletag ke kelas ini atau mengubah implementasi yang mendasarinya untuk mengubah metode menjadi fungsi.

Nilesh menyajikan solusi yang bagus untuk ini, tetapi solusinya dapat dibuat lebih ringkas dan umum:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

Fungsi-serializer ini kemudian dapat digunakan untuk secara otomatis membungkus penutupan dan panggilan metode:

rdd map genMapper(someFunc)

Teknik ini juga memiliki manfaat karena tidak memerlukan dependensi Hiu tambahan untuk mengakses KryoSerializationWrapper, karena Chill Twitter sudah ditarik oleh inti Spark

Ben Sidhom
sumber
Hai, saya bertanya-tanya apakah saya perlu mendaftarkan sesuatu jika saya menggunakan kode Anda? Saya mencoba dan mendapatkan pengecualian kelas Tidak dapat menemukan dari kryo. THX
G_cy
25

Pembicaraan lengkap sepenuhnya menjelaskan masalah, yang mengusulkan cara pergeseran paradigma yang bagus untuk menghindari masalah serialisasi ini: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- kebocoran-no-ws.md

Jawaban pilihan teratas pada dasarnya menyarankan untuk membuang seluruh fitur bahasa - yang tidak lagi menggunakan metode dan hanya menggunakan fungsi. Memang dalam metode pemrograman fungsional di kelas harus dihindari, tetapi mengubahnya menjadi fungsi tidak menyelesaikan masalah desain di sini (lihat tautan di atas).

Sebagai perbaikan cepat dalam situasi khusus ini, Anda bisa menggunakan @transientanotasi untuk mengatakannya agar tidak mencoba membuat serialisasi nilai yang menyinggung (di sini, Spark.ctxadalah kelas khusus bukan kelas Spark yang mengikuti penamaan OP):

@transient
val rddList = Spark.ctx.parallelize(list)

Anda juga dapat menyusun ulang kode sehingga rddList tinggal di tempat lain, tetapi itu juga jahat.

Masa Depan Mungkin Spora

Di masa depan, Scala akan memasukkan hal-hal ini yang disebut "spora" yang seharusnya memungkinkan kita untuk mengontrol butir apa yang bisa dan tidak secara tepat ditarik oleh penutupan. Lebih jauh lagi, ini akan mengubah semua kesalahan karena tidak sengaja menarik jenis yang tidak dapat diserialkan (atau nilai yang tidak diinginkan) menjadi kesalahan kompilasi daripada sekarang yang merupakan pengecualian runtime yang mengerikan / kebocoran memori.

http://docs.scala-lang.org/sips/pending/spores.html

Kiat tentang serialisasi Kryo

Saat menggunakan kyro, buatlah sehingga diperlukan pendaftaran, ini berarti Anda mendapatkan kesalahan alih-alih kebocoran memori:

"Akhirnya, saya tahu bahwa kryo memiliki kryo.setRegistrationOptional (benar) tetapi saya mengalami kesulitan untuk mencari cara menggunakannya. Ketika opsi ini diaktifkan, kryo tampaknya masih melempar pengecualian jika saya belum mendaftar. kelas. "

Strategi untuk mendaftarkan kelas dengan kryo

Tentu saja ini hanya memberi Anda kontrol tipe-level, bukan kontrol level-nilai.

... lebih banyak ide yang akan datang.

samthebest
sumber
9

Saya memecahkan masalah ini menggunakan pendekatan yang berbeda. Anda hanya perlu membuat cerita bersambung benda sebelum melewati penutupan, dan menghapus cerita bersambung setelahnya. Pendekatan ini hanya berfungsi, bahkan jika kelas Anda tidak Serializable, karena menggunakan Kryo di belakang layar. Yang Anda butuhkan hanyalah kari. ;)

Berikut ini contoh bagaimana saya melakukannya:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

Jangan ragu untuk membuat Blah serumit yang Anda inginkan, kelas, objek pendamping, kelas bersarang, referensi ke beberapa lib pihak ke-3.

KryoSerializationWrapper merujuk pada: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Nilesh
sumber
Apakah ini benar-benar membuat cerita bersambung instance atau membuat contoh statis dan membuat cerita bersambung referensi (lihat jawaban saya).
samthebest
2
@samthebest, bisakah Anda menguraikan? Jika Anda menyelidiki KryoSerializationWrapperAnda akan menemukan bahwa itu membuat Spark berpikir bahwa itu memang java.io.Serializable- itu hanya membuat cerita bersambung objek secara internal menggunakan Kryo - lebih cepat, lebih sederhana. Dan saya tidak berpikir itu berkaitan dengan contoh statis - itu hanya menderialisasi nilai ketika value.apply () dipanggil.
Nilesh
8

Saya menghadapi masalah serupa, dan apa yang saya pahami dari jawaban Grega adalah

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

metode doIT Anda mencoba membuat serialisasi metode SomeFunc (_) , tetapi karena metode ini tidak bisa serial, ia mencoba membuat serialisasi pengujian kelas yang lagi-lagi tidak bisa serial.

Jadi, buat kode Anda berfungsi, Anda harus mendefinisikan someFunc di dalam metode doIT . Sebagai contoh:

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

Dan jika ada beberapa fungsi yang muncul dalam gambar, maka semua fungsi tersebut harus tersedia untuk konteks induk.

Tarang Bhalodia
sumber
7

Saya tidak sepenuhnya yakin bahwa ini berlaku untuk Scala tetapi, di Jawa, saya menyelesaikannya NotSerializableExceptiondengan refactoring kode saya sehingga penutupan tidak mengakses bidang non-serializable final.

Trebor Rude
sumber
saya menghadapi masalah yang sama di Jawa, saya mencoba menggunakan kelas FileWriter dari paket Java IO di dalam metode RDD foreach. Bisakah Anda memberi tahu saya bagaimana kami bisa menyelesaikan ini?
Shankar
1
Nah, Shankar, jika FileWriterini adalah finalbidang dari kelas luar, Anda tidak bisa melakukannya. Tetapi FileWriterdapat dibangun dari a Stringatau a File, keduanya Serializable. Jadi refactor kode Anda untuk membangun lokal FileWriterberdasarkan nama file dari kelas luar.
Trebor Rude
0

FYI di Spark 2.4 banyak dari Anda mungkin akan menghadapi masalah ini. Serialisasi Kryo menjadi lebih baik tetapi dalam banyak kasus Anda tidak dapat menggunakan spark.kryo.unsafe = true atau serializer kryo naif.

Untuk perbaikan cepat, coba ubah yang berikut dalam konfigurasi Spark Anda

spark.kryo.unsafe="false"

ATAU

spark.serializer="org.apache.spark.serializer.JavaSerializer"

Saya memodifikasi transformasi RDD kustom yang saya temui atau pribadi menulis dengan menggunakan variabel siaran eksplisit dan memanfaatkan inbuilt twitter-chill api baru, mengubah mereka dari rdd.map(row =>ke rdd.mapPartitions(partition => {fungsi.

Contoh

Jalan Tua (tidak terlalu bagus)

val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
    val value = sampleMap.get(row._1)
    value
})

Cara Alternatif (lebih baik)

import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))

rdd.mapPartitions(partition => {
    val deSerSampleMap = brdSerSampleMap.value.get
    partition.map(row => {
        val value = sampleMap.get(row._1)
        value
    }).toIterator
})

Cara baru ini hanya akan memanggil variabel broadcast satu kali per partisi yang lebih baik. Anda masih perlu menggunakan Serialisasi Java jika Anda tidak mendaftar kelas.

Gereja Gabe
sumber