Spark: Mengapa Python secara signifikan mengungguli Scala dalam kasus penggunaan saya?

16

Untuk membandingkan kinerja Spark ketika menggunakan Python dan Scala saya menciptakan pekerjaan yang sama di kedua bahasa dan membandingkan runtime. Saya berharap kedua pekerjaan akan memakan waktu yang kira-kira sama, tetapi pekerjaan Python hanya memakan waktu 27min, sementara pekerjaan Scala memakan waktu 37min(hampir 40% lebih lama!). Saya menerapkan pekerjaan yang sama di Jawa juga dan butuh 37minutesjuga. Bagaimana mungkin Python jauh lebih cepat?

Contoh minimal yang dapat diverifikasi:

Pekerjaan python:

# Configuration
conf = pyspark.SparkConf()
conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
conf.set("spark.executor.instances", "4")
conf.set("spark.executor.cores", "8")
sc = pyspark.SparkContext(conf=conf)

# 960 Files from a public dataset in 2 batches
input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

# Count occurances of a certain string
logData = sc.textFile(input_files)
logData2 = sc.textFile(input_files2)
a = logData.filter(lambda value: value.startswith('WARC-Type: response')).count()
b = logData2.filter(lambda value: value.startswith('WARC-Type: response')).count()

print(a, b)

Pekerjaan scala:

// Configuration
config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

// 960 Files from a public dataset in 2 batches 
val input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
val input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

// Count occurances of a certain string
val logData1 = sc.textFile(input_files)
val logData2 = sc.textFile(input_files2)
val num1 = logData1.filter(line => line.startsWith("WARC-Type: response")).count()
val num2 = logData2.filter(line => line.startsWith("WARC-Type: response")).count()

println(s"Lines with a: $num1, Lines with b: $num2")

Hanya dengan melihat kodenya, mereka tampak identik. Saya melihat DAG dan mereka tidak memberikan wawasan apa pun (atau setidaknya saya kurang tahu cara membuat penjelasan berdasarkan pada mereka).

Saya akan sangat menghargai petunjuk apa pun.

maestromusica
sumber
Komentar bukan untuk diskusi panjang; percakapan ini telah dipindahkan ke obrolan .
Samuel Liew
1
Saya akan memulai analisis, sebelum bertanya apa pun, dengan mengatur waktu blok dan pernyataan yang sesuai untuk melihat apakah ada tempat tertentu di mana versi python lebih cepat. Maka Anda mungkin bisa mempertajam pertanyaan menjadi 'mengapa pernyataan python ini lebih cepat'.
Terry Jan Reedy

Jawaban:

11

Asumsi dasar Anda, bahwa Scala atau Java harus lebih cepat untuk tugas khusus ini, tidak tepat. Anda dapat dengan mudah memverifikasinya dengan aplikasi lokal minimal. Scala satu:

import scala.io.Source
import java.time.{Duration, Instant}

object App {
  def main(args: Array[String]) {
    val Array(filename, string) = args

    val start = Instant.now()

    Source
      .fromFile(filename)
      .getLines
      .filter(line => line.startsWith(string))
      .length

    val stop = Instant.now()
    val duration = Duration.between(start, stop).toMillis
    println(s"${start},${stop},${duration}")
  }
}

Python satu

import datetime
import sys

if __name__ == "__main__":
    _, filename, string = sys.argv
    start = datetime.datetime.now()
    with open(filename) as fr:
        # Not idiomatic or the most efficient but that's what
        # PySpark will use
        sum(1 for _ in filter(lambda line: line.startswith(string), fr))

    end = datetime.datetime.now()
    duration = round((end - start).total_seconds() * 1000)
    print(f"{start},{end},{duration}")

Hasil (masing-masing 300 repetisi, Python 3.7.6, Scala 2.11.12), Posts.xmldari hermeneutics.stackexchange.com dump data dengan campuran pola yang cocok dan tidak cocok:

boxplots durartion dalam milid untuk program di atas

  • Python 273.50 (258.84, 288.16)
  • Scala 634.13 (533.81, 734.45)

Seperti yang Anda lihat Python tidak hanya lebih cepat secara sistematis, tetapi juga lebih konsisten (spread lebih rendah).

