Bagaimana cara melakukan pemrograman paralel dengan Python?

141

Untuk C ++, kita dapat menggunakan OpenMP untuk melakukan pemrograman paralel; Namun, OpenMP tidak akan berfungsi untuk Python. Apa yang harus saya lakukan jika saya ingin memparalelkan beberapa bagian dari program python saya?

Struktur kode dapat dianggap sebagai:

solve1(A)
solve2(B)

Di mana solve1dan solve2dua fungsi independen. Bagaimana menjalankan kode semacam ini secara paralel dan bukan secara berurutan untuk mengurangi waktu berjalan? Semoga ada yang bisa membantu saya. Terima kasih banyak sebelumnya. Kode tersebut adalah:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

Di mana setinner dan setouter adalah dua fungsi independen. Di situlah saya ingin paralel ...

ilovecp3
sumber
31
Lihatlah multiprocessing . Catatan: Utas Python tidak cocok untuk tugas yang terikat CPU, hanya untuk I / O-terikat.
9000
4
@ 9000 +100 Internet untuk menyebutkan tugas yang bergantung pada CPU vs I / O.
Hyperboreus
@ 9000 Sebenarnya utas tidak cocok sama sekali untuk tugas yang terikat CPU sejauh yang saya tahu! Proses adalah cara untuk pergi ketika melakukan tugas-tugas yang terikat CPU nyata.
Omar Al-Ithawi
6
@OmarIthawi: mengapa, utas berfungsi dengan baik jika Anda memiliki banyak core CPU (seperti biasa sekarang). Kemudian proses Anda dapat menjalankan beberapa utas memuat semua inti ini secara paralel dan berbagi data umum di antara mereka secara implisit (yaitu, tanpa memiliki area memori bersama yang eksplisit atau pesan antar-proses).
9000
1
@ user2134774: Ya, ya, komentar kedua saya tidak masuk akal. Mungkin satu-satunya ekstensi C yang merilis GIL dapat mengambil manfaat dari itu; mis. bagian NumPy dan Panda melakukan itu. Pada kasus lain, itu salah (tapi saya tidak bisa mengeditnya sekarang).
9000

Jawaban:

162

Anda dapat menggunakan multiprocessing modul . Untuk kasus ini saya mungkin menggunakan kumpulan pemrosesan:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

Ini akan menelurkan proses yang dapat melakukan pekerjaan umum untuk Anda. Karena kami tidak lulus processes, itu akan menelurkan satu proses untuk setiap inti CPU pada mesin Anda. Setiap inti CPU dapat menjalankan satu proses secara bersamaan.

Jika Anda ingin memetakan daftar ke satu fungsi Anda akan melakukan ini:

args = [A, B]
results = pool.map(solve1, args)

Jangan gunakan utas karena GIL mengunci operasi apa pun pada objek python.

Matt Williamson
sumber
1
apakah pool.mapmenerima kamus sebagai args? Atau hanya daftar sederhana?
The Bndr
Daftar saja saya pikir. Tapi Anda bisa memasukkan dict.items () yang akan menjadi daftar tuple nilai kunci
Matt Williamson
Sayangnya, ini berakhir dengan kesalahan `tipe yang tidak dapat ditemukan: 'list'`
The Bndr
selain komentar terakhir saya: `dict.items ()` work. Kesalahan memunculkan, karena saya harus mengubah penanganan variabel wawasan proses-funktion. Sayangnya pesan kesalahannya tidak terlalu membantu ... Jadi: terima kasih atas petunjuk Anda. :-)
The Bndr
2
Apa batas waktu di sini?
gamma
26

Ini bisa dilakukan dengan sangat elegan bersama Ray .

Untuk memparalelkan contoh Anda, Anda harus mendefinisikan fungsi Anda dengan @ray.remotedekorator, dan kemudian memohonnya .remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

Ada beberapa kelebihan dari hal ini dibandingkan modul multiprosesor .

  1. Kode yang sama akan dijalankan pada mesin multicore serta sekelompok mesin.
  2. Memproses berbagi data secara efisien melalui memori bersama dan serialisasi tanpa salinan .
  3. Pesan kesalahan disebarkan dengan baik.
  4. Panggilan fungsi ini dapat disusun bersama, misalnya,

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
  5. Selain menjalankan fungsi dari jarak jauh, kelas dapat dipakai dari jauh sebagai aktor .

Perhatikan bahwa Ray adalah kerangka kerja yang telah saya bantu kembangkan.

