Saya memiliki skrip yang berhasil melakukan kumpulan tugas multiprosesing dengan imap_unordered()
panggilan:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
Namun, saya num_tasks
sekitar 250.000, jadi join()
utas utama terkunci selama 10 detik atau lebih, dan saya ingin dapat menggema ke baris perintah secara bertahap untuk menunjukkan bahwa proses utama tidak terkunci. Sesuatu seperti:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
Apakah ada metode untuk objek hasil atau kumpulan itu sendiri yang menunjukkan jumlah tugas yang tersisa? Saya mencoba menggunakan multiprocessing.Value
objek sebagai penghitung ( do_work
memanggil counter.value += 1
tindakan setelah melakukan tugasnya), tetapi penghitung hanya mencapai ~ 85% dari total nilai sebelum berhenti menambah.
python
multiprocessing
MidnightLightning
sumber
sumber
def do_word(*a): time.sleep(.1)
sebagai contoh. Jika tidak berhasil untuk Anda, buat contoh kode minimal lengkap yang mendemonstrasikan masalah Anda: jelaskan menggunakan kata-kata apa yang Anda harapkan akan terjadi dan apa yang terjadi, sebutkan bagaimana Anda menjalankan skrip Python Anda, apa OS Anda, versi Python dan posting sebagai pertanyaan baru .Pool.map()
. Saya tidak menyadarinya hanyaimap()
danimap_unordered()
bekerja dengan cara ini - dokumentasinya hanya mengatakan "Versi lazier dari map ()" tetapi sebenarnya berarti "iterator yang mendasari mengembalikan hasil saat mereka masuk".imap_unordered()
. Masalah Hanan mungkin karenasys.stderr.write('\r..')
(menimpa baris yang sama untuk menunjukkan kemajuan).Favorit pribadi saya - memberi Anda bilah kemajuan kecil yang bagus dan penyelesaian ETA sementara semuanya berjalan dan berjalan secara paralel.
sumber
pip install tqdm
Saya menemukan bahwa pekerjaan tersebut sudah selesai pada saat saya mencoba untuk memeriksa kemajuannya. Inilah yang berhasil untuk saya menggunakan tqdm .
pip install tqdm
Ini harus bekerja dengan semua rasa multiprosesing, apakah mereka memblokir atau tidak.
sumber
Menemukan jawaban sendiri dengan beberapa lebih menggali: Mengambil melihat pada
__dict__
satuimap_unordered
objek hasil, saya menemukan ia memiliki_index
atribut yang bertahap dengan masing-masing penyelesaian tugas. Jadi ini berfungsi untuk logging, dibungkus dalamwhile
loop:Namun, saya menemukan bahwa menukar
imap_unordered
untukmap_async
menghasilkan eksekusi yang jauh lebih cepat, meskipun objek hasilnya sedikit berbeda. Sebagai gantinya, objek hasil darimap_async
memiliki_number_left
atribut, danready()
metode:sumber
rs
diketahui dan agak terlambat atau tidak?rs
telah meluncurkan utas lainnya.rs
dalam lingkaran mana pun, saya pemula multiprocessing dan ini akan membantu. Terima kasih banyak.python 3.5
, solusi yang menggunakan_number_left
tidak berhasil._number_left
mewakili bongkahan yang tersisa untuk diproses. Misalnya, jika saya ingin 50 elemen diteruskan ke fungsi saya secara paralel, maka untuk kumpulan utas dengan 3 proses_map_async()
membuat 10 potongan dengan masing-masing 5 elemen._number_left
kemudian mewakili berapa banyak dari potongan ini yang telah diselesaikan.Saya tahu bahwa ini adalah pertanyaan yang agak lama, tetapi inilah yang saya lakukan ketika saya ingin melacak perkembangan kumpulan tugas dengan python.
Pada dasarnya, Anda menggunakan apply_async dengan callbak (dalam hal ini, ini untuk menambahkan nilai yang dikembalikan ke daftar), jadi Anda tidak perlu menunggu untuk melakukan sesuatu yang lain. Kemudian, dalam loop sementara, Anda memeriksa perkembangan pekerjaan. Dalam hal ini, saya menambahkan widget agar terlihat lebih bagus.
Hasil:
Semoga membantu.
sumber
[pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
untuk(pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)
Seperti yang disarankan oleh Tim, Anda dapat menggunakan
tqdm
danimap
untuk memecahkan masalah ini. Saya baru saja menemukan masalah ini dan mengubahimap_unordered
solusinya, sehingga saya dapat mengakses hasil pemetaan. Begini cara kerjanya:Jika Anda tidak peduli dengan nilai yang dikembalikan dari pekerjaan Anda, Anda tidak perlu menetapkan daftar ke variabel apa pun.
sumber
untuk siapa saja yang mencari solusi sederhana yang bekerja dengan
Pool.apply_async()
:sumber
Saya membuat kelas khusus untuk membuat cetakan kemajuan. Maby ini membantu:
sumber
Coba pendekatan berbasis Antrean sederhana ini, yang juga dapat digunakan dengan penggabungan. Perhatikan bahwa mencetak apa pun setelah dimulainya bilah kemajuan akan menyebabkannya dipindahkan, setidaknya untuk bilah kemajuan khusus ini. (Kemajuan PyPI 1.5)
sumber