Tidak dapat memilih <type 'instancemethod'> saat menggunakan multiprocessing Pool.map ()

218

Saya mencoba untuk menggunakan multiprocessing's Pool.map()fungsi untuk membagi pekerjaan secara bersamaan. Ketika saya menggunakan kode berikut, itu berfungsi dengan baik:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

Namun, ketika saya menggunakannya dalam pendekatan yang lebih berorientasi objek, itu tidak berhasil. Pesan kesalahan yang diberikannya adalah:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

Ini terjadi ketika berikut ini adalah program utama saya:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

dan berikut ini adalah someClasskelas saya :

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Adakah yang tahu apa masalahnya, atau cara yang mudah untuk menyelesaikannya?

ventolin
sumber
4
jika f adalah fungsi bersarang ada kesalahan serupaPicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
ggg

Jawaban:

122

Masalahnya adalah bahwa multiprocessing harus mengambil hal-hal untuk sling mereka di antara proses, dan metode terikat tidak dapat dipilih. Solusinya (apakah Anda menganggapnya "mudah" atau tidak ;-) adalah menambahkan infrastruktur ke program Anda untuk memungkinkan metode seperti itu untuk diacar , mendaftarkannya dengan metode library standar copy_reg .

Sebagai contoh, kontribusi Steven Bethard pada utas ini (menjelang akhir utas) menunjukkan satu pendekatan yang bisa diterapkan dengan baik untuk memungkinkan metode pengawetan / pembongkaran via copy_reg.

Alex Martelli
sumber
Itu bagus - terima kasih. Sepertinya telah berkembang beberapa cara, bagaimanapun: Menggunakan kode di pastebin.ca/1693348 sekarang saya mendapatkan RuntimeError: kedalaman rekursi maksimum terlampaui. Saya melihat sekeliling dan satu posting forum merekomendasikan meningkatkan kedalaman maksimum menjadi 1500 (dari 1000 default) tetapi saya tidak punya sukacita di sana. Sejujurnya, saya tidak bisa melihat bagian mana (dari kode saya, setidaknya) yang bisa berulang di luar kendali, kecuali untuk beberapa alasan kode itu menjadi acar dan tidak berubah menjadi satu lingkaran, karena sedikit perubahan yang saya buat untuk membuat Kode Steven OO'd?
ventolin
1
_pickle_methodPengembalian Anda self._unpickle_method, metode terikat; jadi tentu saja acar sekarang mencoba untuk acar ITU - dan itu sesuai dengan yang Anda katakan: dengan menelepon _pickle_method, secara rekursif. Yaitu dengan OOmemasukkan kode dengan cara ini, Anda pasti telah memperkenalkan rekursi tak terbatas. Saya sarankan kembali ke kode Steven (dan tidak beribadah di altar OO ketika tidak tepat: banyak hal dengan Python paling baik dilakukan dengan cara yang lebih fungsional, dan ini satu).
Alex Martelli
15
Untuk yang super sangat malas , lihat satu-satunya jawaban yang repot memposting kode yang sebenarnya tidak rusak ...
Cerin
2
Cara lain untuk memperbaiki / menghindari masalah pengawetan adalah menggunakan dill, lihat jawaban saya stackoverflow.com/questions/8804830/…
rocksportrocker
74

Semua solusi ini jelek karena multiprosesing dan pengawetan rusak dan terbatas kecuali Anda melompat di luar perpustakaan standar.

Jika Anda menggunakan fork yang multiprocessingdipanggil pathos.multiprocesssing, Anda bisa langsung menggunakan kelas dan metode kelas dalam mapfungsi multi-pemrosesan . Ini karena dilldigunakan sebagai ganti pickleatau cPickle, dan dilldapat membuat serialisasi hampir semua hal dengan python.

pathos.multiprocessingjuga menyediakan fungsi peta yang tidak sinkron ... dan dapat mapberfungsi dengan banyak argumen (mis. map(math.pow, [1,2,3], [4,5,6]))

Lihat: Apa yang dapat dilakukan multiprocessing dan dill bersama?

dan: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

Dan hanya untuk menjadi eksplisit, Anda dapat melakukan persis apa yang ingin Anda lakukan di tempat pertama, dan Anda dapat melakukannya dari penerjemah, jika Anda mau.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Dapatkan kode di sini: https://github.com/uqfoundation/pathos

