Ketika saya menjalankan sesuatu seperti:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
ini bekerja dengan baik. Namun, menempatkan ini sebagai fungsi kelas:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
Memberi saya kesalahan berikut:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Saya telah melihat posting dari Alex Martelli berurusan dengan masalah yang sama, tetapi itu tidak cukup eksplisit.
python
multiprocessing
pickle
Mermoz
sumber
sumber
IPython.Parallel
, tetapi di sana Anda bisa menyelesaikan masalah dengan mendorong benda ke node. Tampaknya cukup menjengkelkan untuk menyelesaikan masalah ini dengan multiprosesor.calculate
adalah picklable, sehingga tampaknya seperti ini dapat diselesaikan dengan 1) menciptakan fungsi objek dengan konstruktor yang salinan atascalculate
contoh dan kemudian 2) melewati sebuah instance dari objek fungsi ini untukPool
'smap
metode. Tidak?multiprocessing
modul adalah karena tujuannya untuk menjadi implementasi lintas platform, dan kurangnyafork(2)
panggilan sistem seperti pada Windows. Jika Anda tidak peduli dengan dukungan Win32, mungkin ada solusi berbasis proses yang lebih sederhana. Atau jika Anda siap untuk menggunakan benang bukan proses, Anda dapat menggantifrom multiprocessing import Pool
denganfrom multiprocessing.pool import ThreadPool as Pool
.Jawaban:
Saya juga terganggu oleh pembatasan fungsi apa yang bisa diterima pool.map. Saya menulis yang berikut untuk menghindari ini. Tampaknya berfungsi, bahkan untuk penggunaan parmap secara rekursif.
sumber
Saya tidak dapat menggunakan kode yang diposting sejauh ini karena kode yang menggunakan "multiprocessing.Pool" tidak bekerja dengan ekspresi lambda dan kode tidak menggunakan "multiprocessing.Pool" menelurkan banyak proses karena ada item pekerjaan.
Saya mengadaptasi kode st yang memunculkan jumlah pekerja yang telah ditentukan dan hanya beralih melalui daftar input jika ada pekerja yang menganggur. Saya juga mengaktifkan mode "daemon" untuk ctrl-c karya pekerja seperti yang diharapkan.
sumber
parmap
fungsi ini ?(None, None)
sebagai item terakhir menunjukkanfun
bahwa item telah mencapai akhir urutan item untuk setiap proses.Multiprocessing dan acar rusak dan terbatas kecuali Anda melompat di luar perpustakaan standar.
Jika Anda menggunakan fork yang
multiprocessing
dipanggilpathos.multiprocesssing
, Anda bisa langsung menggunakan kelas dan metode kelas dalammap
fungsi multi-pemrosesan . Ini karenadill
digunakan sebagai gantipickle
ataucPickle
, dandill
dapat membuat serialisasi hampir semua hal dengan python.pathos.multiprocessing
juga menyediakan fungsi peta yang tidak sinkron ... dan dapatmap
berfungsi dengan banyak argumen (mis.map(math.pow, [1,2,3], [4,5,6])
)Lihat diskusi: Apa yang dapat dilakukan multiprocessing dan dill bersama?
dan: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
Bahkan menangani kode yang Anda tulis pada awalnya, tanpa modifikasi, dan dari penerjemah. Mengapa melakukan hal lain yang lebih rapuh dan spesifik untuk satu kasus?
Dapatkan kode di sini: https://github.com/uqfoundation/pathos
Dan, hanya untuk memamerkan sedikit tentang apa yang dapat dilakukannya:
sumber
amap
) yang memungkinkan penggunaan progress bar dan pemrograman asinkron lainnya.Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
multiprocess
yang kompatibel 2/3. Lihat stackoverflow.com/questions/27873093/... dan pypi.python.org/pypi/multiprocess .pathos
telah memiliki rilis stabil baru dan juga kompatibel dengan 2.x dan 3.x.Saat ini tidak ada solusi untuk masalah Anda, sejauh yang saya tahu: fungsi yang Anda berikan
map()
harus dapat diakses melalui impor modul Anda. Inilah sebabnya mengapa kode robert berfungsi: fungsinyaf()
dapat diperoleh dengan mengimpor kode berikut:Saya sebenarnya menambahkan bagian "utama", karena ini mengikuti rekomendasi untuk platform Windows ("Pastikan bahwa modul utama dapat diimpor dengan aman oleh juru bahasa Python baru tanpa menyebabkan efek samping yang tidak diinginkan").
Saya juga menambahkan huruf besar di depan
Calculate
, untuk mengikuti PEP 8 . :)sumber
Solusi oleh mrule benar tetapi memiliki bug: jika anak mengirim kembali sejumlah besar data, itu dapat mengisi buffer pipa, menghalangi anak
pipe.send()
, sementara orang tua menunggu anak untuk keluarpipe.join()
. Solusinya adalah dengan membaca data anak sebelumjoin()
memasukkan anak. Lebih jauh lagi, anak harus menutup ujung pipa orang tua untuk mencegah jalan buntu. Kode di bawah ini memperbaikinya. Perlu diketahui juga bahwa iniparmap
menciptakan satu proses per elemen diX
. Solusi yang lebih maju adalah menggunakanmultiprocessing.cpu_count()
untuk membagiX
menjadi beberapa potongan, dan kemudian menggabungkan hasilnya sebelum kembali. Saya meninggalkan itu sebagai latihan untuk pembaca agar tidak merusak keringkasan jawaban bagus oleh mrule. ;)sumber
OSError: [Errno 24] Too many open files
. Saya pikir perlu ada semacam batasan pada jumlah proses agar bisa berfungsi dengan baik ...Saya juga berjuang dengan ini. Saya memiliki fungsi sebagai anggota data kelas, sebagai contoh sederhana:
Saya perlu menggunakan fungsi self.f dalam panggilan Pool.map () dari dalam kelas yang sama dan self.f tidak menganggap tuple sebagai argumen. Karena fungsi ini tertanam dalam kelas, tidak jelas bagi saya bagaimana menulis jenis pembungkus jawaban yang disarankan.
Saya memecahkan masalah ini dengan menggunakan pembungkus berbeda yang mengambil tuple / daftar, di mana elemen pertama adalah fungsi, dan elemen yang tersisa adalah argumen untuk fungsi itu, yang disebut eval_func_tuple (f_args). Dengan menggunakan ini, baris bermasalah dapat diganti dengan return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Ini kode lengkapnya:
File: util.py
File: main.py
Menjalankan main.py akan memberi [11, 22, 33]. Jangan ragu untuk memperbaiki ini, misalnya eval_func_tuple juga dapat dimodifikasi untuk mengambil argumen kata kunci.
Pada catatan lain, pada jawaban lain, fungsi "parmap" dapat dibuat lebih efisien untuk kasus lebih banyak Proses daripada jumlah CPU yang tersedia. Saya menyalin versi yang diedit di bawah ini. Ini adalah posting pertama saya dan saya tidak yakin apakah saya harus langsung mengedit jawaban aslinya. Saya juga mengganti nama beberapa variabel.
sumber
Saya mengambil jawaban klaus se dan aganders3, dan membuat modul terdokumentasi yang lebih mudah dibaca dan disimpan dalam satu file. Anda bisa menambahkannya ke proyek Anda. Bahkan memiliki progress bar opsional!
EDIT : Menambahkan saran @ alexander-mcfarlane dan fungsi tes
sumber
join()
pada saat yang sama dan Anda hanya akan mendapatkan flash100%
selesai padatqdm
layar. Satu-satunya waktu itu akan berguna adalah jika setiap prosesor memiliki beban kerja yang biastqdm()
ke wrap the line:result = [q_out.get() for _ in tqdm(sent)]
dan itu bekerja jauh lebih baik - upaya besar meskipun sangat menghargai ini jadi +1Saya tahu ini ditanyakan lebih dari 6 tahun yang lalu sekarang, tetapi hanya ingin menambahkan solusi saya, karena beberapa saran di atas tampak sangat rumit, tetapi solusi saya sebenarnya sangat sederhana.
Yang harus saya lakukan adalah membungkus panggilan pool.map () ke fungsi pembantu. Melewati objek kelas bersama dengan args untuk metode sebagai tuple, yang terlihat sedikit seperti ini.
sumber
Fungsi yang didefinisikan dalam kelas (bahkan di dalam fungsi di dalam kelas) tidak benar-benar acar. Namun, ini berfungsi:
sumber
Saya tahu bahwa pertanyaan ini ditanyakan 8 tahun dan 10 bulan yang lalu tetapi saya ingin memberikan solusi kepada Anda:
Anda hanya perlu menjadikan fungsi kelas Anda menjadi metode statis. Tetapi itu juga mungkin dengan metode kelas:
Diuji dalam Python 3.7.3
sumber
Saya memodifikasi metode klaus se karena ketika itu bekerja untuk saya dengan daftar kecil, itu akan hang ketika jumlah item ~ 1000 atau lebih besar. Alih-alih mendorong pekerjaan satu per satu dengan
None
kondisi berhenti, saya memuat antrian input sekaligus dan membiarkan proses mengunyah sampai kosong.Sunting: sayangnya sekarang saya mengalami kesalahan pada sistem saya: Batas maksimum antrian multiprosesing adalah 32767 , mudah-mudahan solusi di sana akan membantu.
sumber
Anda dapat menjalankan kode Anda tanpa masalah jika Anda entah bagaimana mengabaikan
Pool
objek secara manual dari daftar objek di kelas karena tidakpickle
dapat seperti yang dikatakan kesalahan. Anda dapat melakukan ini dengan__getstate__
fungsi (lihat di sini juga) sebagai berikut. ThePool
objek akan mencoba untuk menemukan__getstate__
dan__setstate__
fungsi dan mengeksekusi mereka jika menemukan ketika Anda menjalankanmap
,map_async
dll:Kemudian lakukan:
akan memberi Anda output:
Saya sudah menguji kode di atas dengan Python 3.x dan berhasil.
sumber
Saya tidak yakin apakah pendekatan ini telah diambil tetapi upaya yang saya gunakan adalah:
Output harus:
sumber
Ada kemungkinan Anda ingin menerapkan fungsi ini untuk setiap instance kelas yang berbeda. Maka di sini adalah solusi untuk itu juga
sumber
Ini solusi saya, yang menurut saya agak kurang hackish daripada kebanyakan orang lain di sini. Ini mirip dengan jawaban nightowl.
sumber
Dari http://www.rueckstiess.net/research/snippets/show/ca1d7d90 dan http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html
Kita dapat membuat fungsi eksternal dan menaburinya dengan objek self class:
ATAU tanpa joblib:
sumber
Ini mungkin bukan solusi yang sangat baik tetapi dalam kasus saya, saya menyelesaikannya seperti ini.
Saya harus beralih
self
ke fungsi saya karena saya harus mengakses atribut dan fungsi kelas saya melalui fungsi itu. Ini bekerja untuk saya. Koreksi dan saran selalu kami terima.sumber
Berikut adalah boilerplate yang saya tulis untuk menggunakan Pool multiprocessing di python3, khususnya python3.7.7 digunakan untuk menjalankan tes. Saya menggunakan lari tercepat saya
imap_unordered
. Cukup masukkan skenario Anda dan coba. Anda dapat menggunakantimeit
atau hanyatime.time()
untuk mencari tahu mana yang paling cocok untuk Anda.Dalam skenario di atas
imap_unordered
sebenarnya sepertinya melakukan yang terburuk bagi saya. Cobalah kasing Anda dan berikan tolok ukur pada mesin yang ingin Anda jalankan. Baca juga di Proses Kolam . Bersulang!sumber