python multithreading tunggu sampai semua utas selesai

119

Ini mungkin telah ditanyakan dalam konteks yang sama tetapi saya tidak dapat menemukan jawaban setelah sekitar 20 menit pencarian, jadi saya akan bertanya.

Saya telah menulis skrip Python (katakanlah: scriptA.py) dan skrip (katakanlah scriptB.py)

Di scriptB saya ingin memanggil scriptA beberapa kali dengan argumen yang berbeda, setiap kali membutuhkan waktu sekitar satu jam untuk dijalankan, (ini adalah script yang sangat besar, melakukan banyak hal .. jangan khawatir tentang itu) dan saya ingin dapat menjalankan scriptA dengan semua argumen yang berbeda secara bersamaan, tapi saya harus menunggu sampai SEMUAnya selesai sebelum melanjutkan; kode saya:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

Saya ingin menjalankan semua subprocess.call()pada waktu yang sama, dan kemudian menunggu sampai semuanya selesai, bagaimana saya harus melakukan ini?

Saya mencoba menggunakan threading seperti contoh di sini :

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

Tetapi menurut saya ini tidak benar.

Bagaimana saya tahu mereka semua telah selesai berlari sebelum pergi ke saya do_finish()?

Inbar Rose
sumber

Jawaban:

150

Anda perlu menggunakan metode gabunganThread objek di akhir skrip.

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Dengan demikian utas utama akan menunggu sampai t1, t2dan t3menyelesaikan eksekusi.

Maksim Skurydzin
sumber
5
hmmm - mengalami kesulitan memahami sesuatu, apakah ini akan dijalankan pertama kali t1, tunggu sampai selesai, lalu pergi ke t2..etc, dll? bagaimana membuat semuanya terjadi sekaligus? saya tidak melihat bagaimana ini akan menjalankan mereka pada saat yang sama?
Inbar Rose
25
Panggilan untuk joinmemblokir hingga thread menyelesaikan eksekusi. Anda harus menunggu semua utasnya. Jika t1selesai dulu Anda akan mulai menunggu t2(yang mungkin sudah selesai dan Anda akan segera melanjutkan menunggu t3). Jika t1membutuhkan waktu paling lama untuk dieksekusi, ketika Anda kembali dari keduanya t1dan t2akan segera kembali tanpa memblokir.
Maksim Skurydzin
1
Anda tidak mengerti pertanyaan saya - jika saya menyalin kode di atas ke kode saya - apakah akan berhasil? atau apakah saya melewatkan sesuatu?
Inbar Rose
2
oke, begitu. sekarang saya mengerti, agak bingung tentang hal itu tetapi saya pikir saya mengerti, joinsemacam melampirkan proses saat ini ke utas dan menunggu sampai selesai, dan jika t2 selesai sebelum t1 maka ketika t1 selesai itu akan memeriksa t2 selesai lihat bahwa itu benar, dan kemudian periksa t3..etc..etc .. dan kemudian hanya setelah semua selesai maka akan dilanjutkan. mengagumkan.
Inbar Rose
3
katakanlah t1 membutuhkan waktu paling lama, tetapi t2 memiliki pengecualian. lalu apa yang terjadi? dapatkah Anda menangkap pengecualian itu atau memeriksa apakah t2 selesai dengan baik atau tidak?
Ciprian Tomoiagă
174

Masukkan utas ke dalam daftar dan kemudian gunakan metode Gabung

 threads = []

 t = Thread(...)
 threads.append(t)

 ...repeat as often as necessary...

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()
Aaron Digulla
sumber
1
Ya, itu akan berhasil tetapi lebih sulit untuk dipahami. Anda harus selalu mencoba menemukan keseimbangan antara kode ringkas dan "keterbacaan". Ingat: Kode ditulis sekali tetapi dibaca berkali-kali. Jadi yang lebih penting adalah mudah dimengerti.
Aaron Digulla
2
"Pola pabrik" bukanlah sesuatu yang bisa saya jelaskan dalam satu kalimat. Google untuk itu dan telusuri stackoverflow.com. Ada banyak contoh dan penjelasannya. Singkatnya: Anda menulis kode yang membangun sesuatu yang kompleks untuk Anda. Seperti pabrik sungguhan: Anda memberi pesanan dan mendapatkan produk jadi kembali.
Aaron Digulla
18
Saya tidak suka gagasan menggunakan pemahaman daftar karena efek sampingnya dan tidak melakukan sesuatu yang berguna dengan daftar yang dihasilkan. Loop sederhana akan lebih bersih meskipun menyebar dua baris ...
Ioan Alexandru Cucu
1
@Aaron DIgull Saya mengerti itu. Yang saya maksud adalah bahwa saya hanya akan melakukan for x in threads: x.join()daripada menggunakan pemahaman daftar
Ioan Alexandru Cucu
1
@IoanAlexandruCucu: Saya masih bertanya-tanya apakah ada solusi yang lebih mudah dibaca dan efisien: stackoverflow.com/questions/21428602/…
Aaron Digulla
29

