Kinerja Spark untuk Scala vs Python

178

Saya lebih suka Python daripada Scala. Tetapi, karena Spark secara asli ditulis dalam Scala, saya mengharapkan kode saya berjalan lebih cepat di Scala daripada versi Python karena alasan yang jelas.

Dengan asumsi itu, saya berpikir untuk belajar & menulis versi Scala dari beberapa kode preprocessing yang sangat umum untuk sekitar 1 GB data. Data diambil dari kompetisi SpringLeaf di Kaggle . Hanya untuk memberikan ikhtisar data (berisi 1936 dimensi dan 145.232 baris). Data terdiri dari berbagai jenis misalnya int, float, string, boolean. Saya menggunakan 6 core dari 8 untuk pemrosesan Spark; itu sebabnya saya menggunakan minPartitions=6sehingga setiap inti memiliki sesuatu untuk diproses.

Kode Scala

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Kode Python

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Tahap Kinerja Scala 0 (38 menit), Tahap 1 (18 detik) masukkan deskripsi gambar di sini

Tahap Kinerja Python 0 (11 menit), Tahap 1 (7 detik) masukkan deskripsi gambar di sini

Keduanya menghasilkan grafik visualisasi DAG yang berbeda (karena kedua gambar menunjukkan fungsi tahap 0 yang berbeda untuk Scala ( map) dan Python ( reduceByKey))

Tapi, pada dasarnya kedua kode mencoba mengubah data menjadi (dimensional_id, string of list of values) RDD dan simpan ke disk. Output akan digunakan untuk menghitung berbagai statistik untuk setiap dimensi.

Dari segi kinerja, kode Scala untuk data nyata seperti ini tampaknya berjalan 4 kali lebih lambat daripada versi Python. Kabar baik bagi saya adalah memberi saya motivasi yang baik untuk tetap bersama Python. Berita buruknya adalah saya tidak begitu mengerti mengapa?

Mrityunjay
sumber
8
Mungkin ini tergantung kode dan aplikasi saat saya mendapatkan hasil yang lain, bahwa apache spark python lebih lambat daripada scala, ketika menjumlahkan satu miliar syarat rumus Leibniz untuk π
Paul
3
Pertanyaan menarik! Btw, lihat juga di sini: emptypipes.org/2015/01/17/python-vs-scala-vs-spark Semakin banyak core yang Anda miliki, semakin sedikit Anda dapat melihat perbedaan antar bahasa.
Markon
Sudahkah Anda mempertimbangkan untuk menerima jawaban yang ada?
10465355 kata Reinstate Monica

Jawaban:

357

Jawaban asli yang membahas kode dapat ditemukan di bawah.


Pertama-tama, Anda harus membedakan antara berbagai jenis API, masing-masing dengan pertimbangan kinerjanya sendiri.

API RDD

(Struktur Python murni dengan orkestrasi berbasis JVM)

Ini adalah komponen yang akan paling dipengaruhi oleh kinerja kode Python dan detail implementasi PySpark. Sementara kinerja Python agak tidak mungkin menjadi masalah, ada setidaknya beberapa faktor yang harus Anda pertimbangkan:

  • Overhead komunikasi JVM. Secara praktis semua data yang datang ke dan dari Python executor harus melalui soket dan pekerja JVM. Meskipun ini adalah komunikasi lokal yang relatif efisien, masih belum gratis.
  • Pelaksana berbasis proses (Python) versus pelaksana berbasis benang (JVM multipel tunggal) (Scala). Setiap pelaksana Python berjalan dalam prosesnya sendiri. Sebagai efek samping, ia memberikan isolasi yang lebih kuat daripada JVM dan kontrol atas siklus hidup eksekutor tetapi berpotensi penggunaan memori yang jauh lebih tinggi:

    • jejak memori interpreter
    • tapak pustaka yang dimuat
    • penyiaran yang kurang efisien (setiap proses membutuhkan salinan siarannya sendiri)
  • Performa kode Python itu sendiri. Scala secara umum lebih cepat dari Python tetapi akan bervariasi pada tugas ke tugas. Selain itu Anda memiliki banyak pilihan termasuk JIT seperti Numba , ekstensi C ( Cython ) atau perpustakaan khusus seperti Theano . Terakhir, jika Anda tidak menggunakan ML / MLlib (atau hanya tumpukan NumPy) , pertimbangkan untuk menggunakan PyPy sebagai penerjemah alternatif. Lihat SPARK-3094 .

  • Konfigurasi PySpark menyediakan spark.python.worker.reuseopsi yang dapat digunakan untuk memilih antara proses Python forking untuk setiap tugas dan menggunakan kembali proses yang ada. Opsi terakhir tampaknya berguna untuk menghindari pengumpulan sampah yang mahal (ini lebih merupakan kesan daripada hasil tes sistematis), sedangkan yang pertama (default) adalah optimal untuk dalam hal siaran mahal dan impor.
  • Penghitungan referensi, digunakan sebagai metode pengumpulan sampah lini pertama dalam CPython, bekerja cukup baik dengan beban kerja Spark yang khas (pemrosesan seperti aliran, tanpa siklus referensi) dan mengurangi risiko jeda GC yang lama.

