Bagaimana saya bisa memulihkan nilai kembali fungsi yang diteruskan ke multiprocessing.Process?

190

Dalam kode contoh di bawah ini, saya ingin memulihkan nilai kembali fungsi worker. Bagaimana saya bisa melakukan ini? Di mana nilai ini disimpan?

Kode Contoh:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

Keluaran:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

Sepertinya saya tidak dapat menemukan atribut yang relevan pada objek yang disimpan jobs.

blz
sumber

Jawaban:

189

Gunakan variabel bersama untuk berkomunikasi. Misalnya seperti ini:

import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print str(procnum) + ' represent!'
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print return_dict.values()
vartec
sumber
46
Saya akan merekomendasikan menggunakan multiprocessing.Queue, daripada di Managersini. Menggunakan Managermembutuhkan pemijahan proses yang sama sekali baru, yang berlebihan ketika Queueakan dilakukan.
dano
1
@ano: Saya ingin tahu, jika kita menggunakan objek Queue (), kita tidak bisa memastikan urutan kapan setiap proses mengembalikan nilai. Maksud saya jika kita perlu urutan dalam hasil, untuk melakukan pekerjaan selanjutnya. Bagaimana kita bisa yakin di mana tepatnya keluaran mana dari proses mana
Catbuilts
4
@Catbuilts Anda bisa mengembalikan tuple dari setiap proses, di mana satu nilai adalah nilai pengembalian aktual yang Anda pedulikan, dan yang lainnya adalah pengidentifikasi unik dari proses. Tetapi saya juga bertanya-tanya mengapa Anda perlu tahu proses mana yang mengembalikan nilai mana. Jika itu yang sebenarnya perlu Anda ketahui tentang prosesnya, atau apakah Anda perlu menghubungkan antara daftar input Anda dan daftar output? Dalam hal ini, saya akan merekomendasikan menggunakan multiprocessing.Pool.mapuntuk memproses daftar item pekerjaan Anda.
dano
5
peringatan untuk fungsi dengan hanya satu argumen : harus digunakan args=(my_function_argument, ). Perhatikan ,koma di sini! Atau Python akan mengeluh "argumen posisi hilang". Butuh waktu 10 menit untuk mencari tahu. Periksa juga penggunaan manual (di bawah bagian "kelas proses").
yuqli
2
@vartec satu kelemahan dari penggunaan kamus multipriocessing.Manager () adalah acar (bersambung) objek yang dikembalikan, sehingga memiliki bottleneck yang diberikan oleh pustaka acar ukuran maksimum 2GiB untuk objek yang dikembalikan. Apakah ada cara lain untuk melakukan ini menghindari serialisasi objek yang kembali?
hirschme
68

Saya pikir pendekatan yang disarankan oleh @sega_sai adalah yang lebih baik. Tapi itu benar-benar membutuhkan contoh kode, jadi begini:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

Yang akan mencetak nilai kembali:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Jika Anda terbiasa dengan map(Python 2 built-in) ini seharusnya tidak terlalu menantang. Kalau tidak, lihat tautan sega_Sai .

Perhatikan betapa sedikit kode yang dibutuhkan. (Perhatikan juga bagaimana proses digunakan kembali).

Menandai
sumber
1
Ada ide mengapa saya getpid()mengembalikan semua nilai yang sama? Saya menjalankan Python3
zelusp
Saya tidak yakin bagaimana Pool mendistribusikan tugas kepada pekerja. Mungkin mereka semua bisa berakhir di pekerja yang sama jika mereka sangat cepat? Apakah ini terjadi secara konsisten? Juga jika Anda menambahkan penundaan?
Tandai
Saya juga berpikir itu adalah hal yang berhubungan dengan kecepatan tetapi ketika saya memberi makan pool.map1.000.000 menggunakan lebih dari 10 proses, saya melihat paling banyak dua pids yang berbeda.
zelusp
1
Maka saya tidak yakin. Saya pikir akan menarik untuk membuka pertanyaan terpisah untuk ini.
Markus
Jika hal-hal yang ingin Anda kirim fungsi berbeda untuk setiap proses, gunakan pool.apply_async: docs.python.org/3/library/…
Kyle
24

Contoh ini menunjukkan cara menggunakan daftar multiprosesing. Contoh instance untuk mengembalikan string dari sejumlah proses arbitrer:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Keluaran:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

Solusi ini menggunakan lebih sedikit sumber daya daripada multiprocessing.Queue yang menggunakan

  • sebuah pipa
  • setidaknya satu Kunci
  • penyangga
  • sebuah utas

atau multiprocessing. SimpleQueue yang digunakan

  • sebuah pipa
  • setidaknya satu Kunci

Sangat instruktif untuk melihat sumber untuk masing-masing jenis ini.

David Cullen
sumber
Apa cara terbaik untuk melakukan itu tanpa menjadikan pipa sebagai variabel global?
Nickpick
Saya meletakkan semua data dan kode global ke dalam fungsi utama dan kerjanya sama. Apakah itu menjawab pertanyaan Anda?
David Cullen
apakah pipa selalu harus dibaca sebelum nilai baru dapat ditambahkan (dikirim) ke dalamnya?
Nickpick
+1, jawaban yang bagus. Tetapi tentang solusi yang lebih efisien, tradeoffnya adalah Anda membuat satu Pipeper proses vs satu Queueuntuk semua proses. Saya tidak tahu apakah itu menjadi lebih efisien dalam semua kasus.
sudo
2
Jawaban ini menyebabkan jalan buntu jika objek yang kembali besar. Alih-alih melakukan proc.join () pertama saya akan mencoba recv () nilai kembali dan kemudian lakukan join.
L. Pes
22

