Saya mencoba membaca dokumentasi di http://docs.python.org/dev/library/multiprocessing.html tetapi saya masih kesulitan dengan multiprocessing Queue, Pool and Locking. Dan untuk saat ini saya dapat membuat contoh di bawah ini.
Mengenai Antrian dan Pool, saya tidak yakin apakah saya memahami konsep dengan cara yang benar, jadi koreksi saya jika saya salah. Apa yang saya coba capai adalah memproses 2 permintaan sekaligus (daftar data memiliki 8 dalam contoh ini) jadi, apa yang harus saya gunakan? Pool untuk membuat 2 proses yang dapat menangani dua antrian berbeda (maksimal 2) atau haruskah saya menggunakan Antrean untuk memproses 2 input setiap kali? Kunci akan mencetak output dengan benar.
import multiprocessing
import time
data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)
def mp_handler(var1):
for indata in var1:
p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
p.start()
def mp_worker(inputs, the_time):
print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
time.sleep(int(the_time))
print " Process %s\tDONE" % inputs
if __name__ == '__main__':
mp_handler(data)
sumber
var1
semuanya, mengacu pada globaldata
sebagai gantinya.Ini mungkin tidak 100% terkait dengan pertanyaan tersebut, tetapi pada pencarian saya untuk contoh penggunaan multiprocessing dengan antrian ini muncul pertama kali di google.
Ini adalah contoh kelas dasar yang Anda dapat membuat instance dan meletakkan item dalam antrian dan bisa menunggu sampai antrian selesai. Itu yang saya butuhkan.
from multiprocessing import JoinableQueue from multiprocessing.context import Process class Renderer: queue = None def __init__(self, nb_workers=2): self.queue = JoinableQueue() self.processes = [Process(target=self.upload) for i in range(nb_workers)] for p in self.processes: p.start() def render(self, item): self.queue.put(item) def upload(self): while True: item = self.queue.get() if item is None: break # process your item here self.queue.task_done() def terminate(self): """ wait until queue is empty and terminate processes """ self.queue.join() for p in self.processes: p.terminate() r = Renderer() r.render(item1) r.render(item2) r.terminate()
sumber
item1
danitem2
? Apakah mereka semacam tugas atau fungsi, yang akan dijalankan dalam dua proses berbeda?Ini adalah kebagian pribadi saya untuk topik ini:
Intinya di sini, (pull request welcome!): Https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec
import multiprocessing import sys THREADS = 3 # Used to prevent multiple threads from mixing thier output GLOBALLOCK = multiprocessing.Lock() def func_worker(args): """This function will be called by each thread. This function can not be a class method. """ # Expand list of args into named args. str1, str2 = args del args # Work # ... # Serial-only Portion GLOBALLOCK.acquire() print(str1) print(str2) GLOBALLOCK.release() def main(argp=None): """Multiprocessing Spawn Example """ # Create the number of threads you want pool = multiprocessing.Pool(THREADS) # Define two jobs, each with two args. func_args = [ ('Hello', 'World',), ('Goodbye', 'World',), ] try: # Spawn up to 9999999 jobs, I think this is the maximum possible. # I do not know what happens if you exceed this. pool.map_async(func_worker, func_args).get(9999999) except KeyboardInterrupt: # Allow ^C to interrupt from any thread. sys.stdout.write('\033[0m') sys.stdout.write('User Interupt\n') pool.close() if __name__ == '__main__': main()
sumber
get()
adalah batas waktu, tidak ada hubungannya dengan jumlah pekerjaan yang dimulai..get(timeout=1)
? dan apakah boleh hanya mengatakan.get()
untuk mendapatkan daftar lengkap?.get()
menunggu tanpa batas waktu hingga semua hasil tersedia dan mengembalikan daftar hasil. Anda bisa menggunakan loop polling untuk memeriksa apakah hasil cuaca tersedia, atau Anda bisa meneruskan fungsi callback dalammap_async()
panggilan yang kemudian akan dipanggil untuk setiap hasil setelah tersedia.Untuk semua orang yang menggunakan editor seperti Komodo Edit (win10) tambahkan
sys.stdout.flush()
ke:def mp_worker((inputs, the_time)): print " Process %s\tWaiting %s seconds" % (inputs, the_time) time.sleep(int(the_time)) print " Process %s\tDONE" % inputs sys.stdout.flush()
atau sebagai baris pertama ke:
if __name__ == '__main__': sys.stdout.flush()
Ini membantu untuk melihat apa yang terjadi selama menjalankan skrip; daripada harus melihat kotak baris perintah hitam.
sumber
Berikut adalah contoh dari kode saya (untuk kumpulan berulir, tetapi cukup ubah nama kelas dan Anda akan memiliki kumpulan proses):
def execute_run(rp): ... do something pool = ThreadPoolExecutor(6) for mat in TESTED_MATERIAL: for en in TESTED_ENERGIES: for ecut in TESTED_E_CUT: rp = RunParams( simulations, DEST_DIR, PARTICLE, mat, 960, 0.125, ecut, en ) pool.submit(execute_run, rp) pool.join()
Pada dasarnya:
pool = ThreadPoolExecutor(6)
membuat kolam untuk 6 utaspool.submit(execute_run, rp)
menambahkan tugas ke pool, arogumen pertama adalah fungsi yang dipanggil dalam utas / proses, sisa argumen diteruskan ke fungsi yang dipanggil.pool.join
menunggu sampai semua tugas selesai.sumber
concurrent.futures
, tetapi OP menanyakan tentangmultiprocessing
dan Python 2.7.