Berbagi hasil antrian di antara beberapa proses

96

Dokumentasi untuk multiprocessingmodul menunjukkan cara meneruskan antrian ke proses yang dimulai multiprocessing.Process. Tapi bagaimana saya bisa berbagi antrian dengan proses pekerja asynchronous yang dimulai apply_async? Saya tidak perlu bergabung dinamis atau apa pun, hanya cara bagi pekerja untuk (berulang kali) melaporkan hasil mereka kembali ke pangkalan.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))

Ini gagal dengan: RuntimeError: Queue objects should only be shared between processes through inheritance. Saya mengerti apa artinya ini, dan saya memahami saran untuk mewarisi daripada memerlukan pengawetan / pembongkaran (dan semua batasan khusus Windows). Tapi bagaimana cara saya lulus antrian dengan cara yang bekerja? Saya tidak dapat menemukan contoh, dan saya telah mencoba beberapa alternatif yang gagal dalam berbagai cara. Tolonglah?

alexis
sumber

Jawaban:

138

Coba gunakan multiprocessing.Manager untuk mengelola antrean Anda dan juga membuatnya dapat diakses oleh pekerja yang berbeda.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))
enderskill
sumber
Itu berhasil, terima kasih! Ada masalah yang tidak terkait dengan panggilan asinkron di kode asli saya, jadi saya juga menyalin perbaikan ke jawaban Anda.
alexis
17
Ada penjelasan kenapa queue.Queue()tidak cocok untuk ini?
mrgloom
1
@mrgloom: queue.Queuedibuat untuk threading, menggunakan kunci dalam memori. Dalam lingkungan Multiproses, setiap subproses akan mendapatkan salinan queue.Queue()instance-nya sendiri di ruang memorinya, karena subproses tidak berbagi memori (kebanyakan).
LeoRochael
1
@alexis Bagaimana cara mendapatkan elemen dari Manajer (). Queue () setelah beberapa pekerja memasukkan data ke dalamnya?
MSS
14

multiprocessing.Poolsudah memiliki antrean hasil bersama, tidak perlu melibatkan file Manager.Queue. Manager.Queueadalah queue.Queue(multithreading-queue) di bawah tenda, terletak pada proses server terpisah dan diekspos melalui proxy. Ini menambah biaya tambahan dibandingkan dengan antrian internal Pool. Berlawanan dengan mengandalkan penanganan hasil asli Pool, hasil dalam Manager.Queuejuga tidak dijamin akan dipesan.

Proses pekerja tidak dimulai dengan .apply_async(), ini sudah terjadi saat Anda membuat contoh Pool. Apa yang dimulai saat Anda menelepon pool.apply_async()adalah "pekerjaan" baru. Proses pekerja kolam menjalankan multiprocessing.pool.workerfungsi di bawah tenda. Fungsi ini menangani pemrosesan "tugas" baru yang ditransfer melalui internal Pool Pool._inqueuedan mengirimkan hasil kembali ke induk melalui Pool._outqueue. Anda ditentukan funcakan dieksekusi dalam multiprocessing.pool.worker. funchanya memiliki returnsesuatu dan hasilnya akan otomatis dikirim kembali ke orang tua.

.apply_async() segera (secara asinkron) mengembalikan AsyncResultobjek (alias untuk ApplyResult). Anda perlu memanggil .get()(memblokir) pada objek itu untuk menerima hasil sebenarnya. Pilihan lainnya adalah mendaftarkan fungsi callback , yang diaktifkan segera setelah hasilnya siap.

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':

    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO
        results = [res.get() for res in results]
        print(f'result: {results}')       

Contoh Keluaran:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Catatan: Menentukan timeout-parameter untuk .get()tidak akan menghentikan pemrosesan tugas yang sebenarnya di dalam pekerja, ini hanya membuka blokir induk yang menunggu dengan menaikkan a multiprocessing.TimeoutError.

Darkonaut
sumber
Menarik, saya akan mencobanya pertama kali mendapat kesempatan. Ini jelas tidak berhasil seperti ini pada tahun 2012.
alexis
@alexis Python 2.7 (2010) secara relevan di sini hanya kehilangan pengelola konteks dan error_callback-parameter untuk apply_async, jadi tidak banyak berubah sejak itu.
Darkonaut
Saya menemukan fungsi panggilan balik menjadi yang paling berguna, terutama bila dikombinasikan dengan fungsi parsial untuk memungkinkan penggunaan daftar biasa untuk mengumpulkan hasil asinkron seperti yang dijelaskan di sini; gist.github.com/Glench/5789879
user5359531