Mike McKerns
sumber
3
Bisakah Anda memperbarui jawaban ini berdasarkan pathos.pp karena pathos.multiprocessing tidak ada lagi?
Saheel Godhane
10
Saya pathospenulisnya. Versi yang Anda maksud sudah berumur beberapa tahun. Coba versi di github, Anda dapat menggunakan pathos.ppatau github.com/uqfoundation/ppft .
Mike McKerns
1
atau github.com/uqfoundation/pathos . @ SaheelGodhane: Rilis baru sudah lama ditunggu, tapi harus segera keluar
Mike McKerns
3
Pertama kalau pip install setuptoolsbegitu pip install git+https://github.com/uqfoundation/pathos.git@master. Ini akan mendapatkan dependensi yang sesuai. Rilis baru hampir siap ... sekarang hampir semuanya pathosjuga berjalan di windows, dan 3.xkompatibel.
Mike McKerns
1
@Rika: Ya. tersedia pemblokiran, iteratif, dan peta async.
Mike McKerns
35

Anda juga bisa mendefinisikan __call__()metode di dalam Anda someClass(), yang memanggil someClass.go()dan kemudian mengirimkan instance someClass()ke pool. Objek ini acar dan berfungsi baik (untuk saya) ...

dorvak
sumber
3
Ini jauh lebih mudah daripada teknik yang diusulkan oleh Alex Martelli, tetapi Anda terbatas untuk mengirim hanya satu metode per kelas ke kumpulan multi proses Anda.
usang
6
Satu detail lain yang perlu diingat adalah bahwa hanya objek (instance kelas) yang di acar, bukan kelas itu sendiri. Oleh karena itu, jika Anda telah mengubah atribut kelas apa pun dari nilai defaultnya, perubahan ini tidak akan menyebar ke proses yang berbeda. Solusinya adalah memastikan bahwa semua yang dibutuhkan fungsi Anda disimpan sebagai atribut instance.
usang
2
@dorvak bisa tolong tunjukkan contoh sederhana dengan __call__()? Saya pikir jawaban Anda mungkin yang lebih bersih - saya berjuang untuk memahami kesalahan ini, dan pertama kali saya datang untuk melihat panggilan. Ngomong-ngomong, jawaban ini juga membantu menjelaskan apa yang multiprocessing lakukan: [ stackoverflow.com/a/20789937/305883]
user305883
1
Bisakah Anda memberi contohnya?
frmsaul
1
Ada jawaban baru yang diposting (saat ini di bawah ini) dengan kode contoh untuk ini.
Aaron
22

Namun beberapa keterbatasan terhadap solusi Steven Bethard:

Ketika Anda mendaftarkan metode kelas Anda sebagai suatu fungsi, penghancur kelas Anda secara mengejutkan dipanggil setiap kali pemrosesan metode Anda selesai. Jadi, jika Anda memiliki 1 instance dari kelas Anda yang memanggil n kali metodenya, anggota dapat menghilang antara 2 run dan Anda mungkin mendapatkan pesan malloc: *** error for object 0x...: pointer being freed was not allocated(misalnya file anggota terbuka) atau pure virtual method called, terminate called without an active exception(yang berarti dari masa pakai objek anggota yang saya gunakan lebih pendek dari apa yang saya pikirkan). Saya mendapatkan ini ketika berhadapan dengan n lebih besar dari ukuran kolam. Ini adalah contoh singkat:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Keluaran:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

The __call__Metode tidak begitu setara, karena [None, ...] dibaca dari hasil:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

Jadi tidak satupun dari kedua metode ini memuaskan ...

Eric H.
sumber
7
Anda Nonekembali karena definisi Anda __call__hilang return: seharusnya return self.process_obj(i).
torek
1
@ Eric Saya mendapatkan kesalahan yang sama dan saya mencoba solusi ini, namun saya mulai mendapatkan kesalahan baru sebagai "cPickle.PicklingError: Tidak dapat mengacau <type 'function'>: atribut lookup builtin .funcation gagal." Apakah Anda tahu apa yang mungkin menjadi alasan di baliknya?
Naman
15

Ada jalan pintas lain yang bisa Anda gunakan, meskipun bisa jadi tidak efisien tergantung pada apa yang ada di instance kelas Anda.

Seperti yang dikatakan semua orang, masalahnya adalah bahwa multiprocessingkode tersebut harus mengacak hal-hal yang dikirimkannya ke sub-proses yang telah dimulai, dan pemetik tidak melakukan metode contoh.

