Apakah ada cara mudah untuk menjalankan panda. DataFrame.isin secara paralel?

25

Saya memiliki program pemodelan dan penilaian yang banyak menggunakan DataFrame.isin fungsi panda, mencari melalui daftar facebook "seperti" catatan pengguna individu untuk masing-masing dari beberapa ribu halaman tertentu. Ini adalah bagian yang paling memakan waktu dari program, lebih dari pemodelan atau penilaian, hanya karena hanya berjalan pada satu inti sedangkan sisanya berjalan pada beberapa lusin secara bersamaan.

Meskipun saya tahu saya bisa secara manual memecah dataframe menjadi potongan-potongan dan menjalankan operasi secara paralel, apakah ada cara mudah untuk melakukannya secara otomatis? Dengan kata lain, apakah ada paket di luar sana yang akan mengenali saya menjalankan operasi yang mudah didelegasikan dan secara otomatis mendistribusikannya? Mungkin itu meminta terlalu banyak, tapi saya sudah cukup terkejut di masa lalu oleh apa yang sudah tersedia di Python, jadi saya pikir itu layak ditanyakan.

Ada saran lain tentang bagaimana hal ini dapat dicapai (bahkan jika tidak oleh paket unicorn ajaib!) Juga akan dihargai. Terutama, hanya berusaha mencari cara untuk mengurangi 15-20 menit per kali berjalan tanpa menghabiskan waktu yang sama untuk mengkodekan solusi.

Therriault
sumber
Seberapa besar daftar nilai Anda? Sudahkah Anda mencoba melewatinya sebagai set? Untuk paralelisme, Anda mungkin tertarik pada Joblib. Mudah digunakan dan dapat mempercepat perhitungan. Gunakan dengan potongan data yang besar.
Oao
Opsi lain adalah membingkai ulang masalah Anda sebagai gabungan. Bergabung jauh lebih cepat di Pandas stackoverflow.com/questions/23945493/...
Brian Spiering
Namun pilihan lain adalah menggunakan np.in1d ​​yang juga lebih cepat stackoverflow.com/questions/21738882/fast-pandas-filtering
Brian Spiering

Jawaban:

8

Sayangnya, paralelisasi belum diimplementasikan dalam panda. Anda dapat bergabung dengan masalah github ini jika Anda ingin berpartisipasi dalam pengembangan fitur ini.

Saya tidak tahu "paket unicorn ajaib" untuk tujuan ini, jadi yang terbaik adalah menulis solusi Anda sendiri. Tetapi jika Anda masih tidak ingin menghabiskan waktu untuk hal itu dan ingin belajar sesuatu yang baru - Anda dapat mencoba dua metode yang dibangun ke dalam MongoDB (kerangka pengurangan peta dan gabungkan). Lihat mongodb_agg_framework .

Stanpol
sumber
6

Saya pikir taruhan terbaik Anda adalah rosetta . Saya menemukan ini sangat berguna dan mudah. Periksa metode panda -nya .

Anda bisa mendapatkannya melalui pip .

dmvianna
sumber
Saya akan merekomendasikan mendapatkan rosetta dengan langsung pergi ke GitHub. Itu memastikan Anda mendapatkan versi terbaru. github.com/columbia-applied-data-science/rosetta
Ian Langmore
0

Ada versi yang lebih umum dari pertanyaan ini mengenai paralelisasi pada panda menerapkan fungsi - jadi ini adalah pertanyaan yang menyegarkan :)

Pertama , saya ingin menyebutkan lebih cepat karena Anda meminta solusi "paket", dan itu muncul pada sebagian besar pertanyaan SO mengenai paralelisasi panda.

Tapi .. Saya masih ingin membagikan kode inti pribadi saya untuk itu, karena setelah beberapa tahun bekerja dengan DataFrame saya tidak pernah menemukan solusi paralelisasi 100% (terutama untuk fungsi yang berlaku) dan saya selalu harus kembali untuk " kode "manual.

Terima kasih kepada Anda, saya membuatnya lebih umum untuk mendukung metode DataFrame (secara teoritis) apa pun dengan namanya (jadi Anda tidak perlu menyimpan versi untuk isin, melamar, dll.).

Saya mengujinya pada fungsi "isin", "apply" dan "isna" menggunakan kedua python 2.7 dan 3.6. Itu di bawah 20 baris, dan saya mengikuti konvensi penamaan panda seperti "subset" dan "njobs".

Saya juga menambahkan perbandingan waktu dengan kode setara dask untuk "isin" dan tampaknya ~ X2 kali lebih lambat dari intinya.

Ini mencakup 2 fungsi:

df_multi_core - ini adalah yang Anda panggil. Ia menerima:

  1. Objek df Anda
  2. Nama fungsi yang ingin Anda panggil
  3. Subset kolom fungsi dapat dilakukan (membantu mengurangi waktu / memori)
  4. Jumlah pekerjaan yang dijalankan secara paralel (-1 atau menghilangkan untuk semua core)
  5. Semua kwarg lain yang diterima fungsi df (seperti "sumbu")

_df_split - ini adalah fungsi pembantu internal yang harus diposisikan secara global ke modul yang sedang berjalan (Pool.map adalah "penempatan bergantung"), kalau tidak saya akan menemukannya secara internal ..

inilah kode dari intisari (saya akan menambahkan lebih banyak tes fungsi panda di sana):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Di bawah ini adalah kode uji untuk isin yang diparalelkan , yang membandingkan kinerja asli, multi-core, dan dask. Pada mesin I7 dengan 8 core fisik, saya mendapatkan kecepatan X4 kali. Saya ingin mendengar apa yang Anda dapatkan dari data asli Anda!

from time import time

if __name__ == '__main__': 
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)

    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
    t1 = time()
    print('result\n{}'.format(df.isin(lookfor).sum()))
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print('result\n{}'.format(res.sum()))
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))


    t5 = time()
    ddata = dd.from_pandas(df, npartitions=njobs)
    res = ddata.map_partitions(lambda df: df.apply(apply_f, axis=1)).compute(scheduler='processes')
    t6 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))

--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1    953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for dask implementation 2.88
mork
sumber
@Therriault Saya menambahkan perbandingan dask dengan isin- sepertinya potongan kode paling efektif dengan 'isin' - ~ X1.75 kali lebih cepat daripada dask (dibandingkan dengan applyfungsi yang hanya mendapat 5% lebih cepat daripada dask)
mork