Indikator kemajuan selama operasi panda

159

Saya secara teratur melakukan operasi panda pada bingkai data lebih dari 15 juta baris atau lebih dan saya ingin memiliki akses ke indikator kemajuan untuk operasi tertentu.

Apakah ada indikator kemajuan berbasis teks untuk operasi panda split-apply-menggabungkan?

Misalnya, dalam sesuatu seperti:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

di mana feature_rollupada fungsi yang agak terlibat yang mengambil banyak kolom DF dan membuat kolom pengguna baru melalui berbagai metode. Operasi ini dapat memakan waktu cukup lama untuk kerangka data yang besar sehingga saya ingin tahu apakah mungkin untuk memiliki keluaran berbasis teks di notebook iPython yang memperbarui saya tentang perkembangannya.

Sejauh ini, saya sudah mencoba indikator progress loop kanonik untuk Python tetapi mereka tidak berinteraksi dengan panda dengan cara yang berarti.

Saya berharap ada sesuatu yang saya abaikan di perpustakaan panda / dokumentasi yang memungkinkan seseorang untuk mengetahui kemajuan split-apply-menggabungkan. Implementasi sederhana mungkin akan melihat jumlah total himpunan bagian bingkai data yang menjadi dasar applyfungsinya dan melaporkan kemajuan sebagai bagian yang lengkap dari himpunan bagian tersebut.

Apakah ini mungkin sesuatu yang perlu ditambahkan ke perpustakaan?

cwharland
sumber
sudahkah Anda melakukan% prun (profil) pada kode? kadang-kadang Anda dapat melakukan operasi pada seluruh frame sebelum Anda menerapkan untuk menghilangkan kemacetan
Jeff
@ Jeff: Anda bertaruh, saya melakukan itu sebelumnya untuk memeras setiap bit terakhir dari kinerja itu. Masalahnya benar-benar turun ke batas pseudo-mengurangi peta yang saya kerjakan karena barisnya ada dalam puluhan juta jadi saya tidak berharap peningkatan kecepatan super hanya ingin umpan balik pada kemajuan.
cwharland
Pertimbangkan cythonising: pandas.pydata.org/pandas-docs/dev/…
Andy Hayden
@AndyHayden - Ketika saya mengomentari jawaban Anda, implementasi Anda cukup bagus dan menambah sedikit waktu untuk keseluruhan pekerjaan. Saya juga mencatat tiga operasi di dalam rollup fitur yang mendapatkan kembali semua waktu yang sekarang didedikasikan untuk melaporkan perkembangan. Jadi pada akhirnya saya bertaruh saya akan memiliki progress bar dengan pengurangan waktu pemrosesan total jika saya melanjutkan dengan cython pada seluruh fungsi.
cwharland

Jawaban:

279

Karena permintaan populer, tqdmtelah menambahkan dukungan untuk pandas. Tidak seperti jawaban lain, ini tidak akan memperlambat panda - inilah contoh untuk DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

Jika Anda tertarik pada cara kerjanya (dan bagaimana memodifikasinya untuk panggilan balik Anda sendiri), lihat contoh di github , dokumentasi lengkap tentang pypi , atau impor modul dan jalankan help(tqdm).

EDIT


Untuk langsung menjawab pertanyaan awal, ganti:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

dengan:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Catatan: tqdm <= v4.8 : Untuk versi tqdm di bawah 4.8, tqdm.pandas()Anda tidak harus:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
casper.dcl
sumber
5
tqdmsebenarnya diciptakan untuk iterables biasa saja awalnya: from tqdm import tqdm; for i in tqdm( range(int(1e8)) ): passDukungan panda adalah hack baru-baru ini saya buat :)
casper.dcl
6
Btw, jika Anda menggunakan notebook Jupyter, Anda juga dapat menggunakan tqdm_notebooks untuk mendapatkan bilah yang lebih cantik. Bersama-sama dengan panda, Anda saat ini harus membuat from tqdm import tqdm_notebook; tqdm_notebook().pandas(*args, **kwargs)
Instantiate
2
Pada versi 4.8.1 - gunakan tqdm.panda () sebagai gantinya. github.com/tqdm/tqdm/commit/…
mork
1
Terima kasih, @mork benar. Kami sedang bekerja (perlahan-lahan) ke arah tqdmv5 yang membuat semuanya lebih termodulasi.
casper.dcl
1
Untuk rekomendasi sintaksis terkini, lihat dokumentasi tqdm Pandas di sini: pypi.python.org/pypi/tqdm#pandas-integration
Manu CJ
18

