Menerapkan fungsi secara efisien ke DataFrame panda yang dikelompokkan secara paralel

89

Saya sering perlu menerapkan fungsi ke grup yang sangat besar DataFrame(tipe data campuran) dan ingin memanfaatkan beberapa inti.

Saya dapat membuat iterator dari grup dan menggunakan modul multiprosesing, tetapi itu tidak efisien karena setiap grup dan hasil fungsi harus diawetkan untuk pengiriman pesan antar proses.

Apakah ada cara untuk menghindari pengawetan atau bahkan menghindari penyalinan DataFrameseluruhnya? Sepertinya fungsi memori bersama dari modul multiprosesing terbatas pada numpyarray. Apakah ada pilihan lain?

pengguna2303
sumber
Sejauh yang saya tahu, tidak ada cara untuk berbagi objek sembarangan. Saya bertanya-tanya, apakah pengawetan membutuhkan lebih banyak waktu, daripada keuntungan melalui multiprosesing. Mungkin Anda harus mencari kemungkinan untuk membuat paket kerja yang lebih besar untuk setiap proses untuk mengurangi waktu pengawetan relatif. Kemungkinan lain adalah menggunakan multiprocessing saat Anda membuat grup.
Sebastian Werk
3
Saya melakukan sesuatu seperti itu tetapi menggunakan UWSGI, Flask dan preforking: Saya memuat pandas dataframe ke dalam sebuah proses, membaginya x kali (menjadikannya objek memori bersama) dan kemudian memanggil proses tersebut dari proses python lain di mana saya menggabungkan hasilnya. atm Saya menggunakan JSON sebagai proses komunikasi, tetapi ini akan datang (namun masih sangat eksperimental): pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental
Carst
Omong-omong, apakah Anda pernah melihat HDF5 dengan chunking? (HDF5 tidak disimpan untuk penulisan bersamaan, tetapi Anda juga dapat menyimpan ke file terpisah dan pada akhirnya menggabungkan hal-hal)
Carst
7
ini akan ditargetkan untuk 0,14, lihat masalah ini: github.com/pydata/pandas/issues/5751
Jeff
4
@Jeff didorong ke 0,15 = (
pyCthon

Jawaban:

12

Dari komentar di atas, sepertinya ini sudah direncanakan untuk pandasbeberapa waktu (ada juga rosettaproyek yang terlihat menarik yang baru saja saya perhatikan).

Namun, sampai setiap fungsionalitas paralel digabungkan pandas, saya perhatikan bahwa sangat mudah untuk menulis augmentasi paralel yang efisien & non-penyalinan memori untuk pandassecara langsung menggunakan cython+ OpenMP dan C ++.

Berikut adalah contoh singkat penulisan groupby-sum paralel, yang penggunaannya kira-kira seperti ini:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

dan keluarannya adalah:

     sum
key     
0      6
1      11
2      4

Catatan Tidak diragukan lagi, fungsi contoh sederhana ini pada akhirnya akan menjadi bagian dari pandas. Beberapa hal, bagaimanapun, akan lebih alami untuk diparalelkan dalam C ++ untuk beberapa waktu, dan penting untuk menyadari betapa mudahnya menggabungkan ini pandas.


Untuk melakukan ini, saya menulis ekstensi file sumber tunggal sederhana yang kodenya mengikuti.

Ini dimulai dengan beberapa definisi impor dan tipe

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

Tipe C ++ unordered_mapadalah untuk menjumlahkan dengan satu utas, dan the vectoruntuk menjumlahkan dengan semua utas.

Sekarang ke fungsinya sum. Ini dimulai dengan tampilan memori yang diketik untuk akses cepat:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

Fungsi ini berlanjut dengan membagi semi-sama ke utas (di sini dikodekan keras menjadi 4), dan meminta setiap utas menjumlahkan entri dalam jangkauannya:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

Ketika utas selesai, fungsi menggabungkan semua hasil (dari rentang yang berbeda) menjadi satu unordered_map:

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

Yang tersisa hanyalah membuat DataFramedan mengembalikan hasilnya:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
Ami Tavory
sumber