Saya menjelajahi perilaku Spark ketika bergabung dengan tabel untuk dirinya sendiri. Saya menggunakan Databricks.
Skenario boneka saya adalah:
Baca tabel eksternal sebagai dataframe A (file yang mendasarinya dalam format delta)
Definisikan dataframe B sebagai dataframe A dengan hanya kolom tertentu yang dipilih
Bergabunglah dengan dataframe A dan B pada kolom1 dan kolom2
(Ya, itu tidak masuk akal, aku hanya bereksperimen untuk memahami mekanika dasar Spark)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
Upaya pertama saya adalah menjalankan kode apa adanya (percobaan 1). Saya kemudian mencoba melakukan partisi ulang dan cache (percobaan 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Akhirnya, saya partisi ulang, disortir, dan di-cache
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
Dags masing-masing yang dihasilkan adalah sebagaimana terlampir.
Pertanyaan saya adalah:
Mengapa dalam upaya 1 tabel tampaknya di-cache meskipun caching belum secara eksplisit ditentukan.
Mengapa InMemoreTableScan selalu diikuti oleh simpul lain dari jenis ini.
Mengapa dalam percobaan 3 caching tampaknya terjadi pada dua tahap?
Mengapa dalam upaya 3 WholeStageCodegen mengikuti satu (dan hanya satu) InMemoreTableScan.
Jawaban:
Apa yang Anda amati dalam 3 paket ini adalah campuran runtime dan Spark DataBricks.
Pertama-tama, saat menjalankan runtime DataBricks 3.3+, caching secara otomatis diaktifkan untuk semua file parket. Konfigurasi yang sesuai untuk itu:
spark.databricks.io.cache.enabled true
Untuk kueri kedua Anda, InMemoryTableScan terjadi dua kali karena saat gabung dipanggil, percikan mencoba menghitung Dataset A dan Dataset B secara paralel. Dengan asumsi pelaksana yang berbeda ditugaskan tugas-tugas di atas, keduanya harus memindai tabel dari cache (DataBricks).
Untuk yang ketiga, InMemoryTableScan tidak merujuk pada caching itu sendiri. Ini hanya berarti bahwa apapun rencana katalis yang dibentuk melibatkan pemindaian tabel cache beberapa kali.
PS: Saya tidak bisa memvisualisasikan poin 4 :)
sumber