Untuk men-tweak jawaban Jeff (dan memiliki ini sebagai fungsi yang dapat digunakan kembali).

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

Catatan: berlaku pembaruan persentase kemajuan sebaris . Jika fungsi Anda stdouts maka ini tidak akan berfungsi.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

Seperti biasa Anda dapat menambahkan ini ke objek grup Anda sebagai metode:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

Seperti yang disebutkan dalam komentar, ini bukan fitur yang akan diterapkan oleh panda inti. Tapi python memungkinkan Anda untuk membuat ini untuk banyak objek / metode panda (melakukannya akan sedikit kerja ... meskipun Anda harus bisa menggeneralisasi pendekatan ini).

Andy Hayden
sumber
Saya mengatakan "cukup banyak pekerjaan", tetapi Anda mungkin dapat menulis ulang seluruh fungsi ini sebagai dekorator (yang lebih umum).
Andy Hayden
Terima kasih telah memperluas posting Jeff. Saya telah menerapkan keduanya dan perlambatan untuk masing-masing cukup minimal (menambahkan total 1,1 menit ke operasi yang membutuhkan waktu 27 menit untuk menyelesaikannya). Dengan cara ini saya dapat melihat kemajuan dan mengingat sifat adhoc dari operasi ini, saya pikir ini adalah memperlambat yang dapat diterima.
cwharland
Luar biasa, senang itu membantu. Saya benar-benar terkejut dengan perlambatan (ketika saya mencoba sebuah contoh), saya berharap itu akan jauh lebih buruk.
Andy Hayden
1
Untuk lebih menambah efisiensi metode yang diposting, saya malas tentang impor data (panda terlalu bagus dalam menangani csv yang berantakan !!) dan beberapa entri saya (~ 1%) telah benar-benar menghilangkan insersi (pikirkan keseluruhan catatan dimasukkan ke dalam satu bidang). Menghilangkan penyebab ini mempercepat dalam rollup fitur karena tidak ada ambiguitas tentang apa yang harus dilakukan selama operasi split-apply-menggabungkan.
cwharland
1
Saya turun ke 8 menit ... tapi saya menambahkan sesuatu ke rollup fitur (lebih banyak fitur -> AUC lebih baik!). 8 menit ini adalah per chunk (total dua chunk sekarang) dengan masing-masing chunk di lingkungan 12 juta baris. Jadi ya ... 16 menit untuk melakukan operasi besar-besaran pada 24 juta baris menggunakan HDFStore (dan ada hal-hal nltk di rollup fitur). Lumayan bagus. Mari berharap internet tidak menilai saya dari ketidaktahuan awal atau ambivalensi terhadap penyisipan yang kacau =)
cwharland
11

Jika Anda membutuhkan dukungan untuk cara menggunakannya di notebook Jupyter / ipython, seperti yang saya lakukan, berikut adalah panduan dan sumber yang bermanfaat untuk artikel yang relevan :

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

Perhatikan garis bawah dalam pernyataan impor untuk _tqdm_notebook. Seperti yang disebutkan dalam artikel yang dirujuk, pengembangan sedang dalam tahap akhir beta.

Victor Vulovic
sumber
8

Bagi siapa saja yang ingin menerapkan tqdm pada kode panda-berlaku paralel kustom mereka.

