Saya mengalami banyak kesulitan untuk mencoba memahami bagaimana antrian multiprosesing bekerja pada python dan bagaimana menerapkannya. Katakanlah saya memiliki dua modul python yang mengakses data dari file bersama, sebut saja dua modul ini sebagai penulis dan pembaca. Rencana saya adalah meminta pembaca dan penulis memasukkan permintaan ke dalam dua antrian multiprosesing yang terpisah, dan kemudian memiliki proses ketiga untuk memasukkan permintaan ini dalam satu lingkaran dan mengeksekusi seperti itu.
Masalah utama saya adalah bahwa saya benar-benar tidak tahu bagaimana menerapkan multiprocessing.queue dengan benar, Anda tidak dapat benar-benar membuat instance objek untuk setiap proses karena mereka akan menjadi antrian terpisah, bagaimana Anda memastikan bahwa semua proses terkait dengan antrian bersama (atau dalam hal ini, antrian)
sumber
Jawaban:
Ini adalah contoh sederhana dari seorang pembaca dan penulis yang berbagi satu antrian ... Penulis mengirimkan sekumpulan bilangan bulat ke pembaca; ketika penulis kehabisan angka, ia mengirimkan 'SELESAI', yang memungkinkan pembaca tahu untuk keluar dari loop baca.
from multiprocessing import Process, Queue import time import sys def reader_proc(queue): ## Read from the queue; this will be spawned as a separate Process while True: msg = queue.get() # Read from the queue and do nothing if (msg == 'DONE'): break def writer(count, queue): ## Write to the queue for ii in range(0, count): queue.put(ii) # Write 'count' numbers into the queue queue.put('DONE') if __name__=='__main__': pqueue = Queue() # writer() writes to pqueue from _this_ process for count in [10**4, 10**5, 10**6]: ### reader_proc() reads from pqueue as a separate process reader_p = Process(target=reader_proc, args=((pqueue),)) reader_p.daemon = True reader_p.start() # Launch reader_proc() as a separate python process _start = time.time() writer(count, pqueue) # Send a lot of stuff to reader() reader_p.join() # Wait for the reader to finish print("Sending {0} numbers to Queue() took {1} seconds".format(count, (time.time() - _start)))
sumber
di "
from queue import Queue
" tidak ada modul yang dipanggilqueue
, sebagai gantinyamultiprocessing
harus digunakan. Oleh karena itu, ini akan terlihat seperti "from multiprocessing import Queue
"sumber
multiprocessing.Queue
sudah benar. NormalQueue.Queue
digunakan untuk utas python . Ketika Anda mencoba menggunakanQueue.Queue
dengan multiprosesing, salinan objek Antrian akan dibuat di setiap proses anak dan proses anak tidak akan pernah diperbarui. Pada dasarnya,Queue.Queue
bekerja dengan menggunakan objek bersama global, danmultiprocessing.Queue
bekerja menggunakan IPC. Lihat: stackoverflow.com/questions/925100/…Berikut adalah penggunaan sederhana
multiprocessing.Queue
danmultiprocessing.Process
yang memungkinkan pemanggil untuk mengirim "peristiwa" plus argumen ke proses terpisah yang mengirimkan peristiwa ke metode "do_" pada proses tersebut. (Python 3.4+)import multiprocessing as mp import collections Msg = collections.namedtuple('Msg', ['event', 'args']) class BaseProcess(mp.Process): """A process backed by an internal queue for simple one-way message passing. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.queue = mp.Queue() def send(self, event, *args): """Puts the event and args as a `Msg` on the queue """ msg = Msg(event, args) self.queue.put(msg) def dispatch(self, msg): event, args = msg handler = getattr(self, "do_%s" % event, None) if not handler: raise NotImplementedError("Process has no handler for [%s]" % event) handler(*args) def run(self): while True: msg = self.queue.get() self.dispatch(msg)
Pemakaian:
class MyProcess(BaseProcess): def do_helloworld(self, arg1, arg2): print(arg1, arg2) if __name__ == "__main__": process = MyProcess() process.start() process.send('helloworld', 'hello', 'world')
The
send
terjadi dalam proses induk, yangdo_*
terjadi dalam proses anak.Saya meninggalkan penanganan pengecualian apa pun yang jelas akan mengganggu putaran run dan keluar dari proses anak. Anda juga dapat menyesuaikannya dengan mengganti
run
ke kontrol pemblokiran atau apa pun.Ini benar-benar hanya berguna dalam situasi di mana Anda memiliki proses pekerja tunggal, tetapi saya pikir ini adalah jawaban yang relevan untuk pertanyaan ini untuk menunjukkan skenario umum dengan sedikit lebih berorientasi objek.
sumber
Saya telah melihat beberapa jawaban di seluruh stack overflow dan web saat mencoba menyiapkan cara melakukan multiprosesing menggunakan antrean untuk menyebarkan kerangka data panda besar. Tampak bagi saya bahwa setiap jawaban mengulangi jenis solusi yang sama tanpa mempertimbangkan banyaknya kasus tepi yang pasti akan ditemukan seseorang saat menyiapkan penghitungan seperti ini. Masalahnya adalah ada banyak hal yang berperan pada saat yang bersamaan. Jumlah tugas, jumlah pekerja, durasi setiap tugas dan kemungkinan pengecualian selama pelaksanaan tugas. Semua ini membuat sinkronisasi menjadi rumit dan sebagian besar jawaban tidak membahas bagaimana Anda dapat melakukannya. Jadi ini adalah pendapat saya setelah bermain-main selama beberapa jam, semoga ini cukup umum bagi kebanyakan orang untuk menganggapnya berguna.
Beberapa pemikiran sebelum contoh pengkodean. Karena
queue.Empty
atauqueue.qsize()
metode serupa lainnya tidak dapat diandalkan untuk kontrol aliran, kode sejenisnyawhile True: try: task = pending_queue.get_nowait() except queue.Empty: break
itu palsu. Ini akan mematikan pekerja bahkan jika milidetik kemudian tugas lain muncul dalam antrian. Pekerja tidak akan pulih dan setelah beberapa saat SEMUA pekerja akan menghilang karena mereka secara acak menemukan antrian kosong untuk sementara. Hasil akhirnya adalah fungsi multiprosesing utama (yang memiliki join () pada proses) akan kembali tanpa semua tugas diselesaikan. Bagus. Semoga berhasil melakukan debug melalui itu jika Anda memiliki ribuan tugas dan beberapa hilang.
Masalah lainnya adalah penggunaan nilai sentinel. Banyak orang menyarankan untuk menambahkan nilai sentinel dalam antrian untuk menandai akhir antrian. Tetapi untuk menandainya kepada siapa sebenarnya? Jika ada N pekerja, dengan asumsi N adalah jumlah inti yang tersedia memberi atau menerima, maka satu nilai sentinel hanya akan menandai akhir antrian untuk satu pekerja. Semua pekerja lainnya akan duduk menunggu lebih banyak pekerjaan ketika tidak ada yang tersisa. Contoh khas yang pernah saya lihat adalah
while True: task = pending_queue.get() if task == SOME_SENTINEL_VALUE: break
Satu pekerja akan mendapatkan nilai sentinel sedangkan pekerja lainnya akan menunggu tanpa batas. Tidak ada pos yang saya temukan menyebutkan bahwa Anda perlu mengirimkan nilai sentinel ke antrean SETIDAKNYA sebanyak Anda memiliki pekerja sehingga SEMUA mereka mendapatkannya.
Masalah lainnya adalah penanganan pengecualian selama eksekusi tugas. Sekali lagi ini harus ditangkap dan dikelola. Selain itu, jika Anda memiliki
completed_tasks
antrian, Anda harus menghitung secara deterministik berapa banyak item dalam antrian sebelum Anda memutuskan bahwa pekerjaan telah selesai. Sekali lagi mengandalkan ukuran antrian pasti akan gagal dan mengembalikan hasil yang tidak diharapkan.Dalam contoh di bawah ini,
par_proc()
fungsi akan menerima daftar tugas termasuk fungsi yang harus dieksekusi bersama dengan argumen dan nilai bernama apa pun.import multiprocessing as mp import dill as pickle import queue import time import psutil SENTINEL = None def do_work(tasks_pending, tasks_completed): # Get the current worker's name worker_name = mp.current_process().name while True: try: task = tasks_pending.get_nowait() except queue.Empty: print(worker_name + ' found an empty queue. Sleeping for a while before checking again...') time.sleep(0.01) else: try: if task == SENTINEL: print(worker_name + ' no more work left to be done. Exiting...') break print(worker_name + ' received some work... ') time_start = time.perf_counter() work_func = pickle.loads(task['func']) result = work_func(**task['task']) tasks_completed.put({work_func.__name__: result}) time_end = time.perf_counter() - time_start print(worker_name + ' done in {} seconds'.format(round(time_end, 5))) except Exception as e: print(worker_name + ' task failed. ' + str(e)) tasks_completed.put({work_func.__name__: None}) def par_proc(job_list, num_cpus=None): # Get the number of cores if not num_cpus: num_cpus = psutil.cpu_count(logical=False) print('* Parallel processing') print('* Running on {} cores'.format(num_cpus)) # Set-up the queues for sending and receiving data to/from the workers tasks_pending = mp.Queue() tasks_completed = mp.Queue() # Gather processes and results here processes = [] results = [] # Count tasks num_tasks = 0 # Add the tasks to the queue for job in job_list: for task in job['tasks']: expanded_job = {} num_tasks = num_tasks + 1 expanded_job.update({'func': pickle.dumps(job['func'])}) expanded_job.update({'task': task}) tasks_pending.put(expanded_job) # Use as many workers as there are cores (usually chokes the system so better use less) num_workers = num_cpus # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more # work left to be done. for c in range(num_workers): tasks_pending.put(SENTINEL) print('* Number of tasks: {}'.format(num_tasks)) # Set-up and start the workers for c in range(num_workers): p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed)) p.name = 'worker' + str(c) processes.append(p) p.start() # Gather the results completed_tasks_counter = 0 while completed_tasks_counter < num_tasks: results.append(tasks_completed.get()) completed_tasks_counter = completed_tasks_counter + 1 for p in processes: p.join() return results
Dan berikut ini adalah tes untuk menjalankan kode di atas
def test_parallel_processing(): def heavy_duty1(arg1, arg2, arg3): return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert job1 == 15 assert job2 == 21
ditambah satu sama lain dengan beberapa pengecualian
def test_parallel_processing_exceptions(): def heavy_duty1_raises(arg1, arg2, arg3): raise ValueError('Exception raised') return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert not job1 assert job2 == 21
Semoga bermanfaat.
sumber
Kami mengimplementasikan dua versi ini, satu kumpulan multi utas sederhana yang dapat mengeksekusi banyak jenis callable, membuat hidup kami jauh lebih mudah dan versi kedua yang menggunakan proses , yang kurang fleksibel dalam hal callable dan memerlukan dan panggilan tambahan ke dill.
Menyetel frozen_pool ke true akan membekukan eksekusi hingga finish_pool_queue dipanggil di salah satu kelas.
Versi Benang:
''' Created on Nov 4, 2019 @author: Kevin ''' from threading import Lock, Thread from Queue import Queue import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os class ThreadPool(object): def __init__(self, queue_threads, *args, **kwargs): self.frozen_pool = kwargs.get('frozen_pool', False) self.print_queue = kwargs.get('print_queue', True) self.pool_results = [] self.lock = Lock() self.queue_threads = queue_threads self.queue = Queue() self.threads = [] for i in range(self.queue_threads): t = Thread(target=self.make_pool_call) t.daemon = True t.start() self.threads.append(t) def make_pool_call(self): while True: if self.frozen_pool: #print '--> Queue is frozen' sleep(1) continue item = self.queue.get() if item is None: break call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.lock.acquire() self.pool_results.append((item, result)) self.lock.release() except Exception as e: self.lock.acquire() print e traceback.print_exc() self.lock.release() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self): self.frozen_pool = False while self.queue.unfinished_tasks > 0: if self.print_queue: print_info('--> Thread pool... %s' % self.queue.unfinished_tasks) sleep(5) self.queue.join() for i in range(self.queue_threads): self.queue.put(None) for t in self.threads: t.join() del self.threads[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Versi Proses:
''' Created on Nov 4, 2019 @author: Kevin ''' import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\ RawArray, Manager from dill import dill import ctypes from helium.misc.utils import ignore_exception from mem_top import mem_top import gc class ProcessPool(object): def __init__(self, queue_processes, *args, **kwargs): self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False)) self.print_queue = kwargs.get('print_queue', True) self.manager = Manager() self.pool_results = self.manager.list() self.queue_processes = queue_processes self.queue = JoinableQueue() self.processes = [] for i in range(self.queue_processes): p = Process(target=self.make_pool_call) p.start() self.processes.append(p) print 'Processes', self.queue_processes def make_pool_call(self): while True: if self.frozen_pool.value: sleep(1) continue item_pickled = self.queue.get() if item_pickled is None: #print '--> Ending' self.queue.task_done() break item = dill.loads(item_pickled) call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.pool_results.append(dill.dumps((item, result))) else: del call, args, kwargs, keep_results, item, result except Exception as e: print e traceback.print_exc() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self, callable=None): self.frozen_pool.value = False while self.queue._unfinished_tasks.get_value() > 0: if self.print_queue: print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value())) if callable: callable() sleep(5) for i in range(self.queue_processes): self.queue.put(None) self.queue.join() self.queue.close() for p in self.processes: with ignore_exception: p.join(10) with ignore_exception: p.terminate() with ignore_exception: del self.processes[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Telepon dengan:
tp = ThreadPool(queue_threads=2) tp.queue.put({'call': test, 'args': [random.randint(0, 100)]}) tp.finish_pool_queue()
atau
pp = ProcessPool(queue_processes=2) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.finish_pool_queue()
sumber
Hanya membuat contoh sederhana dan umum untuk mendemonstrasikan penyampaian pesan melalui Antrian antara 2 program mandiri. Itu tidak langsung menjawab pertanyaan OP tetapi harus cukup jelas menunjukkan konsepnya.
Server:
multiprocessing-queue-manager-server.py
import asyncio import concurrent.futures import multiprocessing import multiprocessing.managers import queue import sys import threading from typing import Any, AnyStr, Dict, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: global q if not ident in q: q[ident] = multiprocessing.Queue() return q[ident] q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict() delattr(QueueManager, 'get_queue') def init_queue_manager_server(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue', get_queue) def serve(no: int, term_ev: threading.Event): manager: QueueManager with QueueManager(authkey=QueueManager.__name__.encode()) as manager: print(f"Server address {no}: {manager.address}") while not term_ev.is_set(): try: item: Any = manager.get_queue().get(timeout=0.1) print(f"Client {no}: {item} from {manager.address}") except queue.Empty: continue async def main(n: int): init_queue_manager_server() term_ev: threading.Event = threading.Event() executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor() i: int for i in range(n): asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev)) # Gracefully shut down try: await asyncio.get_running_loop().create_future() except asyncio.CancelledError: term_ev.set() executor.shutdown() raise if __name__ == '__main__': asyncio.run(main(int(sys.argv[1])))
Klien:
multiprocessing-queue-manager-client.py
import multiprocessing import multiprocessing.managers import os import sys from typing import AnyStr, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass delattr(QueueManager, 'get_queue') def init_queue_manager_client(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue') def main(): init_queue_manager_client() manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode()) manager.connect() message = f"A message from {os.getpid()}" print(f"Message to send: {message}") manager.get_queue().put(message) if __name__ == '__main__': main()
Pemakaian
Server:
N
adalah bilangan bulat yang menunjukkan berapa banyak server yang harus dibuat. Salin salah satu<server-address-N>
output oleh server dan jadikan itu argumen pertama masing-masingmultiprocessing-queue-manager-client.py
.Klien:
python3 multiprocessing-queue-manager-client.py <server-address-1>
Hasil
Server:
Client 1: <item> from <server-address-1>
Inti: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5
UPD : Membuat paket di sini .
Server:
import ipcq with ipcq.QueueManagerServer(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) as server: server.get_queue().get()
Klien:
import ipcq client = ipcq.QueueManagerClient(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) client.get_queue().put('a message')
sumber