Menggabungkan beberapa frame data secara bijaksana di PySpark

21

Saya memiliki frame 10 data pyspark.sql.dataframe.DataFrame, yang diperoleh dari randomSplitsebagai (td1, td2, td3, td4, td5, td6, td7, td8, td9, td10) = td.randomSplit([.1, .1, .1, .1, .1, .1, .1, .1, .1, .1], seed = 100)Sekarang saya ingin bergabung 9 td's ke dalam bingkai data tunggal, bagaimana saya harus melakukan itu?

Saya sudah mencoba unionAll, tetapi fungsi ini hanya menerima dua argumen.

td1_2 = td1.unionAll(td2) 
# this is working fine

td1_2_3 = td1.unionAll(td2, td3) 
# error TypeError: unionAll() takes exactly 2 arguments (3 given)

Apakah ada cara untuk menggabungkan lebih dari dua frame data secara bijaksana?

Tujuan melakukan ini adalah bahwa saya melakukan Validasi Silang 10 kali lipat secara manual tanpa menggunakan CrossValidatormetode PySpark , Jadi dengan memasukkan 9 ke dalam pelatihan dan 1 ke dalam data uji dan kemudian saya akan mengulanginya untuk kombinasi lainnya.

krishna Prasad
sumber
1
Ini tidak langsung menjawab pertanyaan, tetapi di sini saya memberikan saran untuk meningkatkan metode penamaan sehingga pada akhirnya, kita tidak perlu mengetik, misalnya: [td1, td2, td3, td4, td5, td6, td7 , td8, td9, td10]. Bayangkan melakukan ini untuk CV 100 kali lipat. Inilah yang akan saya lakukan: bagian = [0,1] * 10 cv = df7.randomSplit (bagian) lipatan = daftar (rentang (10)) untuk i dalam kisaran (10): test_data = cv [i] fold_no_i = lipatan [: i] + lipatan [i + 1:] train_data = cv [fold_no_i [0]] untuk j di fold_no_i [1:]: train_data = train_data.union (cv [j])
ngoc thoag

Jawaban:

37

Dicuri dari: /programming/33743978/spark-union-of-multiple-rdds

Di luar serikat pekerja, ini adalah satu-satunya cara untuk melakukannya untuk DataFrames.

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

unionAll(td2, td3, td4, td5, td6, td7, td8, td9, td10)

Apa yang terjadi adalah ia mengambil semua objek yang Anda berikan sebagai parameter dan menguranginya menggunakan unionAll (reduksi ini dari Python, bukan reduksi percikan meskipun mereka bekerja sama) yang akhirnya menguranginya menjadi satu DataFrame.

Jika alih-alih DataFrame, itu adalah RDD biasa, Anda dapat meneruskan daftar mereka ke fungsi gabungan SparkContext Anda

Suntingan: Untuk tujuan Anda, saya mengusulkan metode yang berbeda, karena Anda harus mengulangi seluruh penyatuan ini 10 kali untuk lipatan Anda yang berbeda untuk validasi silang, saya akan menambahkan label yang memiliki lipatan baris dan hanya menyaring DataFrame Anda untuk setiap lipatan berdasarkan label

Jan van der Vegt
sumber
(+1) Pekerjaan yang menyenangkan. Namun, perlu ada fungsi yang memungkinkan penggabungan beberapa dataframe. Akan sangat berguna!
Dawny33
Saya tidak setuju dengan itu
Jan van der Vegt
@JanvanderVegt Terima kasih, ini bekerja dan gagasan untuk menambahkan label untuk menyaring pelatihan dan pengujian dataset, saya sudah melakukannya. Terima kasih banyak atas bantuan Anda.
krishna Prasad
@Jan van der Vegt Bisakah Anda menerapkan logika yang sama untuk Bergabung dan jawab pertanyaan ini
GeorgeOfTheRF
6

Terkadang, ketika kerangka data untuk digabungkan tidak memiliki urutan kolom yang sama, lebih baik untuk df2.select (df1.columns) untuk memastikan kedua df memiliki urutan kolom yang sama sebelum serikat.

import functools 

def unionAll(dfs):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs) 

Contoh:

df1 = spark.createDataFrame([[1,1],[2,2]],['a','b'])
# different column order. 
df2 = spark.createDataFrame([[3,333],[4,444]],['b','a']) 
df3 = spark.createDataFrame([555,5],[666,6]],['b','a']) 

unioned_df = unionAll([df1, df2, df3])
unioned_df.show() 

masukkan deskripsi gambar di sini

selain itu akan menghasilkan hasil di bawah ini sebagai gantinya.

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs) 

unionAll(*[df1, df2, df3]).show()

masukkan deskripsi gambar di sini

Wong Tat Yau
sumber
2

Bagaimana kalau menggunakan rekursi?

def union_all(dfs):
    if len(dfs) > 1:
        return dfs[0].unionAll(union_all(dfs[1:]))
    else:
        return dfs[0]

td = union_all([td1, td2, td3, td4, td5, td6, td7, td8, td9, td10])
proinsias
sumber