MLlib

(eksekusi Python dan JVM campuran)

Pertimbangan dasar hampir sama seperti sebelumnya dengan beberapa masalah tambahan. Sementara struktur dasar yang digunakan dengan MLlib adalah objek RDD Python sederhana, semua algoritma dieksekusi secara langsung menggunakan Scala.

Ini berarti biaya tambahan untuk mengkonversi objek Python ke objek Scala dan sebaliknya, peningkatan penggunaan memori dan beberapa batasan tambahan yang akan kita bahas nanti.

Sampai sekarang (Spark 2.x), API berbasis RDD berada dalam mode pemeliharaan dan dijadwalkan akan dihapus di Spark 3.0 .

API DataFrame dan Spark ML

(Eksekusi JVM dengan kode Python terbatas pada driver)

Ini mungkin merupakan pilihan terbaik untuk tugas pemrosesan data standar. Karena kode Python sebagian besar terbatas pada operasi logis tingkat tinggi pada driver, seharusnya tidak ada perbedaan kinerja antara Python dan Scala.

Satu pengecualian adalah penggunaan UDFs Python baris-bijaksana yang secara signifikan kurang efisien daripada setara Scala mereka. Meskipun ada beberapa peluang untuk perbaikan (telah ada pengembangan substansial dalam Spark 2.0.0), batasan terbesarnya adalah perjalanan bolak-balik penuh antara representasi internal (JVM) dan interpreter Python. Jika memungkinkan, Anda harus memilih komposisi ekspresi bawaan ( contoh . Perilaku Python UDF telah ditingkatkan di Spark 2.0.0, tetapi masih kurang optimal dibandingkan dengan eksekusi asli.

Ini mungkin membaik di masa depan telah meningkat secara signifikan dengan diperkenalkannya UDF vektor (SPARK-21190 dan ekstensi lebih lanjut) , yang menggunakan Arrow Streaming untuk pertukaran data yang efisien dengan deserialisasi nol-salinan. Untuk sebagian besar aplikasi overhead sekunder mereka bisa saja diabaikan.

Pastikan juga untuk menghindari lewatnya data yang tidak perlu antara DataFramesdan RDDs. Ini membutuhkan serialisasi dan deserialisasi yang mahal, belum lagi transfer data ke dan dari juru bahasa Python.

Perlu dicatat bahwa panggilan Py4J memiliki latensi yang cukup tinggi. Ini termasuk panggilan sederhana seperti:

from pyspark.sql.functions import col

col("foo")

Biasanya, itu tidak masalah (overhead konstan dan tidak tergantung pada jumlah data) tetapi dalam kasus aplikasi real-time lunak, Anda dapat mempertimbangkan caching / menggunakan kembali pembungkus Java.

GraphX ​​dan Spark DataSets

Adapun untuk saat ini (Spark 1.6 2.1) tidak ada yang menyediakan API PySpark sehingga Anda dapat mengatakan bahwa PySpark lebih buruk daripada Scala.

GraphX

Dalam praktiknya, pengembangan GraphX ​​berhenti hampir sepenuhnya dan proyek saat ini dalam mode pemeliharaan dengan tiket JIRA terkait ditutup karena tidak akan diperbaiki . Pustaka GraphFrames menyediakan pustaka pemrosesan grafik alternatif dengan binding Python.

Himpunan data

Berbicara secara subyektif tidak ada banyak tempat untuk mengetikkan DatasetsPython secara statis dan bahkan jika ada implementasi Scala saat ini terlalu sederhana dan tidak memberikan manfaat kinerja yang sama seperti DataFrame.

Streaming

Dari apa yang saya lihat sejauh ini, saya sangat merekomendasikan menggunakan Scala over Python. Ini mungkin berubah di masa depan jika PySpark mendapatkan dukungan untuk aliran terstruktur tetapi saat ini Scala API tampaknya jauh lebih kuat, komprehensif dan efisien. Pengalaman saya cukup terbatas.

Streaming terstruktur di Spark 2.x tampaknya mengurangi kesenjangan antar bahasa tetapi untuk saat ini masih dalam masa awal. Namun demikian, API berbasis RDD telah dirujuk sebagai "streaming warisan" dalam Dokumentasi Databricks (tanggal akses 2017-03-03)) sehingga masuk akal untuk mengharapkan upaya penyatuan lebih lanjut.

