multiprocessing: Bagaimana cara berbagi dict di antara banyak proses?

113

Program yang membuat beberapa proses yang bekerja pada antrian yang dapat digabungkan Q, dan pada akhirnya dapat memanipulasi kamus global Duntuk menyimpan hasil. (sehingga setiap proses anak dapat menggunakan Duntuk menyimpan hasilnya dan juga melihat hasil apa yang dihasilkan oleh proses anak lainnya)

Jika saya mencetak kamus D dalam proses anak, saya melihat modifikasi yang telah dilakukan di atasnya (yaitu di D). Tetapi setelah proses utama bergabung dengan Q, jika saya mencetak D, itu adalah dict kosong!

Saya mengerti ini adalah masalah sinkronisasi / kunci. Dapatkah seseorang memberi tahu saya apa yang terjadi di sini, dan bagaimana saya dapat menyinkronkan akses ke D?

dop
sumber
1
Ini tidak bekerja seperti yang diharapkan setidaknya pada python 3.7.2 menggunakan osx 10.14.4 Dict tidak disinkronkan dan isinya ditulis ulang oleh proses lain. Namun, <code> multiprocessing.Manager (). List () </code> berfungsi seperti yang diharapkan.
Andrew Druchenko

Jawaban:

162

Jawaban umum melibatkan penggunaan suatu Managerobjek. Diadaptasi dari dokumen:

from multiprocessing import Process, Manager

def f(d):
    d[1] += '1'
    d['2'] += 2

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    d[1] = '1'
    d['2'] = 2

    p1 = Process(target=f, args=(d,))
    p2 = Process(target=f, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print d

Keluaran:

$ python mul.py 
{1: '111', '2': 6}
pengirim
sumber
4
Terima kasih pengirim. Memang, D = multiprocessing.Manager (). Dict () memecahkan masalah saya. Saya menggunakan D = dict ().
dop
3
@ LorenzoBelli, jika Anda bertanya apakah akses ke pengelola disinkronkan, saya yakin jawabannya adalah ya. multiprocessing.Manager()mengembalikan sebuah instance dariSyncManager , yang namanya menyarankan sebanyak itu!
pengirim
@senderle Saya ingin berbagi keadaan acak numpy dari proses induk dengan proses anak. Saya sudah mencoba menggunakan Managertetapi masih belum berhasil. Bisakah Anda melihat pertanyaan saya di sini dan melihat apakah Anda dapat menawarkan solusi? Saya masih bisa mendapatkan nomor acak yang berbeda jika saya melakukannya np.random.seed(None)setiap kali saya menghasilkan nomor acak, tetapi ini tidak memungkinkan saya untuk menggunakan keadaan acak dari proses induk, yang bukan itu yang saya inginkan. Bantuan apa pun sangat dihargai.
Amir
1
@RadioControlled dengan senang hati menulis pembaruan, namun secara singkat, meskipun menurut saya Anda tidak dapat mewujudkannya secara langsung, Anda dapat dengan mudah membuat dikt terkelola baru dengan kunci dan nilai yang sama, dan menggunakannya sebagai ganti yang asli. Apakah itu cukup untuk kasus Anda?
senderle
1
@senderle, itulah yang akhirnya saya lakukan. Jadi jawabannya adalah Anda harus melakukan itu.
Radio Controlled
25

multiprocessing tidak seperti threading. Setiap proses anak akan mendapatkan salinan memori proses utama. Umumnya keadaan dibagi melalui komunikasi (pipa / soket), sinyal, atau memori bersama.

Multiprocessing membuat beberapa abstraksi tersedia untuk kasus penggunaan Anda - status bersama yang diperlakukan sebagai lokal dengan menggunakan proxy atau memori bersama: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

Bagian yang relevan:

Jeremy Brown
sumber
1
Terima kasih banyak. Anda mengarahkan saya ke / a solusi: multiprocessing.Manager (). Dict ().
dop
Dapatkah seseorang menjelaskan apa arti pernyataan "Setiap proses anak akan mendapatkan salinan dari memori proses utama".
Itsme2003
@ Itsme2003 secara default proses yang muncul tidak memiliki akses ke memori proses induk (ini adalah salah satu perbedaan utama untuk utas). Jadi, ketika suatu proses membutuhkan objek dari proses induk, itu harus membuat salinannya (alih-alih mendapatkan referensi ke objek sebenarnya). Jawaban di atas menjelaskan tentang cara berbagi objek antar proses.
Niklas Mertsch
Karena ini sering keliru: Selama Anda tidak memodifikasi objek, setidaknya dalam konfigurasi Linux biasa, objek tersebut sebenarnya hanya akan disimpan satu kali di memori. Ini akan disalin segera setelah diubah. Ini bisa menjadi sangat penting jika Anda perlu menghemat memori dan tidak memodifikasi objek.
Radio Controlled
16

Saya ingin membagikan pekerjaan saya sendiri yang lebih cepat daripada dikt Manajer dan lebih sederhana serta lebih stabil daripada pustaka pyshmht yang menggunakan banyak memori dan tidak berfungsi untuk Mac OS. Meskipun dict saya hanya berfungsi untuk string biasa dan saat ini tidak dapat diubah. Saya menggunakan implementasi probing linier dan menyimpan kunci dan pasangan nilai dalam blok memori terpisah setelah tabel.

from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable


class shared_immutable_dict:
    def __init__(self, a):
        self.hs = 1 << (len(a) * 3).bit_length()
        kvp = self.hs * 4
        ht = [0xffffffff] * self.hs
        kvl = []
        for k, v in a.iteritems():
            h = self.hash(k)
            while ht[h] != 0xffffffff:
                h = (h + 1) & (self.hs - 1)
            ht[h] = kvp
            kvp += self.kvlen(k) + self.kvlen(v)
            kvl.append(k)
            kvl.append(v)

        self.m = mmap(-1, kvp)
        for p in ht:
            self.m.write(uint_format.pack(p))
        for x in kvl:
            if len(x) <= 0x7f:
                self.m.write_byte(chr(len(x)))
            else:
                self.m.write(uint_format.pack(0x80000000 + len(x)))
            self.m.write(x)

    def hash(self, k):
        h = hash(k)
        h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
        return h

    def get(self, k, d=None):
        h = self.hash(k)
        while True:
            x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
            if x == 0xffffffff:
                return d
            self.m.seek(x)
            if k == self.read_kv():
                return self.read_kv()
            h = (h + 1) & (self.hs - 1)

    def read_kv(self):
        sz = ord(self.m.read_byte())
        if sz & 0x80:
            sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
        return self.m.read(sz)

    def kvlen(self, k):
        return len(k) + (1 if len(k) <= 0x7f else 4)

    def __contains__(self, k):
        return self.get(k, None) is not None

    def close(self):
        self.m.close()

uint_format = struct.Struct('>I')


def uget(a, k, d=None):
    return to_unicode(a.get(to_str(k), d))


def uin(a, k):
    return to_str(k) in a


def to_unicode(s):
    return s.decode('utf-8') if isinstance(s, str) else s


def to_str(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s


def mmap_test():
    n = 1000000
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time))


