Multiprocessing: gunakan tqdm untuk menampilkan bilah kemajuan

103

Untuk membuat kode saya lebih "pythonic" dan lebih cepat, saya menggunakan "multiprocessing" dan fungsi peta untuk mengirimkannya a) fungsi dan b) jangkauan iterasi.

Solusi yang ditanamkan (yaitu, panggil tqdm langsung pada kisaran tqdm.tqdm (kisaran (0, 30)) tidak bekerja dengan multiprosesing (seperti yang dirumuskan dalam kode di bawah).

Bilah kemajuan ditampilkan dari 0 hingga 100% (ketika python membaca kode?) Tetapi itu tidak menunjukkan kemajuan sebenarnya dari fungsi peta.

Bagaimana cara menampilkan bilah kemajuan yang menunjukkan pada langkah mana fungsi 'peta' itu?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

Setiap bantuan atau saran dipersilakan ...

SciPy
sumber
Dapatkah Anda memposting cuplikan kode dari bilah kemajuan?
Alex
2
Untuk orang-orang yang mencari solusi dengan .starmap(): Berikut adalah tambalan untuk Poolditambahkan .istarmap(), yang juga akan berfungsi tqdm.
Darkonaut

Jawaban:

136

Gunakan imap daripada map, yang mengembalikan iterator dari nilai yang diproses.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
hkyi
sumber
14
Pernyataan list () yang melampirkan menunggu iterator berakhir. total = juga diperlukan karena tqdm tidak tahu berapa lama iterasinya,
hkyi
16
Apakah ada solusi serupa untuk starmap()?
tarashypka
2
for i in tqdm.tqdm(...): pass mungkin lebih lurus ke depan, bahwalist(tqdm.tqdm)
savfod
1
Ini berfungsi tetapi apakah ada orang lain yang terus mencetak bilah kemajuan pada baris baru untuk setiap iterasi?
Dennis Subachev
3
Perilaku adalah kabel saat tertentu chunk_sizedari p.imap. Bisakah tqdmmemperbarui setiap iterasi alih-alih setiap potongan?
huangbiubiu
56

Solusi Ditemukan: Hati-hati! Karena multiprosesing, waktu estimasi (iterasi per loop, total waktu, dll.) Dapat menjadi tidak stabil, tetapi bilah kemajuan berfungsi dengan sempurna.

Catatan: Manajer konteks untuk Pool hanya tersedia dari Python versi 3.3

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()
SciPy
sumber
2
pbar.close()tidak diperlukan, itu akan ditutup secara otomatis pada penghentianwith
Sagar Kar
5
Apakah tqdmpanggilan batin / kedua diperlukan di sini?
shadowtalker
7
bagaimana dengan output dari _foo (my_number) yang dikembalikan sebagai "r" yang dimaksud?
Likak
4
Apakah ada solusi serupa untuk starmap()?
tarashypka
2
@shadowtalker - tampaknya bekerja tanpa;). Bagaimanapun - imap_unorderedadalah kuncinya di sini, ini memberikan kinerja terbaik dan perkiraan bilah kemajuan terbaik.
Tomasz Gandor
22

Maaf karena terlambat tetapi jika yang Anda butuhkan hanyalah peta bersamaan, versi terbaru ( tqdm>=4.42.0) sekarang memiliki bawaan ini:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

Referensi: https://tqdm.github.io/docs/contrib.concurrent/ dan https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

casper.dcl
sumber
1
Terima kasih untuk ini. Bekerja dengan mudah, jauh lebih baik daripada solusi lain yang pernah saya coba.
pengguna3340499
Keren (+1), tetapi melempar HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))di Jupyter
Ébe Isaac
@ Ébe-Isaac lihat github.com/tqdm/tqdm/issues/937
casper.dcl
Saya melihat masalah dengan diskusi untuk meretas tqdm_notebook, namun, tidak dapat menemukan solusi untuk memecahkan tqdm.contrib.concurrent.
Ébe Isaac
21

Anda bisa menggunakan p_tqdmsebagai gantinya.

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))
Victor Quach
sumber
1
Ini bekerja dengan sangat baik, dan sangat mudah pip install. Ini menggantikan tqdm untuk sebagian besar kebutuhan saya
crypdick
Merci Victor;)
Gabriel Romon
p_tqdmterbatas pada multiprocessing.Pool, tidak tersedia untuk utas
pateheo
8

berdasarkan jawaban Xavi Martínez saya menulis fungsinya imap_unordered_bar. Ini dapat digunakan dengan cara yang sama seperti imap_unordereddengan satu-satunya perbedaan yang ditampilkan bilah pemrosesan.

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
Oliver Wilken
sumber
3
Ini akan menggambar ulang bilah di setiap langkah di baris baru. Bagaimana cara memperbarui baris yang sama?
misantroop
Solusi dalam kasus saya (Windows / Powershell): Colorama.
misantroop
'pbar.close () tidak diperlukan, itu akan ditutup secara otomatis pada penghentian dengan' seperti komentar yang dibuat Sagar pada jawaban @ scipy
Tejas Shetty
1

Inilah pendapat saya tentang saat Anda perlu mendapatkan hasil dari fungsi eksekusi paralel Anda. Fungsi ini melakukan beberapa hal (ada kiriman saya yang lain yang menjelaskannya lebih lanjut) tetapi intinya adalah ada tugas yang menunggu antrian dan tugas selesai antrian. Saat pekerja selesai dengan setiap tugas dalam antrian tertunda mereka menambahkan hasil dalam antrian tugas selesai. Anda dapat membungkus pemeriksaan ke antrian tugas selesai dengan bilah kemajuan tqdm. Saya tidak menempatkan implementasi fungsi do_work () di sini, itu tidak relevan, karena pesan di sini adalah untuk memantau antrian tugas yang diselesaikan dan memperbarui bilah kemajuan setiap kali ada hasil.

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results
Nick B.
sumber
0
import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))
orang gelap
sumber
-2

Pendekatan ini sederhana dan berhasil.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()
Vijayabhaskar J
sumber