Pertimbangan non-kinerja

Paritas fitur

Tidak semua fitur Spark diekspos melalui API PySpark. Pastikan untuk memeriksa apakah bagian-bagian yang Anda butuhkan sudah diimplementasikan dan mencoba memahami batasan yang mungkin.

Ini sangat penting ketika Anda menggunakan MLlib dan konteks campuran serupa (lihat Memanggil fungsi Java / Scala dari tugas ). Agar adil, beberapa bagian API PySpark, seperti mllib.linalg, menyediakan serangkaian metode yang lebih komprehensif daripada Scala.

Desain API

API PySpark sangat mencerminkan rekanan Scala-nya dan karenanya bukan Pythonic. Ini berarti cukup mudah untuk memetakan antar bahasa tetapi pada saat yang sama, kode Python dapat secara signifikan lebih sulit untuk dipahami.

Arsitektur yang kompleks

Aliran data PySpark relatif kompleks dibandingkan dengan eksekusi JVM murni. Jauh lebih sulit untuk beralasan tentang program atau debug PySpark. Apalagi setidaknya pemahaman dasar tentang Scala dan JVM secara umum cukup banyak yang harus dimiliki.

Spark 2.x dan seterusnya

Pergeseran yang sedang berlangsung menuju DatasetAPI, dengan RDD API yang dibekukan membawa peluang dan tantangan bagi pengguna Python. Sementara bagian tingkat tinggi dari API jauh lebih mudah untuk diekspos dalam Python, fitur yang lebih canggih sangat tidak mungkin untuk digunakan secara langsung .

Apalagi fungsi Python asli terus menjadi warga negara kelas dua di dunia SQL. Semoga ini akan membaik di masa mendatang dengan serialisasi Apache Arrow ( upaya saat ini menargetkan datacollection tetapi serde UDF adalah tujuan jangka panjang ).

Untuk proyek yang sangat bergantung pada basis kode Python, alternatif Python murni (seperti Dask atau Ray ) bisa menjadi alternatif yang menarik.

Itu tidak harus menjadi satu vs yang lain

API DataFrame Spark (SQL, Dataset) menyediakan cara yang elegan untuk mengintegrasikan kode Scala / Java dalam aplikasi PySpark. Anda dapat menggunakan DataFramesuntuk mengekspos data ke kode JVM asli dan membaca kembali hasilnya. Saya sudah menjelaskan beberapa opsi di tempat lain dan Anda dapat menemukan contoh kerja ulang-alik Python-Scala di Cara menggunakan kelas Scala di dalam Pyspark .

Itu dapat lebih ditingkatkan dengan memperkenalkan Jenis yang Ditentukan Pengguna (lihat Bagaimana mendefinisikan skema untuk tipe kustom di Spark SQL? ).


Apa yang salah dengan kode yang disediakan dalam pertanyaan

(Penafian: Pythonista sudut pandang. Kemungkinan besar saya telah melewatkan beberapa trik Scala)