Untuk beberapa alasan, saya tidak dapat menemukan contoh umum tentang cara melakukan ini di Queuemana saja (bahkan contoh dokumen Python tidak menelurkan banyak proses), jadi inilah yang saya dapat kerjakan setelah 10 percobaan:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queueadalah pemblokiran, antrian aman yang dapat Anda gunakan untuk menyimpan nilai kembali dari proses anak. Jadi, Anda harus melewati antrian untuk setiap proses. Sesuatu yang kurang jelas di sini adalah bahwa Anda harus get()dari antrian sebelum Anda joinyang Processes atau antrian mengisi dan blok segalanya.

Pembaruan untuk mereka yang berorientasi objek (diuji dengan Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
sudo
sumber
18

Untuk siapa pun yang mencari cara untuk mendapatkan nilai dari Processmenggunakan Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print queue.get()  # Prints {"foo": True}
    p.join()
Matthew Moisen
sumber
1
ketika saya memasukkan sesuatu ke dalam antrean proses pekerja saya, join saya tidak pernah tercapai. Adakah yang tahu bagaimana ini bisa terjadi?
Laurens Koppenol
@LaurensKoppenol maksud Anda bahwa kode utama Anda hang di p.join () secara permanen dan tidak pernah dilanjutkan? Apakah proses Anda memiliki loop tak terbatas?
Matthew Moisen
4
Ya, itu tergantung di sana tanpa batas. Pekerja saya selesai semua (lingkaran dalam fungsi pekerja berakhir, pernyataan cetak kemudian dicetak, untuk semua pekerja). Bergabung tidak melakukan apa pun. Jika saya menghapus Queuedari fungsi saya itu membiarkan saya lulusjoin()
Laurens Koppenol
@LaurensKoppenol Apakah Anda mungkin tidak menelepon queue.put(ret)sebelum menelepon p.start()? Dalam hal ini, utas pekerja akan menggantung queue.get()selamanya. Anda dapat meniru ini dengan menyalin cuplikan saya di atas saat berkomentar queue.put(ret).
Matthew Moisen
Saya mengedit jawaban ini, queue.get()harus terjadi sebelum p.join(). Sekarang berfungsi untuk saya.
jfunk
10

Anda dapat menggunakan exitbawaan untuk mengatur kode keluar dari suatu proses. Itu dapat diperoleh dari exitcodeatribut proses:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Keluaran:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
David Cullen
sumber
4
Berhati-hatilah bahwa pendekatan ini bisa membingungkan. Proses umumnya harus keluar dengan kode keluar 0 jika selesai tanpa kesalahan. Jika Anda memiliki sesuatu yang memonitor kode keluar proses sistem Anda maka Anda mungkin melihat ini dilaporkan sebagai kesalahan.
ferrouswheel
1
Sempurna jika Anda hanya ingin mengajukan pengecualian pada proses induk karena kesalahan.
crizCraig
5

The kerikil paket memiliki leveraging abstraksi yang bagus multiprocessing.Pipeyang membuat ini cukup mudah:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

Contoh dari: https://pythonhosted.org/Pebble/#concurrent-decorators

erikreed
sumber
3

Pikir saya akan menyederhanakan contoh paling sederhana yang disalin dari atas, bekerja untuk saya di Py3.6. Paling sederhana adalah multiprocessing.Pool:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

Anda dapat mengatur jumlah proses di kumpulan dengan, misalnya Pool(processes=5),. Namun standarnya adalah jumlah CPU, jadi biarkan kosong untuk tugas yang terikat CPU. (Tugas I / O-terikat sering sesuai dengan utas, karena utas sebagian besar menunggu sehingga dapat berbagi inti CPU.) PoolJuga menerapkan optimasi chunking .

(Perhatikan bahwa metode pekerja tidak dapat disarangkan dalam metode. Saya awalnya mendefinisikan metode pekerja saya di dalam metode yang membuat panggilan ke pool.map, untuk menjaga semuanya mandiri, tetapi kemudian proses tidak dapat mengimpornya, dan melemparkan "AttributeError : Tidak dapat mengasah objek lokal outer_method..inner_method ". Lebih lanjut di sini . Bisa di dalam kelas.)

(Menghargai pertanyaan asli yang ditentukan 'represent!'daripada mencetak time.sleep(), tetapi tanpa itu saya pikir beberapa kode berjalan bersamaan ketika tidak.)


Py3 ProcessPoolExecutorjuga dua baris ( .mapmengembalikan generator sehingga Anda perlu list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

Dengan Processes polos :

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

Gunakan SimpleQueuejika yang Anda butuhkan adalah putdan get. Loop pertama memulai semua proses, sebelum yang kedua membuat queue.getpanggilan pemblokiran . Saya tidak berpikir ada alasan untuk menelepon p.join()juga.

Chris
sumber
2

Solusi sederhana:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

Keluaran:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Rubens_Zimbres
sumber
2

Jika Anda menggunakan Python 3, Anda bisa menggunakan concurrent.futures.ProcessPoolExecutorabstraksi yang praktis:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

Keluaran:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Aleph Aleph
sumber
0

Saya mengubah jawaban vartec sedikit karena saya perlu mendapatkan kode kesalahan dari fungsi. (Terima kasih vertec !!! ini trik yang luar biasa)

Ini juga dapat dilakukan dengan manager.listtetapi saya pikir lebih baik untuk memilikinya di dict dan menyimpan daftar di dalamnya. Dengan begitu, cara kita menjaga fungsi dan hasilnya karena kita tidak bisa memastikan urutan daftar yang akan diisi.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # name was set up in the Process line 53

    # here will only show the function that worked and where able to populate the 
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j
pelos
sumber