Python multiprocessing pool.map untuk beberapa argumen

536

Di pustaka multiprosesor Python, apakah ada varian pool.map yang mendukung banyak argumen?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()
pengguna642897
sumber
4
Yang mengejutkan saya, saya tidak bisa melakukan partialatau lambdamelakukan ini. Saya pikir itu ada hubungannya dengan cara aneh bahwa fungsi dilewatkan ke subproses (via pickle).
pengirim
10
@senderle: Ini adalah bug di Python 2.6, tetapi telah diperbaiki pada 2.7: bugs.python.org/issue5228
unutbu
1
Cukup ganti pool.map(harvester(text,case),case, 1) dengan: pool.apply_async(harvester(text,case),case, 1)
Tung Nguyen
3
@Syrtis_Major, tolong jangan mengedit pertanyaan OP yang secara efektif memiringkan jawaban yang telah diberikan sebelumnya. Menambahkan returnuntuk harvester()respon @senderie 's berubah menjadi tidak akurat. Itu tidak membantu pembaca di masa depan.
Ricalsin
1
Saya akan mengatakan solusi mudahnya adalah mengemas semua argumen dalam sebuah tuple dan membukanya di func yang menjalankan. Saya melakukan ini ketika saya perlu mengirim beberapa argumen rumit ke sebuah fungsi yang dijalankan oleh kumpulan proses.
HS Rathore

Jawaban:

358

Jawabannya bergantung pada versi dan situasi. Jawaban paling umum untuk versi terbaru Python (sejak 3.3) pertama kali dijelaskan di bawah ini oleh JF Sebastian . 1 Menggunakan Pool.starmapmetode, yang menerima urutan argumen tuple. Itu kemudian secara otomatis membongkar argumen dari setiap tuple dan meneruskannya ke fungsi yang diberikan:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

