Memperbarui
Jawaban ini masih berlaku dan informatif, meskipun hal-hal yang sekarang lebih baik karena 2.2 / 2.3, yang menambahkan dukungan built-in encoder untuk Set
, Seq
, Map
, Date
, Timestamp
, dan BigDecimal
. Jika Anda tetap membuat jenis dengan hanya kelas kasus dan jenis Scala biasa, Anda harus baik-baik saja dengan yang tersirat di SQLImplicits
.
Sayangnya, hampir tidak ada yang ditambahkan untuk membantu ini. Mencari @since 2.0.0
di Encoders.scala
atau SQLImplicits.scala
temuan hal sebagian besar hubungannya dengan tipe primitif (dan beberapa tweaking kelas kasus). Jadi, hal pertama yang ingin saya katakan: saat ini tidak ada dukungan nyata yang baik untuk pengkodekan kelas khusus . Dengan keluar dari jalan, berikut adalah beberapa trik yang melakukan pekerjaan sebaik yang bisa kita harapkan, mengingat apa yang saat ini kita miliki. Sebagai penafian dimuka: ini tidak akan bekerja dengan sempurna dan saya akan melakukan yang terbaik untuk membuat semua batasan jelas dan dimuka.
Apa sebenarnya masalahnya
Ketika Anda ingin membuat dataset, Spark "memerlukan encoder (untuk mengkonversi objek JVM tipe T ke dan dari representasi SQL Spark internal) yang umumnya dibuat secara otomatis melalui implisit dari SparkSession
, atau dapat dibuat secara eksplisit dengan memanggil metode statis pada Encoders
"(diambil dari dokumen padacreateDataset
). Encoder akan mengambil bentuk di Encoder[T]
mana T
jenis yang Anda encoding. Saran pertama adalah menambahkan import spark.implicits._
(yang memberi Anda ini encoders implisit) dan saran kedua adalah untuk secara eksplisit lulus dalam encoder implisit menggunakan ini set fungsi encoder terkait.
Tidak ada encoder yang tersedia untuk kelas reguler, jadi
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
akan memberi Anda kesalahan waktu kompilasi terkait implisit berikut:
Tidak dapat menemukan encoder untuk tipe yang disimpan dalam Dataset. Tipe primitif (Int, String, dll) dan tipe Produk (kelas kasus) didukung dengan mengimpor sqlContext.implicits._ Dukungan untuk membuat serial jenis lain akan ditambahkan dalam rilis mendatang
Namun, jika Anda membungkus tipe apa pun yang baru saja Anda gunakan untuk mendapatkan kesalahan di atas di beberapa kelas yang meluas Product
, kesalahan tersebut tertunda hingga runtime, jadi
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Mengkompilasi dengan baik, tetapi gagal saat runtime dengan
java.lang.UnsupportedOperationException: Tidak ditemukan Encoder untuk MyObj
Alasan untuk ini adalah bahwa Spark pembuat enkode dengan implisit sebenarnya hanya dibuat pada saat runtime (melalui scala relfection). Dalam kasus ini, semua pemeriksaan Spark pada waktu kompilasi adalah bahwa kelas terluar meluas Product
(yang dilakukan semua kelas kasus), dan hanya menyadari pada saat runtime bahwa ia masih tidak tahu apa yang harus dilakukan dengan MyObj
(masalah yang sama terjadi jika saya mencoba membuat a Dataset[(Int,MyObj)]
- Spark menunggu sampai runtime untuk muntah pada MyObj
). Ini adalah masalah sentral yang sangat perlu diperbaiki:
- beberapa kelas yang memperpanjang
Product
kompilasi meskipun selalu crash saat runtime dan
- tidak ada cara meneruskan encoders khusus untuk tipe bersarang (saya tidak punya cara untuk memberi makan Spark encoder untuk hanya
MyObj
sehingga ia tahu bagaimana untuk menyandikan Wrap[MyObj]
atau (Int,MyObj)
).
Gunakan saja kryo
Solusi yang disarankan semua orang adalah dengan menggunakan kryo
enkoder.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Ini menjadi sangat cepat membosankan. Terutama jika kode Anda memanipulasi semua jenis kumpulan data, bergabung, mengelompokkan, dll. Anda akhirnya mengumpulkan banyak implisit tambahan. Jadi, mengapa tidak membuat implisit yang melakukan ini semua secara otomatis?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
Dan sekarang, sepertinya saya bisa melakukan hampir semua yang saya inginkan (contoh di bawah ini tidak akan berfungsi di spark-shell
tempat spark.implicits._
yang diimpor secara otomatis)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Atau hampir. Masalahnya adalah bahwa menggunakan kryo
lead ke Spark hanya menyimpan setiap baris dalam dataset sebagai objek biner datar. Untuk map
, filter
, foreach
yang cukup, tetapi untuk operasi seperti join
, Spark benar-benar perlu ini untuk dipisahkan ke dalam kolom. Memeriksa skema untuk d2
atau d3
, Anda melihat hanya ada satu kolom biner:
d2.printSchema
// root
// |-- value: binary (nullable = true)
Solusi parsial untuk tupel
Jadi, menggunakan keajaiban implisit dalam Scala (lebih banyak dalam 6.26.3 Resolusi Kelebihan Beban ), saya dapat membuat sendiri serangkaian implisit yang akan melakukan pekerjaan sebaik mungkin, setidaknya untuk tupel, dan akan bekerja dengan baik dengan implisit yang ada:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
Kemudian, dengan berbekal implisit ini, saya dapat membuat contoh di atas berhasil, meskipun dengan penggantian nama kolom
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
Saya belum menemukan cara untuk mendapatkan nama tuple yang diharapkan (_1
,, _2
...) secara default tanpa mengubah nama mereka - jika orang lain ingin bermain-main dengan ini, ini adalah di mana nama tersebut "value"
diperkenalkan dan ini adalah di mana tuple nama biasanya ditambahkan. Namun, kuncinya adalah bahwa saya sekarang memiliki skema terstruktur yang bagus:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
Jadi, secara ringkas, solusi ini:
- memungkinkan kita mendapatkan kolom terpisah untuk tupel (jadi kita bisa bergabung di tupel lagi, yay!)
- kita bisa lagi hanya mengandalkan implisit (jadi tidak perlu lewat di
kryo
semua tempat)
- hampir seluruhnya kompatibel dengan
import spark.implicits._
(dengan beberapa penggantian nama terlibat)
- tidak tidak mari kita bergabung pada
kyro
kolom biner serial, apalagi di bidang-bidang mungkin memiliki
- memiliki efek samping yang tidak menyenangkan dari penggantian nama beberapa kolom tuple menjadi "nilai" (jika perlu, ini dapat dibatalkan dengan mengonversi
.toDF
, menentukan nama kolom baru, dan mengonversi kembali ke dataset - dan nama skema tampaknya dipertahankan melalui penggabungan). , di mana mereka paling dibutuhkan).
Solusi parsial untuk kelas secara umum
Yang ini kurang menyenangkan dan tidak memiliki solusi yang baik. Namun, sekarang kita memiliki solusi tuple di atas, saya punya firasat bahwa solusi konversi implisit dari jawaban lain juga akan sedikit lebih menyakitkan karena Anda dapat mengonversi kelas Anda yang lebih kompleks menjadi tupel. Kemudian, setelah membuat dataset, Anda mungkin akan mengganti nama kolom menggunakan pendekatan dataframe. Jika semuanya berjalan dengan baik, ini benar - benar - perbaikan karena saya sekarang dapat melakukan gabung di bidang kelas saya. Jika saya baru saja menggunakan satu kryo
serializer biner datar yang tidak akan mungkin terjadi.
Berikut adalah contoh yang tidak sedikit semuanya: Aku punya kelas MyObj
yang memiliki bidang jenis Int
, java.util.UUID
dan Set[String]
. Yang pertama mengurus dirinya sendiri. Yang kedua, meskipun saya bisa membuat serial menggunakan kryo
akan lebih berguna jika disimpan sebagai String
(karena UUID
s biasanya adalah sesuatu yang saya ingin bergabung melawan). Yang ketiga benar-benar hanya berada di kolom biner.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Sekarang, saya bisa membuat dataset dengan skema yang bagus menggunakan mesin ini:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
Dan skema menunjukkan saya kolom dengan nama yang tepat dan dengan dua hal pertama yang saya dapat bergabung.
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)
ExpressionEncoder
menggunakan serialisasi JSON? Dalam kasus saya, saya tidak bisa lolos dengan tupel, dan kryo memberi saya kolom biner ..Menggunakan encoders generik.
Ada dua encoders generik yang tersedia untuk saat ini
kryo
dan dijavaSerialization
mana yang terakhir secara eksplisit digambarkan sebagai:Dengan asumsi kelas berikut
Anda dapat menggunakan encoders ini dengan menambahkan encoder implisit:
yang dapat digunakan bersama sebagai berikut:
Ini menyimpan objek sebagai
binary
kolom sehingga ketika dikonversi keDataFrame
Anda mendapatkan skema berikut:Dimungkinkan juga untuk menyandikan tupel menggunakan
kryo
enkoder untuk bidang tertentu:Harap perhatikan bahwa kami tidak bergantung pada enkode tersirat di sini, tetapi lulus enkode secara eksplisit sehingga kemungkinan besar ini tidak akan berfungsi dengan
toDS
metode.Menggunakan konversi tersirat:
Berikan konversi implisit antara representasi yang dapat disandikan dan kelas khusus, misalnya:
Pertanyaan-pertanyaan Terkait:
sumber
Set
) saya dapatkanException in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar]
.kryo[Set[Bar]]
. Cara yang sama jika kelas berisi bidang yangBar
Anda butuhkan encoder untuk seluruh objek. Ini adalah metode yang sangat kasar.Bar
Anda perlu encoder untuk seluruh objek". pertanyaan saya adalah bagaimana menyandikan "seluruh proyek" ini?Anda dapat menggunakan UDTRegistration dan kemudian Kelas Kasus, Tuple, dll ... semua berfungsi dengan benar dengan Tipe Buatan Pengguna Anda!
Katakanlah Anda ingin menggunakan Enum khusus:
Daftarkan seperti ini:
Lalu GUNAKAN!
Katakanlah Anda ingin menggunakan Catatan Polimorfik:
... dan gunakan seperti ini:
Anda dapat menulis UDT khusus yang mengkodekan segalanya ke byte (Saya menggunakan serialisasi java di sini tapi mungkin lebih baik untuk instrumen konteks Kryo Spark).
Pertama-tama tentukan kelas UDT:
Kemudian daftarkan:
Maka Anda bisa menggunakannya!
sumber
Encoder bekerja kurang lebih sama di
Spark2.0
. DanKryo
masih merupakanserialization
pilihan yang disarankan .Anda dapat melihat contoh berikut dengan cangkang
Sampai sekarang] tidak ada
appropriate encoders
dalam ruang lingkup saat ini sehingga orang-orang kami tidak dikodekan sebagaibinary
nilai. Tapi itu akan berubah setelah kami menyediakan beberapaimplicit
encoders menggunakanKryo
serialisasi.sumber
Dalam kasus kelas Java Bean, ini bisa bermanfaat
Sekarang Anda cukup membaca dataFrame sebagai DataFrame kustom
Ini akan membuat encoder kelas khusus dan bukan yang biner.
sumber
Contoh saya akan di Jawa, tapi saya tidak membayangkan itu sulit beradaptasi dengan Scala.
Saya telah cukup berhasil mengkonversi
RDD<Fruit>
keDataset<Fruit>
menggunakan spark.createDataset dan Encoders.bean selamaFruit
adalah sederhana Java Bean .Langkah 1: Buat Java Bean sederhana.
Saya akan tetap berpegang pada kelas dengan tipe primitif dan String sebagai bidang sebelum orang-orang DataBricks menambah Encoders mereka. Jika Anda memiliki kelas dengan objek bersarang, buat Java Bean lain sederhana dengan semua bidangnya diratakan, sehingga Anda dapat menggunakan transformasi RDD untuk memetakan tipe kompleks ke yang lebih sederhana. Tentu ini sedikit kerja ekstra, tapi saya membayangkan itu akan banyak membantu kinerja bekerja dengan skema datar.
Langkah 2: Dapatkan Dataset Anda dari RDD
Dan voila! Busa, bilas, ulangi.
sumber
Bagi mereka yang mungkin dalam situasi saya, saya juga meletakkan jawaban saya di sini.
Untuk lebih spesifik,
Saya sedang membaca 'Set data yang diketik' dari SQLContext. Jadi format data asli adalah DataFrame.
val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()
+---+---+ | a| b| +---+---+ | 1|[1]| +---+---+
Kemudian konversikan ke RDD menggunakan rdd.map () dengan tipe mutable.WrappedArray.
sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)
Hasil:
(1,Set(1))
sumber
Selain saran yang sudah diberikan, opsi lain yang baru-baru ini saya temukan adalah bahwa Anda dapat mendeklarasikan kelas khusus Anda termasuk sifat tersebut
org.apache.spark.sql.catalyst.DefinedByConstructorParams
.Ini berfungsi jika kelas memiliki konstruktor yang menggunakan jenis ExpressionEncoder dapat mengerti, yaitu nilai-nilai primitif dan koleksi standar. Ini bisa berguna ketika Anda tidak dapat mendeklarasikan kelas sebagai kelas kasus, tetapi tidak ingin menggunakan Kryo untuk menyandikannya setiap kali itu termasuk dalam Dataset.
Sebagai contoh, saya ingin mendeklarasikan kelas kasus yang menyertakan vektor Breeze. Satu-satunya pembuat kode yang dapat menangani itu adalah Kryo. Tetapi jika saya mendeklarasikan subclass yang memperpanjang Breeze DenseVector dan DefinedByConstructorParams, ExpressionEncoder mengerti bahwa itu bisa diserialisasi sebagai array Doubles.
Begini cara saya menyatakannya:
Sekarang saya dapat menggunakan
SerializableDenseVector
Dataset (secara langsung, atau sebagai bagian dari Produk) menggunakan ExpressionEncoder sederhana dan tanpa Kryo. Ia bekerja seperti Breeze DenseVector tetapi membuat serial sebagai Array [Double].sumber