Gunakan numpy array di memori bersama untuk multiprocessing

111

Saya ingin menggunakan array numpy dalam memori bersama untuk digunakan dengan modul multiprocessing. Kesulitannya adalah menggunakannya seperti array numpy, dan bukan hanya sebagai array ctypes.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

Ini menghasilkan keluaran seperti:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

Array dapat diakses dengan cara ctypes, misalnya arr[i]masuk akal. Namun, ini bukan array numpy, dan saya tidak dapat melakukan operasi seperti -1*arr, atau arr.sum(). Saya kira solusinya adalah mengubah array ctypes menjadi array numpy. Namun (selain tidak dapat membuat ini berfungsi), saya tidak percaya itu akan dibagikan lagi.

Sepertinya akan ada solusi standar untuk apa yang harus menjadi masalah bersama.

Ian Langmore
sumber
1
Tidak sama dengan yang ini? stackoverflow.com/questions/5033799/…
pygabriel
1
Ini bukan pertanyaan yang persis sama. Pertanyaan terkait adalah menanyakan tentang subprocessdaripada multiprocessing.
Andrew

Jawaban:

82

Untuk menambah jawaban @ unutbu (tidak tersedia lagi) dan @Henry Gomersall. Anda dapat menggunakan shared_arr.get_lock()untuk menyinkronkan akses bila diperlukan:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Contoh

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

Jika Anda tidak memerlukan akses tersinkronisasi atau Anda membuat kunci sendiri, maka mp.Array()itu tidak perlu. Anda bisa menggunakan mp.sharedctypes.RawArraydalam kasus ini.

jfs
sumber
2
Jawaban yang bagus! Jika saya ingin memiliki lebih dari satu array bersama, masing-masing dapat dikunci secara terpisah, tetapi dengan jumlah array yang ditentukan saat runtime, apakah itu perpanjangan langsung dari apa yang telah Anda lakukan di sini?
Andrew
3
@Andrew: array bersama harus dibuat sebelum proses turunan muncul.
jfs
Poin bagus tentang urutan operasi. Itulah yang ada dalam pikiran saya: buat jumlah array bersama yang ditentukan pengguna, lalu buat beberapa proses anak. Apakah itu mudah?
Andrew
1
@Chicony: Anda tidak dapat mengubah ukuran Array. Anggap saja sebagai blok memori bersama yang harus dialokasikan sebelum proses anak dimulai. Anda tidak perlu menggunakan semua memori, misalnya, Anda dapat meneruskan countke numpy.frombuffer(). Anda dapat mencoba melakukannya pada tingkat yang lebih rendah dengan menggunakan mmapatau sesuatu seperti posix_ipcsecara langsung untuk mengimplementasikan ukuran yang dapat diubah ukurannya (mungkin melibatkan penyalinan saat mengubah ukuran) analog RawArray (atau mencari perpustakaan yang ada). Atau jika tugas Anda memungkinkan: salin data menjadi beberapa bagian (jika Anda tidak membutuhkan semuanya sekaligus). "Bagaimana mengubah ukuran memori bersama" adalah pertanyaan terpisah yang bagus.
jfs
1
@umopapisdn: Pool()menentukan jumlah proses (jumlah inti CPU yang tersedia digunakan secara default). Madalah berapa kali f()fungsi dipanggil.
jfs
21

The Arrayobjek memiliki get_obj()metode yang terkait dengan itu, yang mengembalikan ctypes array yang yang menyajikan antarmuka penyangga. Saya pikir berikut ini harus bekerja ...

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

Saat dijalankan, ini mencetak elemen pertama yang asekarang menjadi 10.0, menampilkan adan bhanya dua tampilan ke dalam memori yang sama.

Untuk memastikan itu masih multiprosesor aman, saya yakin Anda harus menggunakan metode acquiredan releaseyang ada pada Arrayobjek a, dan kunci bawaannya untuk memastikan semuanya aman diakses (meskipun saya bukan ahli dalam modul multiprosesor).

Henry Gomersall
sumber
itu tidak akan bekerja tanpa sinkronisasi seperti yang ditunjukkan @unutbu dalam jawabannya (sekarang dihapus).
jfs
1
Agaknya, jika Anda hanya ingin mengakses pemrosesan pasca array, itu dapat dilakukan dengan bersih tanpa khawatir tentang masalah konkurensi dan penguncian?
Henry Gomersall
dalam hal ini Anda tidak perlu mp.Array.
jfs
1
Kode pemrosesan mungkin memerlukan larik terkunci, tetapi interpretasi pasca pemrosesan data mungkin belum tentu. Saya rasa ini berasal dari pemahaman apa sebenarnya masalahnya. Jelas, mengakses data bersama secara bersamaan akan membutuhkan perlindungan, yang menurut saya sudah jelas!
Henry Gomersall
16