Untuk versi Python sebelumnya, Anda harus menulis fungsi pembantu untuk membongkar argumen secara eksplisit. Jika Anda ingin menggunakan with, Anda juga harus menulis pembungkus untuk berubah Poolmenjadi manajer konteks. (Terima kasih kepada muon untuk menunjukkan ini.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

Dalam kasus yang lebih sederhana, dengan argumen kedua tetap, Anda juga bisa menggunakan partial, tetapi hanya dengan Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Banyak dari ini terinspirasi oleh jawabannya, yang seharusnya diterima sebagai gantinya. Tapi karena yang satu ini macet di puncak, sepertinya lebih baik untuk memperbaikinya bagi pembaca masa depan.

pengirim
sumber
Sepertinya saya bahwa RAW_DATASET dalam kasus ini harus menjadi variabel global? Sementara saya ingin partial_harvester mengubah nilai case di setiap panggilan harvester (). Bagaimana cara mencapainya?
xgdgsc
Yang paling penting di sini adalah menetapkan =RAW_DATASETnilai default case. Kalau tidak, pool.mapakan membingungkan tentang beberapa argumen.
Emerson Xu
1
Saya bingung, apa yang terjadi pada textvariabel dalam contoh Anda? Kenapa RAW_DATASETsepertinya dilewati dua kali. Saya pikir Anda mungkin memiliki kesalahan ketik?
Dave
tidak yakin mengapa menggunakan with .. as .. memberi saya AttributeError: __exit__, tetapi berfungsi dengan baik jika saya panggil pool = Pool();lalu tutup secara manual pool.close()(python2.7)
muon
1
@ muon, tangkapan yang bagus. Tampaknya Poolobjek tidak menjadi manajer konteks sampai Python 3.3. Saya telah menambahkan fungsi pembungkus sederhana yang mengembalikan Poolmanajer konteks.
pengirim
501

apakah ada varian pool.map yang mendukung banyak argumen?

Python 3.3 termasuk pool.starmap()metode :

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

Untuk versi yang lebih lama:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

Keluaran

1 1
2 1
3 1

Perhatikan bagaimana itertools.izip()dan itertools.repeat()digunakan di sini.

Karena bug yang disebutkan oleh @unutbu Anda tidak dapat menggunakan functools.partial()atau kemampuan serupa pada Python 2.6, sehingga fungsi pembungkus sederhana func_star()harus didefinisikan secara eksplisit. Lihat juga solusi yang disarankan olehuptimebox .

jfs
sumber
1
F .: Anda dapat membongkar tupel argumen dalam tanda tangan func_starseperti ini: def func_star((a, b)). Tentu saja, ini hanya berfungsi untuk sejumlah argumen, tetapi jika itu adalah satu-satunya kasus yang ia miliki, itu lebih mudah dibaca.
Björn Pollex
1
@ Space_C0wb0y: f((a,b))sintaks sudah usang dan dihapus di py3k. Dan itu tidak perlu di sini.
jfs
mungkin lebih pythonic: func = lambda x: func(*x)alih-alih mendefinisikan fungsi wrapper
dylam
1
@ zthomas.nc pertanyaan ini adalah tentang bagaimana mendukung banyak argumen untuk multiprocessing pool.map. Jika ingin tahu cara memanggil metode alih-alih fungsi dalam proses Python yang berbeda melalui multiprocessing, maka tanyakan pertanyaan terpisah (jika semuanya gagal, Anda selalu dapat membuat fungsi global yang membungkus panggilan metode yang serupa dengan di func_star()atas)
jfs
1
Saya berharap ada starstarmap.
Константин Ван
141

Saya pikir di bawah ini akan lebih baik

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

keluaran

[3, 5, 7]
imotai
sumber
16
Solusi termudah. Ada optimasi kecil; hapus fungsi pembungkus dan bongkar argslangsung add, itu berfungsi untuk sejumlah argumen:def add(args): (x,y) = args
Ahmed
1
Anda juga bisa menggunakan lambdafungsi alih-alih mendefinisikanmulti_run_wrapper(..)
Andre Holzner
2
hm ... pada kenyataannya, menggunakan lambdatidak berfungsi karena pool.map(..)mencoba untuk mengambil fungsi yang diberikan
Andre Holzner
Bagaimana Anda menggunakan ini jika Anda ingin menyimpan hasil adddalam daftar?
Vivek Subramanian
@Ahmed Saya suka apa adanya, karena IMHO pemanggilan metode harus gagal, setiap kali jumlah parameter tidak benar.
Michael Dorner
56

Menggunakan Python 3.3+ denganpool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

Hasil:

1 --- 4
2 --- 5
3 --- 6

Anda juga dapat memberi zip () lebih banyak argumen jika suka: zip(a,b,c,d,e)

Jika Anda ingin memiliki nilai konstan yang diteruskan sebagai argumen yang harus Anda gunakan import itertoolsdan kemudian zip(itertools.repeat(constant), a)misalnya.

pengguna136036
sumber
2
Ini adalah jawaban rangkap yang hampir sama dengan jawaban dari @JFSebastian pada tahun 2011 (dengan 60+ suara).
Mike McKerns
29
Tidak. Pertama-tama menghapus banyak hal yang tidak perlu dan dengan jelas menyatakan itu untuk python 3.3+ dan ditujukan untuk pemula yang mencari jawaban yang sederhana dan bersih. Sebagai seorang pemula sendiri butuh beberapa saat untuk mencari tahu seperti itu (ya dengan posting JFSebastians) dan inilah mengapa saya menulis posting saya untuk membantu pemula lain, karena postingnya hanya mengatakan "ada starmap" tetapi tidak menjelaskannya - ini adalah apa yang ingin posting saya. Jadi sama sekali tidak ada alasan untuk memukul saya dengan dua downvotes.
user136036
Pada 2011, tidak ada "+" di python 3.3 + ... jadi jelas.
Mike McKerns
27

Setelah mengetahui tentang itertools di JF Sebastian, saya memutuskan untuk mengambil langkah lebih jauh dan menulis parmappaket yang memperhatikan paralelisasi, penawaran mapdan starmapfungsi pada python-2.7 dan python-3.2 (dan kemudian juga) yang dapat mengambil sejumlah argumen posisi .

Instalasi

pip install parmap

Cara memparalelkan:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

Saya telah mengunggah parmap ke PyPI dan ke repositori github .

Sebagai contoh, pertanyaannya bisa dijawab sebagai berikut:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)
zeehio
sumber
20

