Python multiprocessing PicklingError: Tidak dapat mengacau <type 'function'>

244

Saya menyesal tidak dapat mereproduksi kesalahan dengan contoh yang lebih sederhana, dan kode saya terlalu rumit untuk dikirim. Jika saya menjalankan program di shell IPython bukan Python biasa, semuanya berjalan dengan baik.

Saya mencari beberapa catatan sebelumnya tentang masalah ini. Mereka semua disebabkan oleh penggunaan pool untuk memanggil fungsi yang didefinisikan dalam fungsi kelas. Tapi ini tidak terjadi pada saya.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Saya sangat menghargai bantuan apa pun.

Pembaruan : Fungsi saya acar didefinisikan di tingkat atas modul. Padahal itu memanggil fungsi yang berisi fungsi bersarang. yaitu, f()panggilan g()panggilan h()yang memiliki fungsi bersarang i(), dan saya menelepon pool.apply_async(f). f(), g(), h()Semua didefinisikan di tingkat atas. Saya mencoba contoh sederhana dengan pola ini dan itu berhasil.

Vendetta
sumber
3
Jawaban tingkat atas / diterima baik, tetapi bisa berarti Anda perlu menyusun ulang kode Anda, yang mungkin menyakitkan. Saya akan merekomendasikan bagi siapa saja yang memiliki masalah ini untuk juga membaca jawaban tambahan dilldan memanfaatkan pathos. Namun, saya tidak beruntung dengan salah satu solusi ketika bekerja dengan vtkobjects :( Adakah yang telah berhasil menjalankan kode python dalam pemrosesan paralel vtkPolyData?
Chris

Jawaban:

306

Berikut adalah daftar apa yang bisa diasinkan . Secara khusus, fungsi hanya dapat dipilih jika mereka didefinisikan di tingkat atas modul.

Sepotong kode ini:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

menghasilkan kesalahan yang hampir identik dengan yang Anda posting:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Masalahnya adalah bahwa poolsemua metode menggunakan a mp.SimpleQueueuntuk melewati tugas ke proses pekerja. Segala sesuatu yang melewati mp.SimpleQueueharus dapat dipilih, dan foo.worktidak dapat dipilih karena tidak ditentukan di tingkat atas modul.

Itu dapat diperbaiki dengan mendefinisikan fungsi di tingkat atas, yang memanggil foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

Pemberitahuan yang foodapat dipilih, karena Fooditentukan di tingkat atas dan foo.__dict__dapat dipilih.

unutbu
sumber
2
Terima kasih untuk balasan Anda. Saya memperbarui pertanyaan saya. Saya tidak berpikir itu penyebabnya,
Vendetta
7
Untuk mendapatkan PicklingError, sesuatu harus diletakkan di Antrian yang tidak dapat dipilih. Bisa jadi fungsi atau argumennya. Untuk mengetahui lebih lanjut tentang masalah ini, saya sarankan buat salinan dari program Anda, dan mulailah menguranginya, membuatnya lebih sederhana dan lebih sederhana, setiap kali menjalankan kembali program untuk melihat apakah masalahnya tetap ada. Ketika itu menjadi sangat sederhana, Anda akan menemukan sendiri masalahnya, atau akan memiliki sesuatu yang dapat Anda posting di sini.
unutbu
3
Juga: jika Anda mendefinisikan fungsi di tingkat atas modul, tetapi didekorasi, maka referensi akan ke output dekorator, dan Anda tetap akan mendapatkan kesalahan ini.
bobpoekert
5
Hanya terlambat 5 tahun, tapi saya baru saja mengalami ini. Ternyata "level atas" harus diambil lebih harfiah dari biasanya: menurut saya definisi fungsi harus mendahului inisialisasi kumpulan (yaitu pool = Pool()baris di sini ). Saya tidak berharap itu, dan ini mungkin menjadi alasan mengapa masalah OP berlanjut.
Andras Deak
4
Secara khusus, fungsi hanya dapat dipilih jika mereka didefinisikan di tingkat atas modul. Tampaknya hasil penerapan functool.partialke fungsi tingkat atas juga dapat dilakukan oleh acar, bahkan jika fungsi tersebut ditetapkan di dalam fungsi lain.
user1071847
96

Saya akan menggunakan pathos.multiprocesssing, bukan multiprocessing. pathos.multiprocessingadalah garpu multiprocessingyang menggunakan dill. dilldapat membuat cerita bersambung hampir apa saja dengan python, sehingga Anda dapat mengirim lebih banyak secara paralel. The pathosgarpu juga memiliki kemampuan untuk bekerja secara langsung dengan beberapa fungsi argumen, yang Anda butuhkan untuk metode kelas.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Dapatkan pathos(dan jika Anda suka dill) di sini: https://github.com/uqfoundation

Mike McKerns
sumber
5
bekerja memperlakukan. Untuk orang lain, saya menginstal kedua perpustakaan melalui: sudo pip install git+https://github.com/uqfoundation/dill.git@masterdansudo pip install git+https://github.com/uqfoundation/pathos.git@master
Alexander McFarlane
5
@AlexanderMcFarlane Saya tidak akan menginstal paket python dengan sudo(dari sumber eksternal seperti github khususnya). Sebaliknya, saya akan merekomendasikan untuk menjalankan:pip install --user git+...
Chris
Menggunakan just pip install pathostidak bekerja dengan sedih dan memberikan pesan ini:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
xApple
11
pip install pathossekarang berfungsi, dan pathospython 3 kompatibel.
Mike McKerns
3
@DanielGoldfarb: multiprocessadalah sebuah garpu di multiprocessingmana dilltelah diganti pickledi beberapa tempat dalam kode ... tetapi pada dasarnya, itu saja. pathosmenyediakan beberapa lapisan API tambahan multiprocessdan juga memiliki backend tambahan. Tapi, itulah intinya.
Mike McKerns
29

Seperti yang orang lain katakan multiprocessinghanya dapat mentransfer objek Python ke proses pekerja yang dapat diasamkan. Jika Anda tidak dapat mengatur ulang kode Anda seperti yang dijelaskan oleh unutbu, Anda dapat menggunakan dillkapabilitas pickling / pembongkaran yang diperluas untuk mentransfer data (terutama data kode) seperti yang saya perlihatkan di bawah ini.

Solusi ini hanya memerlukan instalasi dilldan tidak ada perpustakaan lain seperti pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()
rocksportrocker
sumber
6
Saya adalah dilldan pathospenulis ... dan sementara Anda benar, bukankah jauh lebih bagus dan bersih dan lebih fleksibel untuk digunakan pathosseperti dalam jawaban saya? Atau mungkin saya agak bias ...
Mike McKerns
4
Saya tidak mengetahui status pathospada saat penulisan dan ingin memberikan solusi yang sangat dekat dengan jawabannya. Sekarang saya telah melihat solusi Anda, saya setuju bahwa ini adalah cara untuk pergi.
rocksportrocker
Saya membaca solusi Anda dan seperti, Doh… I didn't even think of doing it like that. Jadi itu agak keren.
Mike McKerns
4
Terima kasih telah memposting, saya menggunakan pendekatan ini untuk argumen dilling / undilling yang tidak dapat dipilih: stackoverflow.com/questions/27883574/…
jazzblue
@rocksportrocker. Saya membaca contoh ini dan tidak dapat memahami mengapa ada forloop eksplisit . Saya biasanya melihat paralel paralel mengambil daftar dan mengembalikan daftar tanpa loop.
user1700890
20

Saya telah menemukan bahwa saya juga dapat menghasilkan output kesalahan persis pada sepotong kode yang berfungsi dengan baik dengan mencoba menggunakan profiler di atasnya.

Perhatikan bahwa ini ada di Windows (di mana forking agak kurang elegan).

Saya berlari:

python -m profile -o output.pstats <script> 

Dan menemukan bahwa menghapus profil menghapus kesalahan dan menempatkan profil mengembalikannya. Membuatku batty juga karena aku tahu kode yang digunakan untuk bekerja. Saya sedang memeriksa untuk melihat apakah ada sesuatu yang diperbarui pool.py ... kemudian punya perasaan tenggelam dan menghilangkan profil dan hanya itu.

Posting di sini untuk arsip seandainya ada orang lain yang melihatnya.

Yehezkiel Kruglick
sumber
3
WOW, terima kasih sudah menyebutkan! Itu membuat saya gila selama satu jam terakhir atau lebih; Saya mencoba semuanya hingga contoh yang sangat sederhana - sepertinya tidak ada yang berhasil. Tapi saya juga memiliki profiler berjalan melalui batchfile saya :(
tim
1
Oh, tidak bisa cukup berterima kasih. Ini terdengar sangat konyol, karena sangat tak terduga. Saya pikir itu harus disebutkan dalam dokumen. Semua yang saya miliki adalah pernyataan impor pdb, dan fungsi tingkat atas yang sederhana dengan hanya passtidak 'acar'.
0xc0de
10

Ketika masalah ini muncul dengan multiprocessingsolusi sederhana adalah beralih dari Poolke ThreadPool. Ini dapat dilakukan tanpa perubahan kode selain impor

from multiprocessing.pool import ThreadPool as Pool

Ini berfungsi karena ThreadPool berbagi memori dengan utas utama, daripada membuat proses baru - ini berarti pengawetan tidak diperlukan.

Kelemahan dari metode ini adalah bahwa python bukan bahasa terbaik dengan penanganan utas - ia menggunakan sesuatu yang disebut Global Interpreter Lock untuk menjaga agar utas tetap aman, yang dapat memperlambat beberapa kasus penggunaan di sini. Namun, jika Anda terutama berinteraksi dengan sistem lain (menjalankan perintah HTTP, berbicara dengan database, menulis ke sistem file) maka kode Anda kemungkinan tidak terikat oleh CPU dan tidak akan mendapatkan banyak hasil. Bahkan saya telah menemukan ketika menulis tolok ukur HTTP / HTTPS bahwa model berulir yang digunakan di sini memiliki lebih sedikit overhead dan penundaan, karena overhead dari membuat proses baru jauh lebih tinggi daripada overhead untuk membuat thread baru.

Jadi, jika Anda memproses banyak hal dalam ruang python pengguna ini mungkin bukan metode terbaik.

tedivm
sumber
2
Tapi kemudian Anda hanya menggunakan satu CPU (setidaknya dengan versi Python biasa yang menggunakan GIL ), yang jenisnya mengalahkan tujuan.
Endre Both
Itu benar-benar tergantung pada apa tujuannya. Global Interpreter Lock tidak berarti bahwa hanya satu instance pada satu waktu yang dapat menjalankan kode python, tetapi untuk tindakan yang sangat memblokir (akses sistem file, mengunduh file besar atau banyak, menjalankan kode eksternal), GIL akhirnya menjadi non-isu. Dalam beberapa kasus, overhead dari membuka proses baru (bukan utas) melebihi overhead GIL.
tedivm
Itu benar, terima kasih. Masih Anda mungkin ingin memasukkan peringatan dalam jawabannya. Hari-hari ini ketika kekuatan pemrosesan meningkat sebagian besar datang dalam bentuk core CPU lebih daripada lebih kuat, beralih dari multicore ke eksekusi single-core adalah efek samping yang agak signifikan.
Endre Both
Poin bagus - Saya telah memperbarui jawabannya dengan lebih detail. Saya ingin menunjukkan bahwa beralih ke multiprocessing berulir tidak membuat python hanya berfungsi pada satu inti.
tedivm
4

Solusi ini hanya membutuhkan instalasi dill dan tidak ada perpustakaan lain sebagai patho

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

Ini juga berfungsi untuk array numpy.

Ilia w495 Nikitin
sumber
2
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Kesalahan ini juga akan datang jika Anda memiliki fungsi inbuilt di dalam objek model yang diteruskan ke pekerjaan async.

Jadi pastikan untuk memeriksa objek model yang dilewati tidak memiliki fungsi bawaan. (Dalam kasus kami, kami menggunakan FieldTracker()fungsi django-model-utils di dalam model untuk melacak bidang tertentu). Inilah tautan ke masalah GitHub yang relevan.

Penkey Suresh
sumber
0

Membangun solusi @rocksportrocker, Masuk akal untuk mengisi saat mengirim dan MEMULIHKAN hasilnya.

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)
seharusnya lihat
sumber