Namun, alih-alih mengirim metode-instance, Anda dapat mengirim instance kelas aktual, ditambah nama fungsi yang akan dipanggil, ke fungsi biasa yang kemudian digunakan getattruntuk memanggil metode-instance, sehingga menciptakan metode terikat dalam Poolsubproses. Ini mirip dengan mendefinisikan __call__metode kecuali bahwa Anda dapat memanggil lebih dari satu fungsi anggota.

Mencuri kode @ EricH. Dari jawabannya dan sedikit membubuhi keterangan (saya mengetik ulang karenanya semua nama berubah dan semacamnya, untuk beberapa alasan ini tampak lebih mudah daripada cut-and-paste :-)) untuk ilustrasi semua keajaiban:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

Keluaran menunjukkan bahwa, memang, konstruktor dipanggil sekali (dalam pid asli) dan destruktor disebut 9 kali (sekali untuk setiap salinan dibuat = 2 atau 3 kali per kumpulan-pekerja-proses sesuai kebutuhan, ditambah sekali dalam aslinya proses). Ini sering OK, seperti dalam kasus ini, karena pemetik default membuat salinan dari seluruh instance dan (semi-) secara diam-diam mengisinya kembali — dalam hal ini, melakukan:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

—Karena itu meskipun destruktor disebut delapan kali dalam tiga proses pekerja, ia menghitung mundur dari 1 menjadi 0 setiap kali — tetapi tentu saja Anda masih dapat mendapat masalah dengan cara ini. Jika perlu, Anda bisa menyediakan sendiri __setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']

dalam hal ini misalnya.

torek
sumber
1
Sejauh ini, ini adalah jawaban terbaik untuk masalah ini, karena ini adalah yang termudah untuk diterapkan pada perilaku default yang tidak dapat acar
Matt Taylor
12

Anda juga bisa mendefinisikan __call__()metode di dalam Anda someClass(), yang memanggil someClass.go()dan kemudian mengirimkan instance someClass()ke pool. Objek ini acar dan berfungsi baik (untuk saya) ...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()
parisjohn
sumber
3

Solusi dari parisjohn di atas berfungsi baik dengan saya. Ditambah lagi kode terlihat bersih dan mudah dimengerti. Dalam kasus saya ada beberapa fungsi untuk memanggil menggunakan Pool, jadi saya memodifikasi kode parisjohn sedikit di bawah ini. Saya membuat panggilan untuk dapat memanggil beberapa fungsi, dan nama fungsi diteruskan dalam argumen dari go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()
Neobot
sumber
1

Solusi yang mungkin sepele untuk ini adalah beralih ke menggunakan multiprocessing.dummy. Ini adalah implementasi berbasis antarmuka multiprocessing yang tampaknya tidak memiliki masalah ini di Python 2.7. Saya tidak punya banyak pengalaman di sini, tetapi perubahan impor cepat ini memungkinkan saya untuk memanggil apply_async pada metode kelas.

Beberapa sumber yang bagus tentang multiprocessing.dummy:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/

David Parks
sumber
1

Dalam kasus sederhana ini, di mana someClass.ftidak mewarisi data apa pun dari kelas dan tidak melampirkan apa pun ke kelas, solusi yang mungkin adalah dengan memisahkan f, sehingga dapat diambil acar:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))
mhh
sumber
1

Mengapa tidak menggunakan func terpisah?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)
0script0
sumber
1

Saya mengalami masalah yang sama tetapi menemukan bahwa ada JSON encoder yang dapat digunakan untuk memindahkan objek-objek ini antara proses.

from pyVmomi.VmomiSupport import VmomiJSONEncoder

Gunakan ini untuk membuat daftar Anda:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

Kemudian dalam fungsi yang dipetakan, gunakan ini untuk memulihkan objek:

pfVmomiObj = json.loads(jsonSerialized)
George
sumber
0

Pembaruan: pada hari penulisan ini, namedTuples dapat dipilih (dimulai dengan python 2.7)

Masalahnya di sini adalah proses anak tidak dapat mengimpor kelas objek -dalam kasus ini, kelas P-, dalam kasus proyek multi-model Kelas P harus dapat diimpor di mana saja proses anak digunakan

solusi cepat adalah membuatnya dapat diimpor dengan memengaruhinya ke global ()

globals()["P"] = P
rachid el kedmiri
sumber