Robert Nishihara
sumber
saya terus mendapatkan kesalahan yang mengatakan "Tidak dapat menemukan versi yang memenuhi persyaratan ray (dari versi
:)
2
Biasanya kesalahan semacam ini berarti Anda perlu memutakhirkan pip. Saya sarankan mencoba pip install --upgrade pip. Jika Anda perlu menggunakan sudosama sekali maka ada kemungkinan bahwa versi pipyang Anda gunakan untuk menginstal raytidak sama dengan yang ditingkatkan. Anda bisa memeriksanya pip --version. Juga, Windows saat ini tidak didukung jadi jika Anda menggunakan Windows itu mungkin masalahnya.
Robert Nishihara
1
Hanya sebuah catatan, ini terutama untuk mendistribusikan pekerjaan bersamaan melalui beberapa mesin.
Matt Williamson
2
Ini sebenarnya dioptimalkan untuk kasus mesin tunggal dan pengaturan cluster. Banyak keputusan desain (misalnya, memori bersama, serialisasi tanpa salinan) ditargetkan untuk mendukung mesin tunggal dengan baik.
Robert Nishihara
2
Akan lebih bagus jika dokter menunjukkan lebih banyak. Saya mendapatkan pengertian dari membaca dokumen bahwa itu tidak benar-benar dimaksudkan untuk kasus mesin tunggal.
Sledge
4

Solusinya, seperti yang orang lain katakan, adalah menggunakan banyak proses. Namun kerangka mana yang lebih tepat tergantung pada banyak faktor. Selain yang sudah disebutkan, ada juga charm4py dan mpi4py (saya adalah pengembang charm4py).

Ada cara yang lebih efisien untuk menerapkan contoh di atas daripada menggunakan abstraksi kumpulan pekerja. Loop utama mengirimkan parameter yang sama (termasuk grafik lengkap G) berulang kali ke pekerja di masing-masing dari 1000 iterasi. Karena setidaknya satu pekerja akan berada pada proses yang berbeda, ini melibatkan menyalin dan mengirimkan argumen ke proses lain. Ini bisa sangat mahal tergantung pada ukuran benda. Sebagai gantinya, masuk akal untuk meminta pekerja menyimpan status dan hanya mengirim informasi yang diperbarui.

Misalnya, dalam charm4py ini dapat dilakukan seperti ini:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Perhatikan bahwa untuk contoh ini kita hanya perlu satu pekerja. Loop utama dapat menjalankan salah satu fungsi, dan meminta pekerja mengeksekusi yang lain. Tetapi kode saya membantu mengilustrasikan beberapa hal:

  1. Worker A berjalan dalam proses 0 (sama seperti loop utama). Sementara result_a.get()diblokir menunggu hasil, pekerja A melakukan perhitungan dalam proses yang sama.
  2. Argumen secara otomatis diteruskan dengan merujuk ke pekerja A, karena itu dalam proses yang sama (tidak ada penyalinan yang terlibat).
Juan Galvez
sumber
2

Dalam beberapa kasus, dimungkinkan untuk secara otomatis menyejajarkan loop menggunakan Numba , meskipun itu hanya bekerja dengan subset kecil dari Python:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

Sayangnya, tampaknya Numba hanya bekerja dengan array Numpy, tetapi tidak dengan objek Python lainnya. Secara teori, mungkin juga untuk mengkompilasi Python ke C ++ dan kemudian secara otomatis memparalelasinya menggunakan kompiler Intel C ++ , meskipun saya belum mencobanya.

Anderson Green
sumber
2

Anda dapat menggunakan joblibpustaka untuk melakukan komputasi paralel dan multiprosesing.

from joblib import Parallel, delayed

Anda cukup membuat fungsi fooyang ingin Anda jalankan secara paralel dan berdasarkan pada potongan kode berikut yang mengimplementasikan pemrosesan paralel:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

Di mana num_coresdapat diperoleh dari multiprocessingperpustakaan sebagai berikut:

import multiprocessing

num_cores = multiprocessing.cpu_count()

Jika Anda memiliki fungsi dengan lebih dari satu argumen input, dan Anda hanya ingin mengulangi salah satu argumen dengan daftar, Anda dapat menggunakan partialfungsi dari functoolspustaka sebagai berikut:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

Anda dapat menemukan penjelasan lengkap tentang python dan R multiprocessing dengan beberapa contoh di sini .

vahab najari
sumber