Saya sudah mulai menggunakan Spark SQL dan DataFrames di Spark 1.4.0. Saya ingin mendefinisikan pemartisi khusus di DataFrames, di Scala, tetapi tidak melihat cara melakukan ini.
Salah satu tabel data yang saya kerjakan berisi daftar transaksi, berdasarkan akun, silimar ke contoh berikut.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Setidaknya pada awalnya, sebagian besar kalkulasi akan terjadi antara transaksi di dalam akun. Jadi saya ingin data dipartisi sehingga semua transaksi untuk akun berada di partisi Spark yang sama.
Tapi saya tidak melihat cara untuk mendefinisikan ini. Kelas DataFrame memiliki metode yang disebut 'repartition (Int)', di mana Anda dapat menentukan jumlah partisi yang akan dibuat. Tapi saya tidak melihat metode apa pun yang tersedia untuk menentukan pemartisi khusus untuk DataFrame, seperti yang dapat ditentukan untuk RDD.
Data sumber disimpan di Parket. Saya melihat bahwa ketika menulis DataFrame ke Parquet, Anda dapat menentukan kolom untuk dipartisi, jadi mungkin saya bisa memberi tahu Parquet untuk mempartisi datanya dengan kolom 'Akun'. Tapi mungkin ada jutaan akun, dan jika saya memahami Parket dengan benar, ini akan membuat direktori berbeda untuk setiap Akun, jadi itu tidak terdengar seperti solusi yang masuk akal.
Apakah ada cara untuk membuat Spark mempartisi DataFrame ini sehingga semua data untuk sebuah Akun berada di partisi yang sama?
int(account/someInteger)
dan dengan demikian mendapatkan jumlah akun yang wajar per direktori.partitionBy(Partitioner)
, tetapi untuk DataFrames, bukan RDD. Saya sekarang melihat bahwapartitionBy
hanya tersedia untuk Pair RDD, tidak yakin mengapa demikian.Jawaban:
Percikan> = 2.3.0
SPARK-22614 memperlihatkan partisi jarak.
SPARK-22389 memperlihatkan partisi format eksternal di Data Source API v2 .
Percikan> = 1.6.0
Dalam Spark> = 1.6 dimungkinkan untuk menggunakan partisi menurut kolom untuk kueri dan caching. Lihat: SPARK-11410 dan SPARK-4849 menggunakan
repartition
metode:Tidak seperti
RDDs
SparkDataset
(termasukDataset[Row]
aliasDataFrame
) tidak dapat menggunakan pemartisi khusus seperti untuk saat ini. Anda biasanya dapat mengatasinya dengan membuat kolom partisi buatan tetapi itu tidak akan memberi Anda fleksibilitas yang sama.Spark <1.6.0:
Satu hal yang dapat Anda lakukan adalah memasukkan data masukan prapartisi sebelum Anda membuat file
DataFrame
Karena
DataFrame
pembuatan dariRDD
hanya memerlukan fase peta sederhana, tata letak partisi yang ada harus dipertahankan *:Cara yang sama Anda dapat mempartisi ulang yang ada
DataFrame
:Jadi sepertinya bukan tidak mungkin. Pertanyaannya tetap apakah itu masuk akal. Saya akan berargumen bahwa seringkali tidak:
Proses partisi ulang adalah proses yang mahal. Dalam skenario umum, sebagian besar data harus diserialisasi, dikocok, dan dideserialisasi. Di sisi lain, jumlah operasi yang dapat memanfaatkan data yang dipartisi sebelumnya relatif kecil dan selanjutnya dibatasi jika API internal tidak dirancang untuk memanfaatkan properti ini.
GROUP BY
- dimungkinkan untuk mengurangi jejak memori dari buffer sementara **, tetapi biaya keseluruhan jauh lebih tinggi. Lebih atau kurang setara dengangroupByKey.mapValues(_.reduce)
(perilaku saat ini) vsreduceByKey
(pra-partisi). Sepertinya tidak berguna dalam praktik.SqlContext.cacheTable
. Karena tampaknya ini menggunakan pengkodean panjang proses, penerapanOrderedRDDFunctions.repartitionAndSortWithinPartitions
dapat meningkatkan rasio kompresi.Performa sangat bergantung pada distribusi kunci. Jika miring maka akan menghasilkan pemanfaatan sumber daya yang kurang optimal. Dalam skenario kasus terburuk, tidak mungkin menyelesaikan pekerjaan sama sekali.
Konsep terkait
Mempartisi dengan sumber JDBC :
Sumber data JDBC mendukung
predicates
argumen . Ini dapat digunakan sebagai berikut:Ini menciptakan satu partisi JDBC per predikat. Perlu diingat bahwa jika set yang dibuat menggunakan predikat individual tidak terputus-putus, Anda akan melihat duplikat di tabel yang dihasilkan.
partitionBy
metode dalamDataFrameWriter
:Spark
DataFrameWriter
menyediakanpartitionBy
metode yang dapat digunakan untuk "mempartisi" data saat menulis. Ini memisahkan data saat menulis menggunakan kumpulan kolom yang disediakanIni memungkinkan predikat menekan baca untuk kueri berdasarkan kunci:
tapi itu tidak sama dengan
DataFrame.repartition
. Dalam agregasi tertentu seperti:masih membutuhkan
TungstenExchange
:bucketBy
metode diDataFrameWriter
(Spark> = 2.0):bucketBy
memiliki aplikasi yang serupapartitionBy
tetapi hanya tersedia untuk tables (saveAsTable
). Informasi pengelompokan dapat digunakan untuk mengoptimalkan gabungan:* Dengan tata letak partisi yang saya maksud hanya distribusi data.
partitioned
RDD tidak lagi menjadi pemartisi. ** Dengan asumsi tidak ada proyeksi awal. Jika agregasi hanya mencakup subset kecil dari kolom, mungkin tidak ada keuntungan sama sekali.sumber
DataFrameWriter.partitionBy
secara logis tidak sama denganDataFrame.repartition
. Mantan on tidak mengocok, itu hanya memisahkan output. Mengenai pertanyaan pertama. - data disimpan per partisi dan tidak ada pengacakan. Anda dapat dengan mudah memeriksanya dengan membaca file individual. Tetapi Spark sendiri tidak memiliki cara untuk mengetahuinya jika ini yang Anda inginkan.Di Spark <1.6 Jika Anda membuat
HiveContext
, bukan yang lama biasa,SqlContext
Anda dapat menggunakan HiveQLDISTRIBUTE BY colX...
(memastikan setiap pengecil N mendapatkan rentang x yang tidak tumpang tindih) &CLUSTER BY colX...
(pintasan untuk Distribusikan Berdasarkan dan Urutkan Berdasarkan) misalnya;Tidak yakin bagaimana hal ini cocok dengan api Spark DF. Kata kunci ini tidak didukung dalam SqlContext normal (perhatikan Anda tidak perlu memiliki penyimpanan meta sarang untuk menggunakan HiveContext)
EDIT: Spark 1.6+ sekarang memiliki ini di API DataFrame asli
sumber
Jadi untuk memulai dengan beberapa jenis jawaban :) - Anda tidak bisa
Saya bukan seorang ahli, tetapi sejauh yang saya mengerti DataFrames, mereka tidak sama dengan rdd dan DataFrame tidak memiliki hal seperti Partitioner.
Umumnya ide DataFrame adalah menyediakan level abstraksi lain yang menangani masalah itu sendiri. Kueri di DataFrame diterjemahkan ke dalam rencana logis yang selanjutnya diterjemahkan ke operasi di RDD. Pemartisian yang Anda sarankan mungkin akan diterapkan secara otomatis atau setidaknya harus diterapkan.
Jika Anda tidak mempercayai SparkSQL yang akan memberikan beberapa jenis pekerjaan yang optimal, Anda selalu dapat mengubah DataFrame ke RDD [Row] seperti yang disarankan di komentar.
sumber
Gunakan DataFrame yang dikembalikan oleh:
Tidak ada cara eksplisit untuk menggunakan
partitionBy
DataFrame, hanya di PairRDD, tetapi saat Anda mengurutkan DataFrame, itu akan digunakan di LogicalPlan dan itu akan membantu saat Anda perlu membuat kalkulasi pada setiap Akun.Saya baru saja menemukan masalah yang sama persis, dengan kerangka data yang ingin saya partisi berdasarkan akun. Saya berasumsi bahwa ketika Anda mengatakan "ingin data dipartisi sehingga semua transaksi untuk akun berada di partisi Spark yang sama", Anda menginginkannya untuk skala dan kinerja, tetapi kode Anda tidak bergantung padanya (seperti menggunakan
mapPartitions()
dll), bukan?sumber
Saya bisa melakukan ini menggunakan RDD. Tetapi saya tidak tahu apakah ini solusi yang dapat diterima untuk Anda. Setelah DF tersedia sebagai RDD, Anda dapat mengajukan permohonan
repartitionAndSortWithinPartitions
untuk melakukan partisi ulang data.Ini contoh yang saya gunakan:
sumber