Di Python3, karena Python 3.2 ada pendekatan baru untuk mencapai hasil yang sama, yang secara pribadi saya lebih suka pembuatan utas tradisional / start / join, paket concurrent.futures: https://docs.python.org/3/library/concurrent.futures .html

Menggunakan ThreadPoolExecutorkode tersebut adalah:

from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

Output dari kode sebelumnya adalah seperti:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

Salah satu keuntungannya adalah Anda dapat mengontrol pengaturan throughput pekerja bersamaan secara maksimal.

Roberto
sumber
tapi bagaimana Anda bisa tahu kapan semua utas di threadpool telah selesai?
Prime By Design
1
Seperti yang Anda lihat di contoh, kode setelah withpernyataan dijalankan ketika semua tugas telah selesai.
Roberto
ini tidak berhasil. Cobalah melakukan sesuatu yang sangat panjang. Pernyataan cetak Anda akan dieksekusi sebelum utas selesai
Pranalee
@Pranalee, Kode itu berfungsi, saya telah memperbarui kode untuk menambahkan baris keluaran. Anda tidak dapat melihat "Semua tugas ..." sebelum semua utas selesai. Begitulah cara kerja withpernyataan dengan desain dalam kasus ini. Bagaimanapun, Anda selalu dapat membuka pertanyaan baru di SO dan memposting kode Anda sehingga kami dapat membantu Anda untuk mengetahui apa yang terjadi dalam kasus Anda.
Roberto
@PrimeByDesign Anda dapat menggunakan concurrent.futures.waitfungsi, Anda dapat melihat contoh nyata di sini. Dokumen resmi: docs.python.org/3/library/…
Alexander Fortin
28

Saya lebih suka menggunakan pemahaman daftar berdasarkan daftar masukan:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]
Adam Matan
sumber
Jawaban yang dicentang menjelaskan dengan baik tetapi yang ini lebih pendek dan tidak memerlukan pengulangan yang buruk. Jawaban yang bagus. :)
tleb
Pemahaman daftar hanya untuk efek samping biasanya disusutkan *. Tapi dalam kasus penggunaan ini, sepertinya itu ide yang bagus. * stackoverflow.com/questions/5753597/…
Vinayak Kaniyarakkal
3
@VinayakKaniyarakkal for t in threads:t.start()bukankah lebih baik?
SmartManoj
5

Anda dapat memiliki kelas seperti di bawah ini di mana Anda dapat menambahkan 'n' sejumlah fungsi atau console_scripts yang ingin Anda jalankan dalam passion paralel dan memulai eksekusi dan menunggu semua pekerjaan selesai ..

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()
PBD
sumber
Ini multiprosesing. Pertanyaannya tentang docs.python.org/3/library/threading.html
Rustam A.
3

Dari threading dokumentasi modul

Ada objek "utas utama"; ini sesuai dengan thread awal kontrol dalam program Python. Ini bukan utas daemon.

Ada kemungkinan bahwa "objek benang tiruan" dibuat. Ini adalah objek utas yang sesuai dengan "utas alien", yang merupakan utas kontrol yang dimulai di luar modul threading, seperti langsung dari kode C. Objek benang tiruan memiliki fungsionalitas terbatas; mereka selalu dianggap hidup dan daemonik, dan tidak dapat diedit join(). Mereka tidak pernah dihapus, karena tidak mungkin mendeteksi penghentian utas alien.

Jadi, untuk menangkap dua kasus tersebut ketika Anda tidak tertarik untuk menyimpan daftar utas yang Anda buat:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

Dimana:

>>> print(data)
[0, 4, 12, 40]
berna1111
sumber
2

Mungkin, seperti itu

for t in threading.enumerate():
    if t.daemon:
        t.join()
jno
sumber
Saya telah mencoba kode ini tetapi tidak yakin tentang kerjanya karena instruksi terakhir dari kode saya dicetak setelah ini untuk loop dan masih prosesnya tidak dihentikan.
Omkar
1

Saya baru saja menemukan masalah yang sama di mana saya harus menunggu semua utas yang dibuat menggunakan for loop. Saya baru saja mencoba potongan kode berikut Ini mungkin bukan solusi yang tepat tetapi saya pikir itu akan menjadi solusi yang sederhana untuk menguji:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise
Omkar
sumber