# "Cara mengambil banyak argumen".

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)
Dane Lee
sumber
2
Rapi dan elegan.
Prav001
1
Saya tidak mengerti mengapa saya harus menggulir ke sini untuk menemukan jawaban terbaik.
toti
12

Ada garpu yang multiprocessingdisebut patho ( catatan: gunakan versi di github ) yang tidak perlu starmap- fungsi peta mencerminkan API untuk peta python, sehingga peta dapat mengambil beberapa argumen. Dengan pathos, Anda juga dapat secara umum melakukan multiprocessing dalam interpreter, alih-alih terjebak di __main__blok. Pathos akan dirilis, setelah beberapa pembaruan ringan - kebanyakan konversi ke python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

pathosmemiliki beberapa cara yang dapat Anda lakukan untuk mendapatkan perilaku yang tepat starmap.

>>> def add(*x):
...   return sum(x)
... 
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>> 
Mike McKerns
sumber
Saya ingin mencatat bahwa ini tidak membahas struktur dalam pertanyaan awal. [[1,2,3], [4,5,6]] akan membongkar dengan starmap ke [pow (1,2,3), pow (4,5,6)], bukan [pow (1,4) , pow (2,5), pow (3, 6)]. Jika Anda tidak memiliki kontrol yang baik atas input yang diteruskan ke fungsi Anda, Anda mungkin perlu merestrukturisasi mereka terlebih dahulu.
Scott
@ Esc: ah, saya tidak memperhatikan itu ... lebih dari 5 tahun yang lalu. Saya akan membuat pembaruan kecil. Terima kasih.
Mike McKerns
8

Anda dapat menggunakan dua fungsi berikut untuk menghindari penulisan pembungkus untuk setiap fungsi baru:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

Gunakan fungsi functiondengan daftar argumen arg_0, arg_1dan arg_2sebagai berikut:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
M. Toya
sumber
8

Solusi yang lebih baik untuk python2:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

2 3 4

1 2 3

0 1 2

di luar[]:

[3, 5, 7]

xmduhan
sumber
7

Alternatif sederhana lainnya adalah membungkus parameter fungsi Anda dalam tuple dan kemudian membungkus parameter yang harus dilewatkan dalam tupel juga. Ini mungkin tidak ideal ketika berhadapan dengan banyak data. Saya percaya itu akan membuat salinan untuk setiap tuple.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