Pesan yang diambil adalah - jangan percaya FUD tidak berdasar - bahasa dapat lebih cepat atau lebih lambat pada tugas tertentu atau dengan lingkungan tertentu (misalnya di sini Scala dapat terkena oleh startup JVM dan / atau GC dan / atau JIT), tetapi jika Anda mengklaim seperti "XYZ adalah X4 lebih cepat" atau "XYZ lambat dibandingkan dengan ZYX (..) Kira-kira, 10x lebih lambat" itu biasanya berarti seseorang menulis kode yang sangat buruk untuk menguji sesuatu.

Edit :

Untuk mengatasi beberapa masalah yang muncul dalam komentar:

  • Dalam kode OP data sebagian besar dilewatkan dalam satu arah (JVM -> Python) dan tidak diperlukan serialisasi nyata (jalur spesifik ini hanya melewati bytestring apa adanya dan mendekode pada UTF-8 di sisi lain). Itu semurah yang didapat ketika datang ke "serialisasi".
  • Apa yang dikembalikan hanyalah sebuah integer tunggal oleh partisi, sehingga dampak ke arah itu dapat diabaikan.
  • Komunikasi dilakukan melalui soket lokal (semua komunikasi pada pekerja di luar koneksi awal dan auth dilakukan menggunakan deskriptor file yang dikembalikan dari local_connect_and_auth, dan tidak lain adalah file terkait soket ). Sekali lagi, semurah yang didapat ketika sampai pada komunikasi antar proses.
  • Mengingat perbedaan dalam kinerja mentah yang ditunjukkan di atas (jauh lebih tinggi dari apa yang Anda lihat dalam program Anda), ada banyak margin untuk overhead yang tercantum di atas.
  • Kasing ini benar-benar berbeda dari kasing di mana benda sederhana atau kompleks harus diteruskan ke dan dari juru bahasa Python dalam bentuk yang dapat diakses oleh kedua pihak sebagai dump yang kompatibel dengan acar (contoh paling terkenal termasuk UDF gaya lama, beberapa bagian dari yang lama). -gaya MLLib).

Edit 2 :

Karena jasper-m prihatin dengan biaya startup di sini, orang dapat dengan mudah membuktikan bahwa Python masih memiliki keunggulan signifikan atas Scala bahkan jika ukuran input meningkat secara signifikan.

Berikut adalah hasil untuk 2003360 baris / 5.6G (input yang sama, hanya digandakan beberapa kali, 30 pengulangan), yang mana cara melebihi apa pun yang dapat Anda harapkan dalam satu tugas Spark.

masukkan deskripsi gambar di sini

  • Python 22809.57 (21466.26, 24152.87)
  • Scala 27315.28 (24367.24, 30263.31)

Harap perhatikan interval kepercayaan yang tidak tumpang tindih.

Edit 3 :

Untuk menanggapi komentar lain dari Jasper-M:

Sebagian besar dari semua pemrosesan masih terjadi di dalam JVM dalam kasus Spark.

Itu tidak benar dalam kasus khusus ini:

  • Pekerjaan yang dimaksud adalah pekerjaan peta dengan pengurangan global tunggal menggunakan RDD PySpark.
  • PySpark RDD (tidak seperti katakanlah DataFrame) mengimplementasikan fungsi kotor secara asli di Python, dengan pengecualian input, output, dan komunikasi antar-node.
  • Karena ini adalah pekerjaan satu tahap, dan hasil akhir cukup kecil untuk diabaikan, tanggung jawab utama JVM (jika seseorang ingin melakukan nitpick, ini diimplementasikan sebagian besar di Jawa bukan Scala) adalah untuk memanggil format input Hadoop, dan mendorong data melalui soket file ke Python.
  • Bagian baca identik untuk JVM dan Python API, sehingga dapat dianggap sebagai overhead konstan. Itu juga tidak memenuhi syarat sebagai bagian terbesar dari pemrosesan , bahkan untuk pekerjaan sederhana seperti ini.
