Saya berurusan dengan Pandas DataFrame yang cukup besar - dataset saya menyerupai df
pengaturan berikut :
import pandas as pd
import numpy as np
#--------------------------------------------- SIZING PARAMETERS :
R1 = 20 # .repeat( repeats = R1 )
R2 = 10 # .repeat( repeats = R2 )
R3 = 541680 # .repeat( repeats = [ R3, R4 ] )
R4 = 576720 # .repeat( repeats = [ R3, R4 ] )
T = 55920 # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used
#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
{ 'measurement_id': np.repeat( [0, 1], repeats = [ R3, R4 ] ),
'time':np.concatenate( [ np.repeat( A1, repeats = R1 ),
np.repeat( A2, repeats = R1 ) ] ),
'group': np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
'object': np.tile( np.arange( 0, R1 ), T )
}
)
#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
.explode() \
.astype( 'float' ) \
.to_frame( 'var' ) \
.reset_index( drop = True )
], axis = 1
)
Catatan: Untuk tujuan memiliki contoh minimal, dapat dengan mudah dimasukkan kembali (misalnya dengan df.loc[df['time'] <= 400, :]
), tetapi karena saya mensimulasikan data, saya pikir ukuran aslinya akan memberikan gambaran yang lebih baik.
Untuk setiap grup yang ditentukan oleh ['measurement_id', 'time', 'group']
saya perlu memanggil fungsi berikut:
from sklearn.cluster import SpectralClustering
from pandarallel import pandarallel
def cluster( x, index ):
if len( x ) >= 2:
data = np.asarray( x )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.Series( clustering.labels_ + 1, index = index )
else:
return pd.Series( np.nan, index = index )
Untuk meningkatkan kinerja saya mencoba dua pendekatan:
Paket pandarallel
Pendekatan pertama adalah memparalelkan perhitungan menggunakan pandarallel
paket:
pandarallel.initialize( progress_bar = True )
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.parallel_apply( lambda x: cluster( x['var'], x['object'] ) )
Namun, ini tampaknya kurang optimal karena mengkonsumsi banyak RAM dan tidak semua core digunakan dalam perhitungan (bahkan meskipun menentukan jumlah core secara eksplisit dalam pandarallel.initialize()
metode ini). Juga, kadang-kadang perhitungan diakhiri dengan berbagai kesalahan, walaupun saya belum punya kesempatan untuk menemukan alasan untuk itu (mungkin kekurangan RAM?).
PySpark Pandas UDF
Saya juga mencoba Spark Pandas UDF, meskipun saya benar-benar baru di Spark. Inilah usaha saya:
import findspark; findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )
@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
if len( df['var'] ) >= 2:
data = np.asarray( df['var'] )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.DataFrame( clustering.labels_ + 1,
index = df['object']
)
else:
return pd.DataFrame( np.nan,
index = df['object']
)
res = df \
.groupBy( ['id_half', 'frame', 'team_id'] ) \
.apply( cluster ) \
.toPandas()
Sayangnya, kinerjanya juga tidak memuaskan, dan dari apa yang saya baca pada topik, ini mungkin hanya beban menggunakan fungsi UDF, yang ditulis dalam Python dan kebutuhan terkait untuk mengubah semua objek Python menjadi objek Spark dan kembali.
Jadi inilah pertanyaanku:
- Bisakah salah satu pendekatan saya disesuaikan untuk menghilangkan kemungkinan kemacetan dan meningkatkan kinerja? (misalnya pengaturan PySpark, menyesuaikan operasi sub-optimal, dll.)
- Apakah ada alternatif yang lebih baik? Bagaimana mereka membandingkan dengan solusi yang disediakan dalam hal kinerja?
dask
(((jadi komentar saya itu hanya saran untuk penelitian.Jawaban:
+1
untuk menyebutkan biaya tambahan pengaturan tambahan untuk kedua strategi komputasi. Ini selalu membuat titik impas, hanya setelah itu sebuah non-[SERIAL]
strategi dapat mencapai kegembiraan yang menguntungkan dari beberapa yang ingin memiliki-[TIME]
percepatan Domain (namun, jika lainnya, biasanya[SPACE]
biaya Domain mengizinkan atau tetap layak - ya, RAM. .. keberadaan & akses ke perangkat berukuran demikian, anggaran, dan kendala dunia nyata serupa lainnya)Pertama,
pemeriksaan pra-penerbangan, sebelum lepas landas
. Perumusan Amdahl yang baru dan ketat atas biaya saat ini mampu menggabungkan kedua
pSO + pTO
biaya tambahan ini dan mencerminkannya dalam memprediksi tingkat-tingkat Peningkatan yang dapat dicapai termasuk titik impas titik, karena itu mungkin menjadi bermakna (dalam biaya / efek, efisiensi) untuk paralel.Namun,
itu bukan masalah inti kami di sini .
Ini terjadi selanjutnya:
Selanjutnya, dengan
mempertimbangkan biaya komputasi
SpectralClustering()
, yang akan digunakan di sini untuk menggunakan kernel Radial Boltzmann Function,~ exp( -gamma * distance( data, data )**2 )
tampaknya tidak ada kemajuan dari pemisahandata
-jumlah lebih dari sejumlah unit kerja yang terpisah, karenadistance( data, data )
-komponen, menurut definisi, memiliki kunjungi semua-data
elemen (ref. biaya komunikasi dari{ process | node }
topologi apa saja-ke-setiap nilai-lewat- terdistribusi adalah, untuk alasan yang jelas, sangat buruk jika bukan kasus penggunaan terburuk untuk{ process | node }
-distribusi proses, jika bukan anti-pola lurus (Kecuali untuk beberapa kain memang misterius, memori-kurang / negara-kurang, belum komputasi).Untuk analis yang bertele-tele, ya - tambahkan ini (dan kita mungkin sudah mengatakan keadaan yang buruk ) biaya - lagi - apa saja untuk setiap k-berarti- pemrosesan, di sini tentang
O( N^( 1 + 5 * 5 ) )
itu berlaku, karenaN ~ len( data ) ~ 1.12E6+
, sangat bertentangan dengan keinginan kami untuk memiliki beberapa proses cerdas dan cepat.Terus?
Meskipun biaya setup tidak diabaikan, peningkatan biaya komunikasi hampir pasti akan menonaktifkan setiap perbaikan dari menggunakan upaya sketsa di atas untuk beralih dari
[SERIAL]
aliran proses murni ke beberapa bentuk hanya -[CONCURRENT]
atau True-[PARALLEL]
orkestrasi dari beberapa sub-unit kerja , karena peningkatan overhead yang terkait dengan keharusan untuk mengimplementasikan (sepasang tandem) topologi lewat-nilai apa saja.Jika bukan karena mereka?
Nah, ini kedengarannya sebagai oxymoron Ilmu Komputer - bahkan jika itu mungkin, biaya dari setiap-ke-setiap jarak yang telah dihitung sebelumnya (yang akan mengambil
[TIME]
biaya kompleksitas-domain besar "sebelumnya" (Di mana? Bagaimana? Apakah ada lainnya, latensi yang tidak dapat dihindarkan, yang memungkinkan kemungkinan latensi ditutupi oleh beberapa (penambahan yang tidak diketahui) penambahan matriks jarak any-to-any yang lengkap di masa depan?)) tetapi akan memposisikan ulang biaya yang pada prinsipnya hadir ini ke beberapa lokasi lain di[TIME]
- dan[SPACE]
-Domain, bukan mengurangi mereka.Satu-satunya, saya off menyadari sejauh ini, adalah untuk mencoba, jika masalahnya adalah mungkin untuk mendapatkan kembali dirumuskan menjadi lain, seorang QUBO-dirumuskan, masalah mode (ref .: Q uantum- U nconstrained- B inary- O ptimisation , kabar baiknya adalah alat untuk melakukannya, basis pengetahuan tangan pertama dan pengalaman pemecahan masalah praktis ada dan tumbuh lebih besar)
Performanya menakjubkan - Masalah yang diformulasikan QUBO memiliki
O(1)
pemecah (!) Yang menjanjikan dalam waktu konstan (dalam[TIME]
-Domain) dan agak terbatas di[SPACE]
-Domain (di mana baru-baru ini diumumkan trik LLNL dapat membantu menghindari dunia fisik ini, implementasi QPU saat ini, kendala masalah ukuran).sumber
Ini bukan jawaban, tapi ...
Jika Anda berlari
(Yaitu, dengan Panda saja), Anda akan melihat bahwa Anda sudah menggunakan beberapa inti. Ini karena
sklearn
penggunaanjoblib
secara default untuk memparalelkan pekerjaan. Anda dapat menukar penjadwal dengan Dask dan mungkin mendapatkan lebih banyak efisiensi daripada berbagi data di antara utas, tetapi selama pekerjaan yang Anda lakukan terikat pada CPU seperti ini, tidak akan ada yang dapat Anda lakukan untuk mempercepatnya.Singkatnya, ini adalah masalah algoritma: cari tahu apa yang Anda benar-benar perlu hitung, sebelum mencoba mempertimbangkan kerangka kerja yang berbeda untuk menghitungnya.
sumber
joblib
-spawned , yang tidak ada hubungannya dengan utas, semakin sedikit dengan berbagi? Terima kasih atas klarifikasi argumen Anda.Saya bukan ahli
Dask
, tetapi saya memberikan kode berikut sebagai data dasar:sumber