Memberikan output dalam urutan acak:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Alex Klibisz
sumber
Memang benar, masih mencari cara yang lebih baik :(
Fábio Dias
6

Cara yang lebih baik adalah menggunakan dekorator alih-alih menulis fungsi pembungkus dengan tangan. Terutama ketika Anda memiliki banyak fungsi untuk dipetakan, dekorator akan menghemat waktu Anda dengan menghindari pembungkus tulisan untuk setiap fungsi. Biasanya fungsi yang didekorasi tidak dapat dipilih, namun kita dapat menggunakannya functoolsuntuk menyiasatinya. Lebih banyak diskusi dapat ditemukan di sini .

Berikut contohnya

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

Kemudian Anda dapat memetakannya dengan argumen zip

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

Tentu saja, Anda selalu dapat menggunakan Pool.starmapdalam Python 3 (> = 3.3) sebagaimana disebutkan dalam jawaban lain.

Syrtis Major
sumber
Hasilnya tidak seperti yang diharapkan: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] Saya harapkan: [0,1,2,3,4,5,6,7,8, 9,1,2,3,4,5,6,7,8,9,10,2,3,4,5,6,7,8,9,10,11, ...
Tedo Vrbanec
@TedoVrbanec Hasil seharusnya [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]. Jika Anda menginginkan yang lebih baru, Anda dapat menggunakannya itertools.productsebagai gantinya zip.
Syrtis Major
4

Cara lain adalah dengan menyerahkan daftar daftar ke rutinitas satu argumen:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

Dari satu dapat membangun daftar daftar argumen dengan metode favorit seseorang.

Adobe
sumber
Ini adalah cara yang mudah, tetapi Anda perlu mengubah fungsi asli Anda. Terlebih lagi, beberapa waktu mengingat fungsi orang lain yang mungkin tidak dapat dimodifikasi.
WeizhongTu
Saya akan mengatakan ini menempel pada Python zen. Seharusnya ada satu dan hanya satu cara yang jelas untuk melakukannya. Jika kebetulan Anda adalah penulis fungsi panggilan, ini Anda harus menggunakan metode ini, untuk kasus lain kita dapat menggunakan metode imotai.
nehem
Pilihan saya adalah menggunakan tuple, dan kemudian segera membuka bungkusnya sebagai hal pertama di baris pertama.
nehem
3

Berikut adalah cara lain untuk melakukannya bahwa IMHO lebih sederhana dan elegan daripada jawaban lain yang disediakan.

Program ini memiliki fungsi yang mengambil dua parameter, mencetaknya dan juga mencetak jumlah:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

output adalah:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

Lihat dokumen python untuk info lebih lanjut:

https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

Khususnya pastikan untuk memeriksa starmapfungsi.

Saya menggunakan Python 3.6, saya tidak yakin apakah ini akan berfungsi dengan versi Python yang lebih lama

Mengapa tidak ada contoh yang sangat mudah seperti ini di dokumen, saya tidak yakin.

cdahms
sumber
2

Dari python 3.4.4, Anda dapat menggunakan multiprocessing.get_context () untuk mendapatkan objek konteks untuk menggunakan beberapa metode mulai:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

Atau Anda cukup mengganti saja

pool.map(harvester(text,case),case, 1)

oleh:

pool.apply_async(harvester(text,case),case, 1)
Tung Nguyen
sumber
2

Ada banyak jawaban di sini, tetapi tampaknya tidak ada yang memberikan kode yang kompatibel dengan Python 2/3 yang akan berfungsi pada versi apa pun. Jika Anda ingin kode Anda hanya berfungsi , ini akan berfungsi untuk versi Python:

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

Setelah itu, Anda bisa menggunakan multiprosesing dengan cara Python 3 biasa, sesuka Anda. Sebagai contoh:

def _function_to_run_for_each(x):
       return x.lower()
with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])    print(results)

akan bekerja di Python 2 atau Python 3.

cgnorthcutt
sumber
1

Dalam dokumentasi resmi disebutkan bahwa itu hanya mendukung satu argumen yang dapat diubah. Saya suka menggunakan apply_async dalam kasus seperti itu. Dalam kasus Anda, saya akan melakukan:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()
roj4s
sumber
1
text = "test"

def unpack(args):
    return args[0](*args[1:])

def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    # args is a list of tuples 
    # with the function to execute as the first item in each tuple
    args = [(harvester, text, c) for c in case]
    # doing it this way, we can pass any function
    # and we don't need to define a wrapper for each different function
    # if we need to use more than one
    pool.map(unpack, args)
    pool.close()
    pool.join()
Jaime
sumber
1

Ini adalah contoh rutin yang saya gunakan untuk meneruskan beberapa argumen ke fungsi satu argumen yang digunakan dalam fork pool.imap :

from multiprocessing import Pool

# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun

    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+' ,var2[j], '=', value)

# Close the pool
pool.close()
A. Nodar
sumber
-3

untuk python2, Anda dapat menggunakan trik ini

def fun(a,b):
    return a+b

pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))
Hz Shang
sumber
mengapa b = 233. mengalahkan tujuan pertanyaan
as - if