Meskipun jawaban yang sudah diberikan sudah bagus, ada solusi yang jauh lebih mudah untuk masalah ini asalkan dua syarat terpenuhi:

  1. Anda menggunakan sistem operasi yang sesuai dengan POSIX (misalnya Linux, Mac OSX); dan
  2. Proses anak Anda memerlukan akses hanya baca ke array bersama.

Dalam hal ini Anda tidak perlu mengutak-atik variabel yang dibagikan secara eksplisit, karena proses anak akan dibuat menggunakan garpu. Anak bercabang secara otomatis membagikan ruang memori orang tuanya. Dalam konteks multiprocessing Python, ini berarti ia berbagi semua variabel tingkat modul ; perhatikan bahwa ini tidak berlaku untuk argumen yang Anda teruskan secara eksplisit ke proses anak Anda atau ke fungsi yang Anda panggil multiprocessing.Poolatau lebih.

Contoh sederhana:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))
EelkeSpaak
sumber
3
+1 Info yang sangat berharga. Bisakah Anda menjelaskan mengapa hanya vars tingkat modul yang dibagikan? Mengapa vars lokal bukan bagian dari ruang memori orang tua? Misalnya, mengapa ini tidak dapat berfungsi jika saya memiliki fungsi F dengan var lokal V dan fungsi G di dalam F yang mereferensikan V?
Coffee_Table
5
Peringatan: Jawaban ini sedikit menipu. Proses anak menerima salinan status proses induk, termasuk variabel global, pada saat percabangan. Negara bagian sama sekali tidak disinkronkan dan akan menyimpang sejak saat itu. Teknik ini mungkin berguna dalam beberapa skenario (misalnya: membagi proses anak ad-hoc yang masing-masing menangani snapshot dari proses induk dan kemudian menghentikannya), tetapi tidak berguna di skenario lain (misalnya: proses anak yang berjalan lama yang harus berbagi dan sinkronkan data dengan proses induk).
David Stein
4
@EelkeSpaak: Pernyataan Anda - "anak bercabang secara otomatis berbagi ruang memori orang tuanya" - tidak benar. Jika saya memiliki proses turunan yang ingin memantau status proses induk, dengan cara yang hanya dapat dibaca secara ketat, percabangan tidak akan membawa saya ke sana: anak hanya melihat cuplikan dari status induk pada saat pencabangan. Sebenarnya, itulah yang saya coba lakukan (mengikuti jawaban Anda) ketika saya menemukan batasan ini. Karenanya catatan tambahan untuk jawaban Anda. Singkatnya: Status induk tidak "dibagikan", tetapi hanya disalin ke anak. Itu bukanlah "berbagi" dalam arti biasa.
David Stein
2
Apakah saya salah mengira ini adalah situasi copy-on-write, setidaknya pada sistem posix? Artinya, setelah percabangan, saya pikir memori dibagi sampai data baru ditulis, di mana salinan dibuat. Jadi ya, memang benar bahwa data tidak "dibagikan" dengan tepat, tetapi dapat memberikan peningkatan kinerja yang berpotensi besar. Jika proses Anda hanya baca, maka tidak akan ada overhead penyalinan! Apakah saya memahami maksudnya dengan benar?
senderle
2
@senderle Ya, itulah yang saya maksud! Karenanya poin saya (2) dalam jawaban tentang akses hanya-baca.
EelkeSpaak
11

Saya telah menulis modul python kecil yang menggunakan memori bersama POSIX untuk berbagi array numpy antara interpreter python. Mungkin Anda akan menemukannya berguna.

https://pypi.python.org/pypi/SharedArray

Begini cara kerjanya:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])
tikar
sumber
8

Anda dapat menggunakan sharedmemmodul: https://bitbucket.org/cleemesser/numpy-sharedmem

Inilah kode asli Anda, kali ini menggunakan memori bersama yang berperilaku seperti array NumPy (perhatikan pernyataan terakhir tambahan yang memanggil sum()fungsi NumPy ):

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()
Velimir Mlaker
sumber
1
Catatan: ini tidak lagi dikembangkan dan tampaknya tidak berfungsi di linux github.com/sturlamolden/sharedmem-numpy/issues/4
AD
numpy-sharedmem mungkin tidak dalam pengembangan, tetapi masih berfungsi di Linux, lihat github.com/vmlaker/benchmark-sharedmem .
Velimir Mlaker