Untuk membuat kode saya lebih "pythonic" dan lebih cepat, saya menggunakan "multiprocessing" dan fungsi peta untuk mengirimkannya a) fungsi dan b) jangkauan iterasi.
Solusi yang ditanamkan (yaitu, panggil tqdm langsung pada kisaran tqdm.tqdm (kisaran (0, 30)) tidak bekerja dengan multiprosesing (seperti yang dirumuskan dalam kode di bawah).
Bilah kemajuan ditampilkan dari 0 hingga 100% (ketika python membaca kode?) Tetapi itu tidak menunjukkan kemajuan sebenarnya dari fungsi peta.
Bagaimana cara menampilkan bilah kemajuan yang menunjukkan pada langkah mana fungsi 'peta' itu?
from multiprocessing import Pool
import tqdm
import time
def _foo(my_number):
square = my_number * my_number
time.sleep(1)
return square
if __name__ == '__main__':
p = Pool(2)
r = p.map(_foo, tqdm.tqdm(range(0, 30)))
p.close()
p.join()
Setiap bantuan atau saran dipersilakan ...
.starmap()
: Berikut adalah tambalan untukPool
ditambahkan.istarmap()
, yang juga akan berfungsitqdm
.Jawaban:
Gunakan imap daripada map, yang mengembalikan iterator dari nilai yang diproses.
from multiprocessing import Pool import tqdm import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': with Pool(2) as p: r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
sumber
starmap()
?for i in tqdm.tqdm(...): pass
mungkin lebih lurus ke depan, bahwalist(tqdm.tqdm)
chunk_size
darip.imap
. Bisakahtqdm
memperbarui setiap iterasi alih-alih setiap potongan?Solusi Ditemukan: Hati-hati! Karena multiprosesing, waktu estimasi (iterasi per loop, total waktu, dll.) Dapat menjadi tidak stabil, tetapi bilah kemajuan berfungsi dengan sempurna.
Catatan: Manajer konteks untuk Pool hanya tersedia dari Python versi 3.3
from multiprocessing import Pool import time from tqdm import * def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': with Pool(processes=2) as p: max_ = 30 with tqdm(total=max_) as pbar: for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))): pbar.update()
sumber
pbar.close()
tidak diperlukan, itu akan ditutup secara otomatis pada penghentianwith
tqdm
panggilan batin / kedua diperlukan di sini?starmap()
?imap_unordered
adalah kuncinya di sini, ini memberikan kinerja terbaik dan perkiraan bilah kemajuan terbaik.Maaf karena terlambat tetapi jika yang Anda butuhkan hanyalah peta bersamaan, versi terbaru (
tqdm>=4.42.0
) sekarang memiliki bawaan ini:from tqdm.contrib.concurrent import process_map # or thread_map import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': r = process_map(_foo, range(0, 30), max_workers=2)
Referensi: https://tqdm.github.io/docs/contrib.concurrent/ dan https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py
sumber
HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))
di JupyterAnda bisa menggunakan
p_tqdm
sebagai gantinya.https://github.com/swansonk14/p_tqdm
from p_tqdm import p_map import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': r = p_map(_foo, list(range(0, 30)))
sumber
pip install
. Ini menggantikan tqdm untuk sebagian besar kebutuhan sayap_tqdm
terbatas padamultiprocessing.Pool
, tidak tersedia untuk utasberdasarkan jawaban Xavi Martínez saya menulis fungsinya
imap_unordered_bar
. Ini dapat digunakan dengan cara yang sama sepertiimap_unordered
dengan satu-satunya perbedaan yang ditampilkan bilah pemrosesan.from multiprocessing import Pool import time from tqdm import * def imap_unordered_bar(func, args, n_processes = 2): p = Pool(n_processes) res_list = [] with tqdm(total = len(args)) as pbar: for i, res in tqdm(enumerate(p.imap_unordered(func, args))): pbar.update() res_list.append(res) pbar.close() p.close() p.join() return res_list def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': result = imap_unordered_bar(_foo, range(5))
sumber
Inilah pendapat saya tentang saat Anda perlu mendapatkan hasil dari fungsi eksekusi paralel Anda. Fungsi ini melakukan beberapa hal (ada kiriman saya yang lain yang menjelaskannya lebih lanjut) tetapi intinya adalah ada tugas yang menunggu antrian dan tugas selesai antrian. Saat pekerja selesai dengan setiap tugas dalam antrian tertunda mereka menambahkan hasil dalam antrian tugas selesai. Anda dapat membungkus pemeriksaan ke antrian tugas selesai dengan bilah kemajuan tqdm. Saya tidak menempatkan implementasi fungsi do_work () di sini, itu tidak relevan, karena pesan di sini adalah untuk memantau antrian tugas yang diselesaikan dan memperbarui bilah kemajuan setiap kali ada hasil.
def par_proc(job_list, num_cpus=None, verbose=False): # Get the number of cores if not num_cpus: num_cpus = psutil.cpu_count(logical=False) print('* Parallel processing') print('* Running on {} cores'.format(num_cpus)) # Set-up the queues for sending and receiving data to/from the workers tasks_pending = mp.Queue() tasks_completed = mp.Queue() # Gather processes and results here processes = [] results = [] # Count tasks num_tasks = 0 # Add the tasks to the queue for job in job_list: for task in job['tasks']: expanded_job = {} num_tasks = num_tasks + 1 expanded_job.update({'func': pickle.dumps(job['func'])}) expanded_job.update({'task': task}) tasks_pending.put(expanded_job) # Set the number of workers here num_workers = min(num_cpus, num_tasks) # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more # work left to be done. for c in range(num_workers): tasks_pending.put(SENTINEL) print('* Number of tasks: {}'.format(num_tasks)) # Set-up and start the workers for c in range(num_workers): p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose)) p.name = 'worker' + str(c) processes.append(p) p.start() # Gather the results completed_tasks_counter = 0 with tqdm(total=num_tasks) as bar: while completed_tasks_counter < num_tasks: results.append(tasks_completed.get()) completed_tasks_counter = completed_tasks_counter + 1 bar.update(completed_tasks_counter) for p in processes: p.join() return results
sumber
import multiprocessing as mp import tqdm some_iterable = ... def some_func(): # your logic ... if __name__ == '__main__': with mp.Pool(mp.cpu_count()-2) as p: list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))
sumber
Pendekatan ini sederhana dan berhasil.
from multiprocessing.pool import ThreadPool import time from tqdm import tqdm def job(): time.sleep(1) pbar.update() pool = ThreadPool(5) with tqdm(total=100) as pbar: for i in range(100): pool.apply_async(job) pool.close() pool.join()
sumber