Dokumentasi untuk multiprocessing
modul menunjukkan cara meneruskan antrian ke proses yang dimulai multiprocessing.Process
. Tapi bagaimana saya bisa berbagi antrian dengan proses pekerja asynchronous yang dimulai apply_async
? Saya tidak perlu bergabung dinamis atau apa pun, hanya cara bagi pekerja untuk (berulang kali) melaporkan hasil mereka kembali ke pangkalan.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Ini gagal dengan:
RuntimeError: Queue objects should only be shared between processes through inheritance
. Saya mengerti apa artinya ini, dan saya memahami saran untuk mewarisi daripada memerlukan pengawetan / pembongkaran (dan semua batasan khusus Windows). Tapi bagaimana cara saya lulus antrian dengan cara yang bekerja? Saya tidak dapat menemukan contoh, dan saya telah mencoba beberapa alternatif yang gagal dalam berbagai cara. Tolonglah?
queue.Queue()
tidak cocok untuk ini?queue.Queue
dibuat untuk threading, menggunakan kunci dalam memori. Dalam lingkungan Multiproses, setiap subproses akan mendapatkan salinanqueue.Queue()
instance-nya sendiri di ruang memorinya, karena subproses tidak berbagi memori (kebanyakan).multiprocessing.Pool
sudah memiliki antrean hasil bersama, tidak perlu melibatkan fileManager.Queue
.Manager.Queue
adalahqueue.Queue
(multithreading-queue) di bawah tenda, terletak pada proses server terpisah dan diekspos melalui proxy. Ini menambah biaya tambahan dibandingkan dengan antrian internal Pool. Berlawanan dengan mengandalkan penanganan hasil asli Pool, hasil dalamManager.Queue
juga tidak dijamin akan dipesan.Proses pekerja tidak dimulai dengan
.apply_async()
, ini sudah terjadi saat Anda membuat contohPool
. Apa yang dimulai saat Anda meneleponpool.apply_async()
adalah "pekerjaan" baru. Proses pekerja kolam menjalankanmultiprocessing.pool.worker
fungsi di bawah tenda. Fungsi ini menangani pemrosesan "tugas" baru yang ditransfer melalui internal PoolPool._inqueue
dan mengirimkan hasil kembali ke induk melaluiPool._outqueue
. Anda ditentukanfunc
akan dieksekusi dalammultiprocessing.pool.worker
.func
hanya memilikireturn
sesuatu dan hasilnya akan otomatis dikirim kembali ke orang tua..apply_async()
segera (secara asinkron) mengembalikanAsyncResult
objek (alias untukApplyResult
). Anda perlu memanggil.get()
(memblokir) pada objek itu untuk menerima hasil sebenarnya. Pilihan lainnya adalah mendaftarkan fungsi callback , yang diaktifkan segera setelah hasilnya siap.from multiprocessing import Pool def busy_foo(i): """Dummy function simulating cpu-bound work.""" for _ in range(int(10e6)): # do stuff pass return i if __name__ == '__main__': with Pool(4) as pool: print(pool._outqueue) # DEMO results = [pool.apply_async(busy_foo, (i,)) for i in range(10)] # `.apply_async()` immediately returns AsyncResult (ApplyResult) object print(results[0]) # DEMO results = [res.get() for res in results] print(f'result: {results}')
Contoh Keluaran:
<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0> <multiprocessing.pool.ApplyResult object at 0x7fa12586da20> result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Catatan: Menentukan
timeout
-parameter untuk.get()
tidak akan menghentikan pemrosesan tugas yang sebenarnya di dalam pekerja, ini hanya membuka blokir induk yang menunggu dengan menaikkan amultiprocessing.TimeoutError
.sumber
error_callback
-parameter untukapply_async
, jadi tidak banyak berubah sejak itu.