Saya lebih suka Python daripada Scala. Tetapi, karena Spark secara asli ditulis dalam Scala, saya mengharapkan kode saya berjalan lebih cepat di Scala daripada versi Python karena alasan yang jelas.
Dengan asumsi itu, saya berpikir untuk belajar & menulis versi Scala dari beberapa kode preprocessing yang sangat umum untuk sekitar 1 GB data. Data diambil dari kompetisi SpringLeaf di Kaggle . Hanya untuk memberikan ikhtisar data (berisi 1936 dimensi dan 145.232 baris). Data terdiri dari berbagai jenis misalnya int, float, string, boolean. Saya menggunakan 6 core dari 8 untuk pemrosesan Spark; itu sebabnya saya menggunakan minPartitions=6
sehingga setiap inti memiliki sesuatu untuk diproses.
Kode Scala
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
Kode Python
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
Tahap Kinerja Scala 0 (38 menit), Tahap 1 (18 detik)
Tahap Kinerja Python 0 (11 menit), Tahap 1 (7 detik)
Keduanya menghasilkan grafik visualisasi DAG yang berbeda (karena kedua gambar menunjukkan fungsi tahap 0 yang berbeda untuk Scala ( map
) dan Python ( reduceByKey
))
Tapi, pada dasarnya kedua kode mencoba mengubah data menjadi (dimensional_id, string of list of values) RDD dan simpan ke disk. Output akan digunakan untuk menghitung berbagai statistik untuk setiap dimensi.
Dari segi kinerja, kode Scala untuk data nyata seperti ini tampaknya berjalan 4 kali lebih lambat daripada versi Python. Kabar baik bagi saya adalah memberi saya motivasi yang baik untuk tetap bersama Python. Berita buruknya adalah saya tidak begitu mengerti mengapa?
sumber
Jawaban:
Jawaban asli yang membahas kode dapat ditemukan di bawah.
Pertama-tama, Anda harus membedakan antara berbagai jenis API, masing-masing dengan pertimbangan kinerjanya sendiri.
API RDD
(Struktur Python murni dengan orkestrasi berbasis JVM)
Ini adalah komponen yang akan paling dipengaruhi oleh kinerja kode Python dan detail implementasi PySpark. Sementara kinerja Python agak tidak mungkin menjadi masalah, ada setidaknya beberapa faktor yang harus Anda pertimbangkan:
Pelaksana berbasis proses (Python) versus pelaksana berbasis benang (JVM multipel tunggal) (Scala). Setiap pelaksana Python berjalan dalam prosesnya sendiri. Sebagai efek samping, ia memberikan isolasi yang lebih kuat daripada JVM dan kontrol atas siklus hidup eksekutor tetapi berpotensi penggunaan memori yang jauh lebih tinggi:
Performa kode Python itu sendiri. Scala secara umum lebih cepat dari Python tetapi akan bervariasi pada tugas ke tugas. Selain itu Anda memiliki banyak pilihan termasuk JIT seperti Numba , ekstensi C ( Cython ) atau perpustakaan khusus seperti Theano . Terakhir,
jika Anda tidak menggunakan ML / MLlib (atau hanya tumpukan NumPy), pertimbangkan untuk menggunakan PyPy sebagai penerjemah alternatif. Lihat SPARK-3094 .spark.python.worker.reuse
opsi yang dapat digunakan untuk memilih antara proses Python forking untuk setiap tugas dan menggunakan kembali proses yang ada. Opsi terakhir tampaknya berguna untuk menghindari pengumpulan sampah yang mahal (ini lebih merupakan kesan daripada hasil tes sistematis), sedangkan yang pertama (default) adalah optimal untuk dalam hal siaran mahal dan impor.MLlib
(eksekusi Python dan JVM campuran)
Pertimbangan dasar hampir sama seperti sebelumnya dengan beberapa masalah tambahan. Sementara struktur dasar yang digunakan dengan MLlib adalah objek RDD Python sederhana, semua algoritma dieksekusi secara langsung menggunakan Scala.
Ini berarti biaya tambahan untuk mengkonversi objek Python ke objek Scala dan sebaliknya, peningkatan penggunaan memori dan beberapa batasan tambahan yang akan kita bahas nanti.
Sampai sekarang (Spark 2.x), API berbasis RDD berada dalam mode pemeliharaan dan dijadwalkan akan dihapus di Spark 3.0 .
API DataFrame dan Spark ML
(Eksekusi JVM dengan kode Python terbatas pada driver)
Ini mungkin merupakan pilihan terbaik untuk tugas pemrosesan data standar. Karena kode Python sebagian besar terbatas pada operasi logis tingkat tinggi pada driver, seharusnya tidak ada perbedaan kinerja antara Python dan Scala.
Satu pengecualian adalah penggunaan UDFs Python baris-bijaksana yang secara signifikan kurang efisien daripada setara Scala mereka. Meskipun ada beberapa peluang untuk perbaikan (telah ada pengembangan substansial dalam Spark 2.0.0), batasan terbesarnya adalah perjalanan bolak-balik penuh antara representasi internal (JVM) dan interpreter Python. Jika memungkinkan, Anda harus memilih komposisi ekspresi bawaan ( contoh . Perilaku Python UDF telah ditingkatkan di Spark 2.0.0, tetapi masih kurang optimal dibandingkan dengan eksekusi asli.
Ini
mungkin membaik di masa depantelah meningkat secara signifikan dengan diperkenalkannya UDF vektor (SPARK-21190 dan ekstensi lebih lanjut) , yang menggunakan Arrow Streaming untuk pertukaran data yang efisien dengan deserialisasi nol-salinan. Untuk sebagian besar aplikasi overhead sekunder mereka bisa saja diabaikan.Pastikan juga untuk menghindari lewatnya data yang tidak perlu antara
DataFrames
danRDDs
. Ini membutuhkan serialisasi dan deserialisasi yang mahal, belum lagi transfer data ke dan dari juru bahasa Python.Perlu dicatat bahwa panggilan Py4J memiliki latensi yang cukup tinggi. Ini termasuk panggilan sederhana seperti:
Biasanya, itu tidak masalah (overhead konstan dan tidak tergantung pada jumlah data) tetapi dalam kasus aplikasi real-time lunak, Anda dapat mempertimbangkan caching / menggunakan kembali pembungkus Java.
GraphX dan Spark DataSets
Adapun untuk saat ini (Spark
GraphX1.62.1) tidak ada yang menyediakan API PySpark sehingga Anda dapat mengatakan bahwa PySpark lebih buruk daripada Scala.Dalam praktiknya, pengembangan GraphX berhenti hampir sepenuhnya dan proyek saat ini dalam mode pemeliharaan dengan tiket JIRA terkait ditutup karena tidak akan diperbaiki . Pustaka GraphFrames menyediakan pustaka pemrosesan grafik alternatif dengan binding Python.
Himpunan dataBerbicara secara subyektif tidak ada banyak tempat untuk mengetikkan
Datasets
Python secara statis dan bahkan jika ada implementasi Scala saat ini terlalu sederhana dan tidak memberikan manfaat kinerja yang sama sepertiDataFrame
.Streaming
Dari apa yang saya lihat sejauh ini, saya sangat merekomendasikan menggunakan Scala over Python. Ini mungkin berubah di masa depan jika PySpark mendapatkan dukungan untuk aliran terstruktur tetapi saat ini Scala API tampaknya jauh lebih kuat, komprehensif dan efisien. Pengalaman saya cukup terbatas.
Streaming terstruktur di Spark 2.x tampaknya mengurangi kesenjangan antar bahasa tetapi untuk saat ini masih dalam masa awal. Namun demikian, API berbasis RDD telah dirujuk sebagai "streaming warisan" dalam Dokumentasi Databricks (tanggal akses 2017-03-03)) sehingga masuk akal untuk mengharapkan upaya penyatuan lebih lanjut.
Pertimbangan non-kinerja
Paritas fiturTidak semua fitur Spark diekspos melalui API PySpark. Pastikan untuk memeriksa apakah bagian-bagian yang Anda butuhkan sudah diimplementasikan dan mencoba memahami batasan yang mungkin.
Ini sangat penting ketika Anda menggunakan MLlib dan konteks campuran serupa (lihat Memanggil fungsi Java / Scala dari tugas ). Agar adil, beberapa bagian API PySpark, seperti
Desain APImllib.linalg
, menyediakan serangkaian metode yang lebih komprehensif daripada Scala.API PySpark sangat mencerminkan rekanan Scala-nya dan karenanya bukan Pythonic. Ini berarti cukup mudah untuk memetakan antar bahasa tetapi pada saat yang sama, kode Python dapat secara signifikan lebih sulit untuk dipahami.
Arsitektur yang kompleksAliran data PySpark relatif kompleks dibandingkan dengan eksekusi JVM murni. Jauh lebih sulit untuk beralasan tentang program atau debug PySpark. Apalagi setidaknya pemahaman dasar tentang Scala dan JVM secara umum cukup banyak yang harus dimiliki.
Spark 2.x dan seterusnyaPergeseran yang sedang berlangsung menuju
Dataset
API, dengan RDD API yang dibekukan membawa peluang dan tantangan bagi pengguna Python. Sementara bagian tingkat tinggi dari API jauh lebih mudah untuk diekspos dalam Python, fitur yang lebih canggih sangat tidak mungkin untuk digunakan secara langsung .Apalagi fungsi Python asli terus menjadi warga negara kelas dua di dunia SQL. Semoga ini akan membaik di masa mendatang dengan serialisasi Apache Arrow ( upaya saat ini menargetkan data
collection
tetapi serde UDF adalah tujuan jangka panjang ).Untuk proyek yang sangat bergantung pada basis kode Python, alternatif Python murni (seperti Dask atau Ray ) bisa menjadi alternatif yang menarik.
Itu tidak harus menjadi satu vs yang lain
API DataFrame Spark (SQL, Dataset) menyediakan cara yang elegan untuk mengintegrasikan kode Scala / Java dalam aplikasi PySpark. Anda dapat menggunakan
DataFrames
untuk mengekspos data ke kode JVM asli dan membaca kembali hasilnya. Saya sudah menjelaskan beberapa opsi di tempat lain dan Anda dapat menemukan contoh kerja ulang-alik Python-Scala di Cara menggunakan kelas Scala di dalam Pyspark .Itu dapat lebih ditingkatkan dengan memperkenalkan Jenis yang Ditentukan Pengguna (lihat Bagaimana mendefinisikan skema untuk tipe kustom di Spark SQL? ).
Apa yang salah dengan kode yang disediakan dalam pertanyaan
(Penafian: Pythonista sudut pandang. Kemungkinan besar saya telah melewatkan beberapa trik Scala)
Pertama-tama, ada satu bagian dalam kode Anda yang tidak masuk akal sama sekali. Jika Anda sudah memiliki
(key, value)
pasangan yang dibuat menggunakanzipWithIndex
atauenumerate
apa gunanya membuat string hanya untuk membaginya setelah itu?flatMap
tidak bekerja secara rekursif sehingga Anda bisa menghasilkan tupel dan melewatkan yang berikutmap
apa pun.Bagian lain yang menurut saya bermasalah adalah
reduceByKey
. Secara umum,reduceByKey
berguna jika menerapkan fungsi agregat dapat mengurangi jumlah data yang harus dikocok. Karena Anda hanya menyatukan senar, tidak ada untungnya di sini. Mengabaikan hal-hal tingkat rendah, seperti jumlah referensi, jumlah data yang harus Anda transfer persis sama dengangroupByKey
.Biasanya saya tidak akan memikirkan itu, tapi sejauh yang saya tahu itu adalah hambatan dalam kode Scala Anda. Menggabungkan string pada JVM adalah operasi yang agak mahal (lihat misalnya: Apakah rangkaian string dalam scala sama mahalnya dengan di Jawa? ). Ini berarti bahwa sesuatu seperti ini
_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
yang setara denganinput4.reduceByKey(valsConcat)
dalam kode Anda bukan ide yang baik.Jika Anda ingin menghindari
groupByKey
Anda dapat mencoba menggunakanaggregateByKey
denganStringBuilder
. Sesuatu yang mirip dengan ini harus melakukan trik:tapi saya ragu itu semua layak diributkan.
Dengan mengingat hal di atas, saya telah menulis ulang kode Anda sebagai berikut:
Scala :
Python :
Hasil
Dalam
local[6]
mode (Intel (R) Xeon (R) CPU E3-1245 V2 @ 3.40GHz) dengan memori 4GB per pelaksana yang dibutuhkan (n = 3):Saya cukup yakin bahwa sebagian besar waktu dihabiskan untuk mengocok, membuat serial, deserializing dan tugas-tugas sekunder lainnya. Hanya untuk bersenang-senang, inilah kode single-threaded naif dalam Python yang melakukan tugas yang sama pada mesin ini dalam waktu kurang dari satu menit:
sumber
Ekstensi untuk jawaban di atas -
Scala terbukti lebih cepat dalam banyak hal dibandingkan dengan python tetapi ada beberapa alasan mengapa python menjadi lebih populer dibandingkan scala, mari kita lihat beberapa di antaranya -
Python untuk Apache Spark cukup mudah dipelajari dan digunakan. Namun, ini bukan satu-satunya alasan mengapa Pyspark adalah pilihan yang lebih baik daripada Scala. Masih ada lagi.
Python API untuk Spark mungkin lebih lambat di cluster, tetapi pada akhirnya, para ilmuwan data dapat melakukan lebih banyak dengannya dibandingkan dengan Scala. Kompleksitas Scala tidak ada. Antarmuka sederhana dan komprehensif.
Berbicara tentang keterbacaan kode, pemeliharaan, dan keakraban dengan Python API untuk Apache Spark jauh lebih baik daripada Scala.
Python dilengkapi dengan beberapa perpustakaan yang terkait dengan pembelajaran mesin dan pemrosesan bahasa alami. Ini membantu dalam analisis data dan juga memiliki statistik yang lebih matang dan teruji waktu. Misalnya, numpy, panda, scikit-learn, seaborn dan matplotlib.
Catatan: Sebagian besar ilmuwan data menggunakan pendekatan hibrid di mana mereka menggunakan yang terbaik dari kedua API.
Terakhir, komunitas Scala seringkali menjadi kurang bermanfaat bagi programmer. Ini membuat Python menjadi pembelajaran yang sangat berharga. Jika Anda memiliki cukup pengalaman dengan bahasa pemrograman yang diketik secara statis seperti Java, Anda dapat berhenti khawatir tentang tidak menggunakan Scala sama sekali.
sumber