Saya sering perlu menerapkan fungsi ke grup yang sangat besar DataFrame
(tipe data campuran) dan ingin memanfaatkan beberapa inti.
Saya dapat membuat iterator dari grup dan menggunakan modul multiprosesing, tetapi itu tidak efisien karena setiap grup dan hasil fungsi harus diawetkan untuk pengiriman pesan antar proses.
Apakah ada cara untuk menghindari pengawetan atau bahkan menghindari penyalinan DataFrame
seluruhnya? Sepertinya fungsi memori bersama dari modul multiprosesing terbatas pada numpy
array. Apakah ada pilihan lain?
python
pandas
multiprocessing
shared-memory
pengguna2303
sumber
sumber
Jawaban:
Dari komentar di atas, sepertinya ini sudah direncanakan untuk
pandas
beberapa waktu (ada jugarosetta
proyek yang terlihat menarik yang baru saja saya perhatikan).Namun, sampai setiap fungsionalitas paralel digabungkan
pandas
, saya perhatikan bahwa sangat mudah untuk menulis augmentasi paralel yang efisien & non-penyalinan memori untukpandas
secara langsung menggunakancython
+ OpenMP dan C ++.Berikut adalah contoh singkat penulisan groupby-sum paralel, yang penggunaannya kira-kira seperti ini:
import pandas as pd import para_group_demo df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)}) print para_group_demo.sum(df.a, df.b)
dan keluarannya adalah:
sum key 0 6 1 11 2 4
Catatan Tidak diragukan lagi, fungsi contoh sederhana ini pada akhirnya akan menjadi bagian dari
pandas
. Beberapa hal, bagaimanapun, akan lebih alami untuk diparalelkan dalam C ++ untuk beberapa waktu, dan penting untuk menyadari betapa mudahnya menggabungkan inipandas
.Untuk melakukan ini, saya menulis ekstensi file sumber tunggal sederhana yang kodenya mengikuti.
Ini dimulai dengan beberapa definisi impor dan tipe
from libc.stdint cimport int64_t, uint64_t from libcpp.vector cimport vector from libcpp.unordered_map cimport unordered_map cimport cython from cython.operator cimport dereference as deref, preincrement as inc from cython.parallel import prange import pandas as pd ctypedef unordered_map[int64_t, uint64_t] counts_t ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t ctypedef vector[counts_t] counts_vec_t
Tipe C ++
unordered_map
adalah untuk menjumlahkan dengan satu utas, dan thevector
untuk menjumlahkan dengan semua utas.Sekarang ke fungsinya
sum
. Ini dimulai dengan tampilan memori yang diketik untuk akses cepat:def sum(crit, vals): cdef int64_t[:] crit_view = crit.values cdef int64_t[:] vals_view = vals.values
Fungsi ini berlanjut dengan membagi semi-sama ke utas (di sini dikodekan keras menjadi 4), dan meminta setiap utas menjumlahkan entri dalam jangkauannya:
cdef uint64_t num_threads = 4 cdef uint64_t l = len(crit) cdef uint64_t s = l / num_threads + 1 cdef uint64_t i, j, e cdef counts_vec_t counts counts = counts_vec_t(num_threads) counts.resize(num_threads) with cython.boundscheck(False): for i in prange(num_threads, nogil=True): j = i * s e = j + s if e > l: e = l while j < e: counts[i][crit_view[j]] += vals_view[j] inc(j)
Ketika utas selesai, fungsi menggabungkan semua hasil (dari rentang yang berbeda) menjadi satu
unordered_map
:cdef counts_t total cdef counts_it_t it, e_it for i in range(num_threads): it = counts[i].begin() e_it = counts[i].end() while it != e_it: total[deref(it).first] += deref(it).second inc(it)
Yang tersisa hanyalah membuat
DataFrame
dan mengembalikan hasilnya:key, sum_ = [], [] it = total.begin() e_it = total.end() while it != e_it: key.append(deref(it).first) sum_.append(deref(it).second) inc(it) df = pd.DataFrame({'key': key, 'sum': sum_}) df.set_index('key', inplace=True) return df
sumber