Objek memori bersama dalam multiprosesing

124

Misalkan saya memiliki array numpy memori yang besar, saya memiliki fungsi funcyang mengambil array raksasa ini sebagai input (bersama dengan beberapa parameter lain). funcdengan parameter yang berbeda dapat dijalankan secara paralel. Sebagai contoh:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Jika saya menggunakan perpustakaan multiprosesing, maka array raksasa itu akan disalin beberapa kali ke dalam proses yang berbeda.

Apakah ada cara untuk membiarkan proses yang berbeda berbagi larik yang sama? Objek array ini hanya-baca dan tidak akan pernah diubah.

Apa yang lebih rumit, jika arr bukanlah sebuah array, tetapi sebuah objek python yang berubah-ubah, apakah ada cara untuk membagikannya?

[DIEDIT]

Saya membaca jawabannya tetapi saya masih agak bingung. Karena fork () adalah copy-on-write, kita tidak boleh meminta biaya tambahan saat melakukan proses baru di pustaka multiprosesing python. Tetapi kode berikut menunjukkan ada overhead yang sangat besar:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

output (dan omong-omong, biaya meningkat seiring dengan peningkatan ukuran array, jadi saya curiga masih ada overhead yang terkait dengan penyalinan memori):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Mengapa ada overhead yang begitu besar, jika kita tidak menyalin array? Dan bagian apa dari memori bersama yang menyelamatkan saya?

Vendetta
sumber
Anda telah melihat dokumennya , bukan?
Lev Levitsky
@FrancisAvila, adakah cara untuk membagikan tidak hanya array, tetapi objek python yang sewenang-wenang?
Vendetta
1
@LevLevitsky Saya harus bertanya, apakah ada cara untuk membagikan tidak hanya array, tetapi objek python yang sewenang-wenang?
Vendetta
2
Jawaban ini menjelaskan dengan baik mengapa objek Python arbitrer tidak dapat dibagikan.
Janne Karila

Jawaban:

121

Jika Anda menggunakan sistem operasi yang menggunakan fork()semantik salin-saat-tulis (seperti unix umum lainnya), maka selama Anda tidak pernah mengubah struktur data Anda, itu akan tersedia untuk semua proses turunan tanpa menggunakan memori tambahan. Anda tidak perlu melakukan sesuatu yang khusus (kecuali pastikan Anda benar-benar tidak mengubah objek).

Hal paling efisien yang dapat Anda lakukan untuk masalah Anda adalah mengemas array Anda ke dalam struktur array yang efisien (menggunakan numpyatau array), menempatkannya di memori bersama, membungkusnya multiprocessing.Array, dan meneruskannya ke fungsi Anda. Jawaban ini menunjukkan bagaimana melakukan itu .

Jika Anda menginginkan objek bersama yang dapat ditulisi , maka Anda perlu membungkusnya dengan semacam sinkronisasi atau penguncian. multiprocessingmenyediakan dua metode untuk melakukan ini : satu menggunakan memori bersama (cocok untuk nilai sederhana, array, atau ctypes) atau Managerproxy, di mana satu proses menyimpan memori dan manajer mengatur akses ke sana dari proses lain (bahkan melalui jaringan).

The Managerpendekatan dapat digunakan dengan sewenang-wenang Python objek, tetapi akan lebih lambat dari setara memori bersama menggunakan karena benda perlu serial / deserialized dan dikirim antara proses.

Ada banyak pustaka dan pendekatan pemrosesan paralel yang tersedia dengan Python . multiprocessingadalah perpustakaan yang sangat baik dan menyeluruh, tetapi jika Anda memiliki kebutuhan khusus mungkin salah satu pendekatan lain mungkin lebih baik.

Francis Avila
sumber
25
Sebagai catatan, pada Python fork () sebenarnya berarti copy pada akses (karena hanya dengan mengakses objek akan mengubah ref-count-nya).
Fabio Zadrozny
3
@FabioZadrozny Apakah itu benar-benar menyalin seluruh objek, atau hanya halaman memori yang berisi refcount-nya?
zigg
5
AFAIK, hanya halaman memori yang berisi refcount (jadi, 4kb pada setiap akses objek).
Fabio Zadrozny
1
@ Max Gunakan penutupan. Fungsi yang diberikan ke apply_asyncharus mereferensikan objek bersama dalam cakupan secara langsung, bukan melalui argumennya.
Francis Avila
3
@FrancisAvila bagaimana Anda menggunakan closure? Bukankah seharusnya fungsi yang Anda berikan untuk apply_async bisa dipilih? Atau ini hanya batasan map_async?
GermanK
17