def manager_test():
    n = 100000
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time))


def shm_test():
    n = 1000000
    d = HashTable('tmp', n)
    d.update({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time))


if __name__ == '__main__':
    mmap_test()
    manager_test()
    shm_test()

Hasil kinerja laptop saya adalah:

mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec

contoh penggunaan sederhana:

ht = shared_immutable_dict({'a': '1', 'b': '2'})
print ht.get('a')
alyaxey
sumber
14
Github? Dokumentasi? bagaimana kita bisa menggunakan alat ini?
Pavlos Panteliadis
10

Selain @ senderle di sini, beberapa mungkin juga bertanya-tanya bagaimana cara menggunakan fungsionalitas multiprocessing.Pool.

Hal yang menyenangkan adalah bahwa ada .Pool()metode ke managerinstance yang meniru semua API level atas yang sudah dikenal multiprocessing.

from itertools import repeat
import multiprocessing as mp
import os
import pprint

def f(d: dict) -> None:
    pid = os.getpid()
    d[pid] = "Hi, I was written by process %d" % pid

if __name__ == '__main__':
    with mp.Manager() as manager:
        d = manager.dict()
        with manager.Pool() as pool:
            pool.map(f, repeat(d, 10))
        # `d` is a DictProxy object that can be converted to dict
        pprint.pprint(dict(d))

Keluaran:

$ python3 mul.py 
{22562: 'Hi, I was written by process 22562',
 22563: 'Hi, I was written by process 22563',
 22564: 'Hi, I was written by process 22564',
 22565: 'Hi, I was written by process 22565',
 22566: 'Hi, I was written by process 22566',
 22567: 'Hi, I was written by process 22567',
 22568: 'Hi, I was written by process 22568',
 22569: 'Hi, I was written by process 22569',
 22570: 'Hi, I was written by process 22570',
 22571: 'Hi, I was written by process 22571'}

Ini adalah contoh yang sedikit berbeda di mana setiap proses hanya mencatat ID prosesnya ke DictProxyobjek global d.

Brad Solomon
sumber
3

Mungkin Anda dapat mencoba pyshmht , berbagi ekstensi tabel hash berbasis memori untuk Python.

Memperhatikan

  1. Ini tidak sepenuhnya diuji, hanya untuk referensi Anda.

  2. Saat ini tidak ada mekanisme kunci / sem untuk multiprosesing.

felix021
sumber