pengguna10938362
sumber
3
pendekatan masalah yang sangat baik. Terima kasih telah berbagi ini
Alexandros Biratsis
1
@egordoe Alexandros mengatakan "tidak ada UDF dipanggil di sini" bukan "Python tidak dipanggil" - yang membuat semua perbedaan. Serialisasi overhead adalah penting di mana data dipertukarkan antara sistem (yaitu ketika Anda ingin meneruskan data ke UDF dan kembali).
user10938362
1
@ primordoe Anda jelas membingungkan dua hal - overhead serialisasi, yang merupakan masalah di mana benda-benda non-sepele dilewatkan bolak-balik. Dan overhead komunikasi. Ada sedikit atau tidak ada serialisasi overhead di sini, karena Anda hanya melewatkan dan mendekode bytestrings, dan itu terjadi sebagian besar pada arah, karena kembali Anda mendapatkan bilangan bulat tunggal per partisi. Komunikasi menjadi perhatian, tetapi mengirimkan data melalui soket lokal adalah efisien karena benar-benar didapat ketika menyangkut komunikasi antar-proses. Jika itu tidak jelas saya sarankan membaca sumbernya - tidak sulit dan akan mencerahkan.
user10938362
1
Selain itu metode serialisasi tidak dibuat sama. Karena Spark case menunjukkan metode serialisasi yang baik dapat memotong biaya ke tingkat di mana tidak lagi menjadi masalah (lihat Pandas UDF dengan Panah) dan ketika itu terjadi, faktor-faktor lain dapat mendominasi (lihat misalnya perbandingan kinerja antara fungsi jendela Scala dan yang setara dengan Pandas UDFs - Python menang dengan margin yang jauh lebih tinggi di sana, daripada di pertanyaan ini).
user10938362
1
Dan maksud Anda adalah @ Jasper-M? Tugas Spark individu biasanya cukup kecil untuk memiliki beban kerja yang sebanding dengan ini. Jangan anggap saya salah, tetapi jika Anda memiliki contoh tandingan aktual yang membatalkan ini atau seluruh pertanyaan, silakan kirim. Saya sudah mencatat bahwa tindakan sekunder berkontribusi sampai batas tertentu pada nilai ini, tetapi mereka tidak mendominasi biaya. Kita semua insinyur (semacam) di sini - mari kita bicara angka dan kode, bukan kepercayaan, bukan?
user10938362
4

Pekerjaan Scala memakan waktu lebih lama karena memiliki kesalahan konfigurasi dan, oleh karena itu, pekerjaan Python dan Scala telah disediakan dengan sumber daya yang tidak setara.

Ada dua kesalahan dalam kode:

val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sc.hadoopConfiguration.set("spark.executor.instances", "4") // LINE #4
sc.hadoopConfiguration.set("spark.executor.cores", "8") // LINE #5
  1. LINE 1. Setelah saluran dieksekusi, konfigurasi sumber daya dari pekerjaan Spark sudah ditetapkan dan diperbaiki. Mulai saat ini, tidak ada cara untuk menyesuaikan apa pun. Baik jumlah pelaksana maupun jumlah inti per pelaksana.
  2. GARIS 4-5. sc.hadoopConfigurationadalah tempat yang salah untuk mengatur konfigurasi Spark. Itu harus diatur dalam configcontoh Anda lolos new SparkContext(config).

[TAMBAH] Mengingat hal di atas, saya akan mengusulkan untuk mengubah kode pekerjaan Scala menjadi

config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

dan coba lagi. Saya yakin versi Scala akan menjadi X kali lebih cepat sekarang.

egordoe
sumber
Saya memverifikasi bahwa kedua pekerjaan menjalankan 32 tugas secara paralel sehingga saya tidak berpikir inilah penyebabnya?
maestromusica
terima kasih untuk hasil editnya, akan mencoba mengujinya sekarang
maestromusica
hi @maestromusica pasti ada sesuatu dalam konfigurasi sumber daya karena, secara intrinsik, Python mungkin tidak mengungguli Scala dalam kasus penggunaan khusus ini. Alasan lain mungkin beberapa faktor acak yang tidak berkorelasi, yaitu beban cluster pada saat tertentu dan serupa. Btw, mode apa yang Anda gunakan? mandiri, lokal, benang?
egordoe
Ya, saya telah memverifikasi bahwa jawaban ini salah. Runtime-nya sama. Saya juga mencetak konfigurasi dalam kedua kasus dan itu identik.
maestromusica
1
Saya pikir Anda mungkin benar. Saya menanyakan pertanyaan ini untuk menyelidiki semua kemungkinan lain seperti kesalahan dalam kode atau mungkin saya salah mengerti sesuatu. Terima kasih atas masukan Anda.
maestromusica