Mungkinkah membuat Kolam python yang non-daemonic? Saya ingin sebuah pool dapat memanggil fungsi yang memiliki pool lain di dalamnya.
Saya menginginkan ini karena proses deamon tidak dapat membuat proses. Secara khusus, ini akan menyebabkan kesalahan:
AssertionError: daemonic processes are not allowed to have children
Misalnya, pertimbangkan skenario di mana function_a
memiliki kumpulan yang berjalan function_b
yang memiliki kumpulan yang berjalan function_c
. Rantai fungsi ini akan gagal, karena function_b
dijalankan dalam proses daemon, dan proses daemon tidak dapat membuat proses.
I want a pool to be able to call a function that has another pool inside
dan bagaimana hal itu mengganggu fakta bahwa pekerja di-daemonisasi.AssertionError: daemonic processes are not allowed to have children
Jawaban:
The
multiprocessing.pool.Pool
kelas menciptakan proses pekerja di perusahaan__init__
metode, membuat mereka kejam dan mulai mereka, dan tidak mungkin untuk kembali mengatur merekadaemon
atributFalse
sebelum mereka mulai (dan setelah itu tidak diperbolehkan lagi). Tapi Anda bisa membuat sub-kelas Anda sendirimultiprocesing.pool.Pool
(multiprocessing.Pool
hanya fungsi pembungkus) dan menggantimultiprocessing.Process
sub-kelas Anda sendiri , yang selalu non-daemonik, untuk digunakan untuk proses pekerja.Berikut contoh lengkap tentang cara melakukan ini. Bagian yang penting adalah dua kelas
NoDaemonProcess
danMyPool
di bagian atas dan untuk memanggilpool.close()
danpool.join()
diMyPool
instance Anda di bagian akhir.#!/usr/bin/env python # -*- coding: UTF-8 -*- import multiprocessing # We must import this explicitly, it is not imported by the top-level # multiprocessing module. import multiprocessing.pool import time from random import randint class NoDaemonProcess(multiprocessing.Process): # make 'daemon' attribute always return False def _get_daemon(self): return False def _set_daemon(self, value): pass daemon = property(_get_daemon, _set_daemon) # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool # because the latter is only a wrapper function, not a proper class. class MyPool(multiprocessing.pool.Pool): Process = NoDaemonProcess def sleepawhile(t): print("Sleeping %i seconds..." % t) time.sleep(t) return t def work(num_procs): print("Creating %i (daemon) workers and jobs in child." % num_procs) pool = multiprocessing.Pool(num_procs) result = pool.map(sleepawhile, [randint(1, 5) for x in range(num_procs)]) # The following is not really needed, since the (daemon) workers of the # child's pool are killed when the child is terminated, but it's good # practice to cleanup after ourselves anyway. pool.close() pool.join() return result def test(): print("Creating 5 (non-daemon) workers and jobs in main process.") pool = MyPool(5) result = pool.map(work, [randint(1, 5) for x in range(5)]) pool.close() pool.join() print(result) if __name__ == '__main__': test()
sumber
multiprocessing.freeze_support()
MyPool
alih-alih defaultPool
? Dengan kata lain, sebagai ganti fleksibilitas memulai proses anak, berapa biaya yang harus saya bayar? (Jika tidak ada biaya, mungkin standarPool
akan menggunakan proses non-daemonik).Pool
kelas telah direfraktor secara ekstensif, jadiProcess
bukan atribut sederhana lagi, tetapi metode, yang mengembalikan contoh proses yang didapat dari konteks . Saya mencoba menimpa metode ini untuk mengembalikan sebuahNoDaemonPool
instance, tetapi ini menghasilkan pengecualianAssertionError: daemonic processes are not allowed to have children
saat Pool digunakan.Saya memiliki kebutuhan untuk menggunakan kumpulan non-daemonik dengan Python 3.7 dan akhirnya mengadaptasi kode yang diposting dalam jawaban yang diterima. Di bawah ini ada cuplikan yang membuat kumpulan non-daemonik:
import multiprocessing.pool class NoDaemonProcess(multiprocessing.Process): @property def daemon(self): return False @daemon.setter def daemon(self, value): pass class NoDaemonContext(type(multiprocessing.get_context())): Process = NoDaemonProcess # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool # because the latter is only a wrapper function, not a proper class. class NestablePool(multiprocessing.pool.Pool): def __init__(self, *args, **kwargs): kwargs['context'] = NoDaemonContext() super(NestablePool, self).__init__(*args, **kwargs)
Karena implementasi saat
multiprocessing
ini telah difaktorisasi ulang secara ekstensif untuk didasarkan pada konteks, kita perlu menyediakanNoDaemonContext
kelas yang memilikiNoDaemonProcess
atribut as kita .NestablePool
kemudian akan menggunakan konteks itu, bukan yang default.Karena itu, saya harus memperingatkan bahwa setidaknya ada dua peringatan untuk pendekatan ini:
multiprocessing
paket, dan karenanya dapat rusak kapan saja.multiprocessing
sangat sulit menggunakan proses non-daemonik, banyak di antaranya dijelaskan di sini . Yang paling menarik menurut saya adalah:sumber
The multiprocessing modul memiliki antarmuka yang bagus untuk menggunakan kolam dengan proses atau benang. Bergantung pada kasus penggunaan Anda saat ini, Anda mungkin mempertimbangkan
multiprocessing.pool.ThreadPool
untuk menggunakan untuk Pool luar Anda, yang akan menghasilkan utas (yang memungkinkan untuk menelurkan proses dari dalam) sebagai lawan dari proses.Mungkin dibatasi oleh GIL, tetapi dalam kasus khusus saya (saya menguji keduanya) , waktu startup untuk proses dari luar
Pool
seperti yang dibuat di sini jauh melebihi solusinyaThreadPool
.Ini benar-benar mudah untuk pertukaran
Processes
untukThreads
. Baca lebih lanjut tentang cara menggunakanThreadPool
solusi di sini atau di sini .sumber
Pada beberapa versi Python menggantikan standar Renang adat dapat meningkatkan error:
AssertionError: group argument must be None for now
.Di sini saya menemukan solusi yang dapat membantu:
class NoDaemonProcess(multiprocessing.Process): # make 'daemon' attribute always return False @property def daemon(self): return False @daemon.setter def daemon(self, val): pass class NoDaemonProcessPool(multiprocessing.pool.Pool): def Process(self, *args, **kwds): proc = super(NoDaemonProcessPool, self).Process(*args, **kwds) proc.__class__ = NoDaemonProcess return proc
sumber
concurrent.futures.ProcessPoolExecutor
tidak memiliki batasan ini. Itu dapat memiliki kumpulan proses bersarang tanpa masalah sama sekali:from concurrent.futures import ProcessPoolExecutor as Pool from itertools import repeat from multiprocessing import current_process import time def pid(): return current_process().pid def _square(i): # Runs in inner_pool square = i ** 2 time.sleep(i / 10) print(f'{pid()=} {i=} {square=}') return square def _sum_squares(i, j): # Runs in outer_pool with Pool(max_workers=2) as inner_pool: squares = inner_pool.map(_square, (i, j)) sum_squares = sum(squares) time.sleep(sum_squares ** .5) print(f'{pid()=}, {i=}, {j=} {sum_squares=}') return sum_squares def main(): with Pool(max_workers=3) as outer_pool: for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)): print(f'{pid()=} {sum_squares=}') if __name__ == "__main__": main()
Kode demonstrasi di atas telah diuji dengan Python 3.8.
Batasannya
ProcessPoolExecutor
, bagaimanapun, adalah tidak adanyamaxtasksperchild
. Jika Anda membutuhkan ini, pertimbangkan jawaban dari Massimiliano sebagai gantinya.Kredit: jawaban oleh jfs
sumber
multiprocessing.Pool
di dalam aProcessPoolExecutor.Pool
juga dimungkinkan!Masalah yang saya temui adalah mencoba mengimpor global antar modul, menyebabkan baris ProcessPool () dievaluasi beberapa kali.
globals.py
from processing import Manager, Lock from pathos.multiprocessing import ProcessPool from pathos.threading import ThreadPool class SingletonMeta(type): def __new__(cls, name, bases, dict): dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self return super(SingletonMeta, cls).__new__(cls, name, bases, dict) def __init__(cls, name, bases, dict): super(SingletonMeta, cls).__init__(name, bases, dict) cls.instance = None def __call__(cls,*args,**kw): if cls.instance is None: cls.instance = super(SingletonMeta, cls).__call__(*args, **kw) return cls.instance def __deepcopy__(self, item): return item.__class__.instance class Globals(object): __metaclass__ = SingletonMeta """ This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children The root cause is that importing this file from different modules causes this file to be reevalutated each time, thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug """ def __init__(self): print "%s::__init__()" % (self.__class__.__name__) self.shared_manager = Manager() self.shared_process_pool = ProcessPool() self.shared_thread_pool = ThreadPool() self.shared_lock = Lock() # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin
Kemudian impor dengan aman dari tempat lain di kode Anda
from globals import Globals Globals().shared_manager Globals().shared_process_pool Globals().shared_thread_pool Globals().shared_lock
Saya telah menulis kelas pembungkus yang lebih luas di sekitar
pathos.multiprocessing
sini:Sebagai catatan tambahan, jika kasus penggunaan Anda hanya memerlukan peta multiproses asinkron sebagai pengoptimalan kinerja, maka joblib akan mengelola semua kumpulan proses Anda di belakang layar dan memungkinkan sintaks yang sangat sederhana ini:
squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )
sumber
Saya telah melihat orang-orang berurusan dengan masalah ini dengan menggunakan
celery
garpu yangmultiprocessing
disebut billiard (ekstensi kolam multiprosesing), yang memungkinkan proses daemonik untuk menelurkan anak-anak. Panduannya adalah dengan menggantimultiprocessing
modul dengan:import billiard as multiprocessing
sumber
Ini memberikan solusi untuk kesalahan yang tampaknya positif palsu. Seperti juga dicatat oleh James , ini bisa terjadi pada impor yang tidak disengaja dari proses daemonik.
Misalnya, jika Anda memiliki kode sederhana berikut,
WORKER_POOL
secara tidak sengaja dapat diimpor dari pekerja, yang menyebabkan kesalahan.import multiprocessing WORKER_POOL = multiprocessing.Pool()
Pendekatan sederhana namun dapat diandalkan untuk solusi adalah:
import multiprocessing import multiprocessing.pool class MyClass: @property def worker_pool(self) -> multiprocessing.pool.Pool: # Ref: https://stackoverflow.com/a/63984747/ try: return self._worker_pool # type: ignore except AttributeError: # pylint: disable=protected-access self.__class__._worker_pool = multiprocessing.Pool() # type: ignore return self.__class__._worker_pool # type: ignore # pylint: enable=protected-access
Dalam solusi di atas,
MyClass.worker_pool
dapat digunakan tanpa kesalahan. Jika menurut Anda pendekatan ini dapat diperbaiki, beri tahu saya.sumber