(Saya mencoba beberapa perpustakaan untuk paralelisasi selama bertahun-tahun, tetapi saya tidak pernah menemukan solusi paralelisasi 100%, terutama untuk fungsi yang berlaku, dan saya selalu harus kembali untuk kode "manual" saya.)

df_multi_core - ini adalah yang Anda panggil. Ia menerima:

  1. Objek df Anda
  2. Nama fungsi yang ingin Anda panggil
  3. Subset kolom fungsi dapat dilakukan pada (membantu mengurangi waktu / memori)
  4. Jumlah pekerjaan yang harus dijalankan secara paralel (-1 atau menghilangkan semua inti)
  5. Semua kwarg lain yang diterima fungsi df (seperti "sumbu")

_df_split - ini adalah fungsi pembantu internal yang harus diposisikan secara global ke modul yang sedang berjalan (Pool.map adalah "penempatan bergantung"), kalau tidak saya akan menemukannya secara internal ..

inilah kode dari intisari (saya akan menambahkan lebih banyak tes fungsi panda di sana):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Di bawah adalah kode uji untuk penerapan paralel dengan tqdm "progress_apply".

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

Dalam output Anda dapat melihat 1 progress bar untuk berjalan tanpa paralelisasi, dan progress bar per-core saat berjalan dengan paralelisasi. Ada sedikit hickup dan kadang-kadang sisa core muncul sekaligus, tetapi meskipun begitu saya pikir itu berguna karena Anda mendapatkan statistik kemajuan per inti (itu / detik dan total catatan, misalnya)

masukkan deskripsi gambar di sini

Terima kasih @abcdaa untuk perpustakaan yang luar biasa ini!

mork
sumber
1
Terima kasih @mork - silakan menambahkan ke github.com/tqdm/tqdm/wiki/Bagaimana- membuat-membuat-a- great - Progress - Bar atau membuat halaman baru di github.com/tqdm/tqdm/wiki
casper. dcl
Terima kasih, tetapi harus mengubah bagian ini: try: splits = np.array_split(df[subset], njobs) except ValueError: splits = np.array_split(df, njobs)karena pengecualian KeyError alih-alih ValueError, ubah ke Pengecualian untuk menangani semua kasus.
Marius
Terima kasih @mork - jawaban ini harus lebih tinggi.
Andy
5

Anda dapat dengan mudah melakukan ini dengan dekorator

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

lalu gunakan fungsi modified_function (dan ubah ketika Anda ingin mencetak)

Jeff
sumber
1
Peringatan yang jelas karena ini akan memperlambat fungsi Anda! Anda bahkan dapat memperbaruinya dengan progres stackoverflow.com/questions/5426546/… misalnya menghitung / len sebagai persentase.
Andy Hayden
ya - Anda akan memiliki pesanan (jumlah grup), jadi tergantung pada apa hambatan Anda ini mungkin membuat perbedaan
Jeff
mungkin hal intuitif yang harus dilakukan adalah membungkus ini dalam suatu logged_apply(g, func)fungsi, di mana Anda akan memiliki akses ke pesanan, dan dapat login dari awal.
Andy Hayden
Saya melakukan hal di atas dalam jawaban saya, juga pembaruan persentase kurang ajar. Sebenarnya saya tidak bisa mendapatkan milik Anda bekerja ... Saya pikir dengan sedikit membungkus. Jika Anda menggunakannya untuk mendaftar, toh itu tidak begitu penting.
Andy Hayden
1

Saya telah mengubah jawaban Jeff , untuk memasukkan total, sehingga Anda dapat melacak kemajuan dan variabel untuk hanya mencetak setiap iterasi X (ini sebenarnya meningkatkan banyak kinerja, jika "print_at" cukup tinggi)

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

fungsi clear_output () berasal dari

from IPython.core.display import clear_output

jika tidak pada jawaban IPython Andy Hayden melakukannya tanpa itu

Filipe Silva
sumber