Saya mengalami masalah yang sama dan menulis sedikit kelas utilitas memori bersama untuk mengatasinya.

Saya menggunakan multiprocessing.RawArray(lockfree), dan juga akses ke array tidak disinkronkan sama sekali (lockfree), berhati-hatilah untuk tidak menembak kaki Anda sendiri.

Dengan solusi ini saya mendapatkan percepatan dengan faktor kira-kira 3 pada quad-core i7.

Berikut kodenya: Jangan ragu untuk menggunakan dan meningkatkannya, dan laporkan kembali setiap bug.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))
martin.preinfalk
sumber
Baru saja menyadari bahwa Anda harus menyiapkan array memori bersama sebelum Anda membuat Kolam multiprosesing, belum tahu mengapa tetapi jelas tidak akan bekerja sebaliknya.
martin.preinfalk
alasannya mengapa Multiprocessing pool memanggil fork () saat Pool dibuat, jadi apa pun setelah itu tidak akan mendapatkan akses ke pointer ke mem bersama yang dibuat setelahnya.
Xiv
Ketika saya mencoba kode ini di bawah py35 saya mendapat pengecualian di multiprocessing.sharedctypes.py, jadi saya kira kode ini hanya untuk py2.
Dr. Hillier Dániel
11

Ini adalah kasus penggunaan yang dimaksudkan untuk Ray , yang merupakan pustaka untuk Python yang paralel dan terdistribusi. Di bawah tenda, itu membuat serial objek menggunakan tata letak data Apache Arrow (yang merupakan format nol-salinan) dan menyimpannya di penyimpanan objek memori bersama sehingga mereka dapat diakses oleh banyak proses tanpa membuat salinan.

Kode tersebut akan terlihat seperti berikut.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

Jika Anda tidak memanggil ray.putmaka array akan tetap disimpan dalam memori bersama, tetapi itu akan dilakukan sekali per pemanggilan func, yang bukan itu yang Anda inginkan.

Perhatikan bahwa ini akan bekerja tidak hanya untuk array tetapi juga untuk objek yang berisi array , misalnya, kamus yang memetakan int ke array seperti di bawah ini.

Anda dapat membandingkan kinerja serialisasi di Ray versus acar dengan menjalankan berikut ini di IPython.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

Serialisasi dengan Ray hanya sedikit lebih cepat daripada acar, tetapi deserialisasi 1000x lebih cepat karena penggunaan memori bersama (angka ini tentu saja akan bergantung pada objeknya).

Lihat dokumentasi Ray . Anda dapat membaca lebih lanjut tentang serialisasi cepat menggunakan Ray dan Panah . Perhatikan Saya salah satu pengembang Ray.

Robert Nishihara
sumber
1
Ray terdengar bagus! Tapi, saya sudah mencoba menggunakan library ini sebelumnya, tapi sayangnya, saya baru menyadari bahwa Ray tidak mendukung windows. Saya harap kalian dapat mendukung windows ASAP. Terima kasih, pengembang!
Hzzkygcs
6

Seperti yang disebutkan Robert Nishihara, Apache Arrow membuat ini mudah, khususnya dengan penyimpanan objek dalam memori Plasma, yang merupakan dasar dari Ray.

Saya membuat plasma otak khusus untuk alasan ini - memuat dan memuat ulang benda-benda besar dengan cepat di aplikasi Flask. Ini adalah namespace objek memori bersama untuk objek Apache Arrow-serializable, termasuk picklebytestrings d yang dihasilkan oleh pickle.dumps(...).

Perbedaan utama dengan Apache Ray dan Plasma adalah ia melacak ID objek untuk Anda. Setiap proses atau utas atau program yang berjalan secara lokal dapat berbagi nilai variabel dengan memanggil nama dari Brainobjek apa pun .

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]
russellthehippo
sumber