Apache Spark: dampak partisi ulang, penyortiran, dan caching pada suatu join

10

Saya menjelajahi perilaku Spark ketika bergabung dengan tabel untuk dirinya sendiri. Saya menggunakan Databricks.

Skenario boneka saya adalah:

  1. Baca tabel eksternal sebagai dataframe A (file yang mendasarinya dalam format delta)

  2. Definisikan dataframe B sebagai dataframe A dengan hanya kolom tertentu yang dipilih

  3. Bergabunglah dengan dataframe A dan B pada kolom1 dan kolom2

(Ya, itu tidak masuk akal, aku hanya bereksperimen untuk memahami mekanika dasar Spark)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])

Upaya pertama saya adalah menjalankan kode apa adanya (percobaan 1). Saya kemudian mencoba melakukan partisi ulang dan cache (percobaan 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()

Akhirnya, saya partisi ulang, disortir, dan di-cache

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()

Dags masing-masing yang dihasilkan adalah sebagaimana terlampir.

Pertanyaan saya adalah:

  1. Mengapa dalam upaya 1 tabel tampaknya di-cache meskipun caching belum secara eksplisit ditentukan.

  2. Mengapa InMemoreTableScan selalu diikuti oleh simpul lain dari jenis ini.

  3. Mengapa dalam percobaan 3 caching tampaknya terjadi pada dua tahap?

  4. Mengapa dalam upaya 3 WholeStageCodegen mengikuti satu (dan hanya satu) InMemoreTableScan.

usaha 1

usaha 2

masukkan deskripsi gambar di sini

Dawid
sumber
Saya menduga bahwa pembaca DataFrame cache data secara otomatis ketika sumbernya adalah tabel eksternal. Saya memiliki situasi yang sama di mana saya membaca data dari tabel database, sementara dapat diunduh tab "SQL" pada 'Aplikasi detail UI' menunjukkan kepada saya jumlah baris yang diunduh tetapi belum ada file yang disimpan di lokasi yang ditentukan . Saya kira itu tahu hitungan karena memiliki cache data di suatu tempat dan itulah yang muncul di DAG. Jika Anda membaca data dari file teks secara lokal maka Anda tidak akan melihat status cache.
Salim

Jawaban:

4

Apa yang Anda amati dalam 3 paket ini adalah campuran runtime dan Spark DataBricks.

Pertama-tama, saat menjalankan runtime DataBricks 3.3+, caching secara otomatis diaktifkan untuk semua file parket. Konfigurasi yang sesuai untuk itu: spark.databricks.io.cache.enabled true

Untuk kueri kedua Anda, InMemoryTableScan terjadi dua kali karena saat gabung dipanggil, percikan mencoba menghitung Dataset A dan Dataset B secara paralel. Dengan asumsi pelaksana yang berbeda ditugaskan tugas-tugas di atas, keduanya harus memindai tabel dari cache (DataBricks).

Untuk yang ketiga, InMemoryTableScan tidak merujuk pada caching itu sendiri. Ini hanya berarti bahwa apapun rencana katalis yang dibentuk melibatkan pemindaian tabel cache beberapa kali.

PS: Saya tidak bisa memvisualisasikan poin 4 :)

Ashvjit Singh
sumber