Saya memiliki kerangka data dengan kode berikut:
def test(lat: Double, lon: Double) = {
println(s"testing ${lat / lon}")
Map("one" -> "one", "two" -> "two")
}
val testUDF = udf(test _)
df.withColumn("test", testUDF(col("lat"), col("lon")))
.withColumn("test1", col("test.one"))
.withColumn("test2", col("test.two"))
Sekarang memeriksa log, saya menemukan bahwa untuk setiap baris UDF dieksekusi 3 kali. Jika saya menambahkan "test3" dari kolom "test.three" maka UDF dieksekusi sekali lagi.
Adakah yang bisa menjelaskan mengapa?
Bisakah ini dihindari dengan benar (tanpa caching kerangka data setelah "tes" ditambahkan, bahkan jika ini berhasil)?
scala
apache-spark
apache-spark-sql
Rolintocour
sumber
sumber
Map
dan bukan Struct. Sekarang alih-alih mengembalikan Peta, jika UDF mengembalikan kelas kasus seperti Uji (satu String, dua: String) makatest
memang merupakan Struct tetapi selalu ada banyak eksekusi dari UDF.Jawaban:
Jika Anda ingin menghindari beberapa panggilan ke udf (yang berguna terutama jika udf adalah hambatan dalam pekerjaan Anda), Anda dapat melakukannya sebagai berikut:
Pada dasarnya Anda memberi tahu Spark bahwa fungsi Anda tidak deterministik dan sekarang Spark memastikan itu dipanggil hanya sekali karena tidak aman untuk memanggilnya berkali-kali (setiap panggilan mungkin dapat mengembalikan hasil yang berbeda).
Perlu diketahui juga bahwa trik ini tidak gratis, dengan melakukan ini Anda meletakkan beberapa kendala pada pengoptimal, satu efek samping dari ini adalah misalnya pengoptimal Spark tidak mendorong filter melalui ekspresi yang tidak deterministik sehingga Anda menjadi bertanggung jawab untuk optimal posisi filter dalam permintaan Anda.
sumber
asNondeterministic
kekuatan UDF untuk mengeksekusi hanya sekali. Denganexplode(array(myUdf($"id")))
solusinya, masih bisa dieksekusi dua kali.