Spark: UDF dieksekusi berkali-kali

9

Saya memiliki kerangka data dengan kode berikut:

def test(lat: Double, lon: Double) = {
  println(s"testing ${lat / lon}")
  Map("one" -> "one", "two" -> "two")
}

val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

Sekarang memeriksa log, saya menemukan bahwa untuk setiap baris UDF dieksekusi 3 kali. Jika saya menambahkan "test3" dari kolom "test.three" maka UDF dieksekusi sekali lagi.

Adakah yang bisa menjelaskan mengapa?

Bisakah ini dihindari dengan benar (tanpa caching kerangka data setelah "tes" ditambahkan, bahkan jika ini berhasil)?

Rolintocour
sumber
Apa maksudmu? Anda memanggil fungsi tes tiga kali. Itu sebabnya dieksekusi tiga kali. Tidak yakin mengapa Anda membuatnya menjadi UDF. Mengapa tidak hanya membuat Peta sebagai val?
user4601931
Ini hanya contoh untuk menunjukkan perilaku percikan. Bagi saya "test" adalah kolom baru yang berisi struktur, lalu mengakses bagian mana pun dari struktur tidak boleh mengeksekusi UDF lagi. Bagaimana saya salah?
Rolintocour
Saya mencoba untuk mencetak skema, tipe data dari "tes" adalah Mapdan bukan Struct. Sekarang alih-alih mengembalikan Peta, jika UDF mengembalikan kelas kasus seperti Uji (satu String, dua: String) maka testmemang merupakan Struct tetapi selalu ada banyak eksekusi dari UDF.
Rolintocour
caching harus bekerja sesuai dengan jawaban ini: stackoverflow.com/a/40962714/1138523
Raphael Roth

Jawaban:

5

Jika Anda ingin menghindari beberapa panggilan ke udf (yang berguna terutama jika udf adalah hambatan dalam pekerjaan Anda), Anda dapat melakukannya sebagai berikut:

val testUDF = udf(test _).asNondeterministic()

Pada dasarnya Anda memberi tahu Spark bahwa fungsi Anda tidak deterministik dan sekarang Spark memastikan itu dipanggil hanya sekali karena tidak aman untuk memanggilnya berkali-kali (setiap panggilan mungkin dapat mengembalikan hasil yang berbeda).

Perlu diketahui juga bahwa trik ini tidak gratis, dengan melakukan ini Anda meletakkan beberapa kendala pada pengoptimal, satu efek samping dari ini adalah misalnya pengoptimal Spark tidak mendorong filter melalui ekspresi yang tidak deterministik sehingga Anda menjadi bertanggung jawab untuk optimal posisi filter dalam permintaan Anda.

David Vrba
sumber
bagus! jawaban ini juga ada di sini: stackoverflow.com/questions/40320563/…
Raphael Roth
Dalam kasus saya, asNondeterministickekuatan UDF untuk mengeksekusi hanya sekali. Dengan explode(array(myUdf($"id")))solusinya, masih bisa dieksekusi dua kali.
Rolintocour