Pertama-tama, ada satu bagian dalam kode Anda yang tidak masuk akal sama sekali. Jika Anda sudah memiliki (key, value)pasangan yang dibuat menggunakan zipWithIndexatau enumerateapa gunanya membuat string hanya untuk membaginya setelah itu? flatMaptidak bekerja secara rekursif sehingga Anda bisa menghasilkan tupel dan melewatkan yang berikutmap apa pun.

Bagian lain yang menurut saya bermasalah adalah reduceByKey. Secara umum, reduceByKeyberguna jika menerapkan fungsi agregat dapat mengurangi jumlah data yang harus dikocok. Karena Anda hanya menyatukan senar, tidak ada untungnya di sini. Mengabaikan hal-hal tingkat rendah, seperti jumlah referensi, jumlah data yang harus Anda transfer persis sama dengan groupByKey.

Biasanya saya tidak akan memikirkan itu, tapi sejauh yang saya tahu itu adalah hambatan dalam kode Scala Anda. Menggabungkan string pada JVM adalah operasi yang agak mahal (lihat misalnya: Apakah rangkaian string dalam scala sama mahalnya dengan di Jawa? ). Ini berarti bahwa sesuatu seperti ini _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) yang setara dengan input4.reduceByKey(valsConcat)dalam kode Anda bukan ide yang baik.

Jika Anda ingin menghindari groupByKeyAnda dapat mencoba menggunakan aggregateByKeydengan StringBuilder. Sesuatu yang mirip dengan ini harus melakukan trik:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

tapi saya ragu itu semua layak diributkan.

Dengan mengingat hal di atas, saya telah menulis ulang kode Anda sebagai berikut:

Scala :

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python :

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

Hasil

Dalam local[6]mode (Intel (R) Xeon (R) CPU E3-1245 V2 @ 3.40GHz) dengan memori 4GB per pelaksana yang dibutuhkan (n = 3):

  • Scala - rata-rata: 250,00s, stdev: 12,49
  • Python - mean: 246.66s, stdev: 1.15

Saya cukup yakin bahwa sebagian besar waktu dihabiskan untuk mengocok, membuat serial, deserializing dan tugas-tugas sekunder lainnya. Hanya untuk bersenang-senang, inilah kode single-threaded naif dalam Python yang melakukan tugas yang sama pada mesin ini dalam waktu kurang dari satu menit:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])
nol323
sumber
22
Salah satu jawaban yang paling jelas, komprehensif dan bermanfaat yang pernah saya temui untuk sementara waktu. Terima kasih!
etov
Kamu pria yang luar biasa!
DennisLi
-4

Ekstensi untuk jawaban di atas -

Scala terbukti lebih cepat dalam banyak hal dibandingkan dengan python tetapi ada beberapa alasan mengapa python menjadi lebih populer dibandingkan scala, mari kita lihat beberapa di antaranya -

Python untuk Apache Spark cukup mudah dipelajari dan digunakan. Namun, ini bukan satu-satunya alasan mengapa Pyspark adalah pilihan yang lebih baik daripada Scala. Masih ada lagi.

Python API untuk Spark mungkin lebih lambat di cluster, tetapi pada akhirnya, para ilmuwan data dapat melakukan lebih banyak dengannya dibandingkan dengan Scala. Kompleksitas Scala tidak ada. Antarmuka sederhana dan komprehensif.

Berbicara tentang keterbacaan kode, pemeliharaan, dan keakraban dengan Python API untuk Apache Spark jauh lebih baik daripada Scala.

Python dilengkapi dengan beberapa perpustakaan yang terkait dengan pembelajaran mesin dan pemrosesan bahasa alami. Ini membantu dalam analisis data dan juga memiliki statistik yang lebih matang dan teruji waktu. Misalnya, numpy, panda, scikit-learn, seaborn dan matplotlib.

Catatan: Sebagian besar ilmuwan data menggunakan pendekatan hibrid di mana mereka menggunakan yang terbaik dari kedua API.

Terakhir, komunitas Scala seringkali menjadi kurang bermanfaat bagi programmer. Ini membuat Python menjadi pembelajaran yang sangat berharga. Jika Anda memiliki cukup pengalaman dengan bahasa pemrograman yang diketik secara statis seperti Java, Anda dapat berhenti khawatir tentang tidak menggunakan Scala sama sekali.


sumber