Bagaimana cara mendapatkan nilai kembali dari utas dengan python?

342

Fungsi di foobawah ini mengembalikan string 'foo'. Bagaimana saya bisa mendapatkan nilai 'foo'yang dikembalikan dari target utas?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

"Satu cara yang jelas untuk melakukannya", yang ditunjukkan di atas, tidak berfungsi: thread.join()dikembalikan None.

wim
sumber

Jawaban:

37

Dalam Python 3.2+, concurrent.futuresmodul stdlib menyediakan API tingkat yang lebih tinggi threading, termasuk meneruskan nilai atau pengecualian kembali dari utas pekerja kembali ke utas utama:

import concurrent.futures

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(foo, 'world!')
    return_value = future.result()
    print(return_value)
Ramarao Amara
sumber
Bagi mereka yang bertanya-tanya ini dapat dilakukan dengan daftar utas. futures = [executor.submit(foo, param) for param in param_list]Pesanan akan dipertahankan, dan keluar dari withakan memungkinkan pengumpulan hasil. [f.result() for f in futures]
jayreed1
273

FWIW, multiprocessingmodul memiliki antarmuka yang bagus untuk ini menggunakan Poolkelas. Dan jika Anda ingin tetap menggunakan thread daripada proses, Anda bisa menggunakan multiprocessing.pool.ThreadPoolkelas sebagai pengganti drop-in.

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.
Jake Biesinger
sumber
50
@JakeBiesinger Maksud saya adalah, bahwa saya sedang mencari jawaban, bagaimana mendapatkan tanggapan dari Thread, datang ke sini, dan jawaban yang diterima tidak menjawab pertanyaan yang dinyatakan. Saya membedakan utas dan proses. Saya tahu tentang Global Interpreter Lock namun saya sedang mengerjakan masalah I / O sehingga Threads ok, saya tidak perlu proses. Jawaban lain di sini lebih baik menjawab pertanyaan yang dinyatakan.
omikron
7
@omikron Tapi utas dalam python tidak mengembalikan respons kecuali Anda menggunakan subkelas yang memungkinkan fungsi ini. Dari kemungkinan subkelas, ThreadPools adalah pilihan yang cocok (pilih # utas, gunakan peta / terapkan dengan sinkronisasi / asinkron). Meskipun diimpor dari multiprocess, mereka tidak ada hubungannya dengan Proses.
Jake Biesinger
4
@JakeBiesinger Oh, aku buta. Maaf atas komentar saya yang tidak perlu. Kamu benar. Saya hanya berasumsi bahwa multiprocessing = proses.
omikron
12
Jangan lupa atur processes=1lebih dari satu jika Anda memiliki lebih banyak utas!
iman
4
Masalah dengan multiprocessing dan thread pool adalah lebih lambat untuk mensetup dan memulai thread dibandingkan dengan pustaka threading dasar. Ini bagus untuk memulai utas lama berjalan tetapi kalahkan tujuannya ketika harus memulai banyak utas baru. Solusi menggunakan "threading" dan "Antrian" yang didokumentasikan dalam jawaban lain di sini adalah alternatif yang lebih baik untuk kasus penggunaan yang terakhir menurut saya.
Yves Dorfsman
242

Salah satu cara yang saya lihat adalah untuk melewatkan objek yang bisa berubah, seperti daftar atau kamus, ke konstruktor utas, bersama dengan indeks atau pengidentifikasi lainnya. Utas kemudian dapat menyimpan hasilnya di slot khusus di objek itu. Sebagai contoh:

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

Jika Anda benar-benar ingin join()mengembalikan nilai balik dari fungsi yang dipanggil, Anda bisa melakukan ini dengan Threadsubkelas seperti berikut:

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

Itu menjadi sedikit berbulu karena beberapa nama mangling, dan mengakses struktur data "pribadi" yang khusus untuk Threadimplementasi ... tetapi berfungsi.

Untuk python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None
    def run(self):
        print(type(self._target))
        if self._target is not None:
            self._return = self._target(*self._args,
                                                **self._kwargs)
    def join(self, *args):
        Thread.join(self, *args)
        return self._return
baik hati
sumber
37
keren, terima kasih untuk contohnya! saya bertanya-tanya mengapa Thread tidak diimplementasikan dengan menangani nilai kembali di tempat pertama, sepertinya hal yang cukup jelas untuk didukung.
wim
16
Saya pikir ini harus menjadi jawaban yang diterima - OP meminta threading, bukan perpustakaan yang berbeda untuk mencoba, ditambah batasan ukuran kumpulan memperkenalkan masalah potensial tambahan, yang terjadi dalam kasus saya.
domoarigato
10
Lelucon kereta yang bagus.
meawoppl
7
Pada python3 ini kembali TypeError: __init__() takes from 1 to 6 positional arguments but 7 were given. Adakah cara untuk memperbaikinya?
GuySoft
2
Peringatan bagi siapa saja yang tergoda untuk melakukan yang kedua dari _Thread__targethal ini. Anda akan membuat siapa pun yang mencoba untuk mem-porting kode Anda ke python 3 membenci Anda sampai mereka mengetahui apa yang telah Anda lakukan (karena menggunakan fitur tidak berdokumen yang berubah antara 2 dan 3). Dokumentasikan kode Anda dengan baik.
Ben Taylor
84

Jawaban Jake baik, tetapi jika Anda tidak ingin menggunakan threadpool (Anda tidak tahu berapa banyak thread yang Anda perlukan, tetapi buatlah sesuai kebutuhan) maka cara yang baik untuk mengirimkan informasi antara thread adalah bawaannya. Kelas antrian , karena menawarkan keamanan utas.

Saya membuat dekorator berikut untuk membuatnya bertindak serupa dengan threadpool:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

Maka Anda cukup menggunakannya sebagai:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

Fungsi dihiasi membuat utas baru setiap kali dipanggil dan mengembalikan objek Utas yang berisi antrian yang akan menerima hasilnya.

MEMPERBARUI

Sudah cukup lama sejak saya memposting jawaban ini, tetapi masih mendapat tampilan jadi saya pikir saya akan memperbaruinya untuk mencerminkan cara saya melakukan ini dalam versi Python yang lebih baru:

Python 3.2 ditambahkan dalam concurrent.futuresmodul yang menyediakan antarmuka tingkat tinggi untuk tugas paralel. Ini menyediakan ThreadPoolExecutordan ProcessPoolExecutor, sehingga Anda dapat menggunakan utas atau kumpulan proses dengan api yang sama.

Satu keuntungan dari api ini adalah mengirimkan tugas ke objek yang Executordikembalikan Future, yang akan lengkap dengan nilai balik dari callable yang Anda kirimkan.

Ini membuat melampirkan queueobjek tidak perlu, yang menyederhanakan dekorator:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

Ini akan menggunakan pelaksana threadpool modul default jika tidak ada yang masuk

Penggunaannya sangat mirip dengan sebelumnya:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

Jika Anda menggunakan Python 3.4+, satu fitur yang sangat bagus dari menggunakan metode ini (dan objek Future pada umumnya) adalah bahwa future yang dikembalikan dapat dibungkus untuk mengubahnya menjadi asyncio.Futurewith asyncio.wrap_future. Ini membuatnya bekerja dengan mudah dengan coroutine:

result = await asyncio.wrap_future(long_task(10))

Jika Anda tidak memerlukan akses ke concurrent.Futureobjek yang mendasarinya , Anda dapat menyertakan bungkus di dekorator:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

Kemudian, setiap kali Anda perlu mendorong kode intensif atau memblokir cpu dari untaian loop acara, Anda dapat meletakkannya dalam fungsi yang didekorasi:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
bj0
sumber
Sepertinya saya tidak bisa melakukan ini; Saya mendapatkan pesan kesalahan yang menyatakan bahwa AttributeError: 'module' object has no attribute 'Lock'ini berasal dari garis y = long_task(10)... pemikiran?
sadmicrowave
1
Kode tidak secara eksplisit menggunakan Kunci, jadi masalahnya bisa berada di tempat lain dalam kode Anda. Anda mungkin ingin memposting pertanyaan SO baru tentang hal itu
bj0
Mengapa result_queue merupakan atribut instance? Apakah akan lebih baik jika itu adalah atribut kelas sehingga pengguna tidak perlu tahu untuk memanggil result_queue saat menggunakan @threaded yang tidak eksplisit dan ambigu?
nonbot
@ t88, tidak yakin apa yang Anda maksud, Anda perlu cara mengakses hasilnya, yang berarti Anda harus tahu harus menelepon apa. Jika Anda menginginkannya menjadi sesuatu yang lain, Anda dapat mensubklasifikasikan Thread dan melakukan apa yang Anda inginkan (ini solusi sederhana) Alasan mengapa antrian harus dilampirkan ke utas adalah agar beberapa panggilan / fungsi memiliki antrian sendiri
bj0
1
Ini brilian! Terima kasih banyak.
Ganesh Kathiresan
53

Solusi lain yang tidak perlu mengubah kode Anda yang ada:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result

Itu juga dapat dengan mudah disesuaikan dengan lingkungan multi-utas:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()
threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result
Arik
sumber
t = Utas (target = lambda q, arg1: q.put (foo (arg1)), args = (que, 'world!')) whats q.put lakukan di sini, apa yang dilakukan oleh Queue.Queue ()
vijay shanker
6
Seharusnya ada patung Anda di kota Anda, terima kasih!
Onilol
3
@Onilol - Terima kasih banyak. Komentar Anda adalah alasan saya melakukan ini :)
Arik
4
Untuk Python3, perlu diubah ke from queue import Queue.
Gino Mempin
1
Ini tampaknya menjadi metode yang paling tidak mengganggu (tidak perlu secara dramatis merestrukturisasi basis kode asli) untuk memungkinkan nilai kembali kembali ke utas utama.
Fanchen Bao
24

Parris / kindall's answer join / returnanswer porting ke Python 3:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

Catatan, Threadkelas diimplementasikan secara berbeda di Python 3.

GuySoft
sumber
1
join mengambil parameter batas waktu yang harus dilewati
cz
22

Saya mencuri jawaban baik hati dan membersihkannya sedikit saja.

Bagian kuncinya adalah menambahkan * args dan ** kwargs untuk bergabung () untuk menangani batas waktu

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)

        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)

        return self._return

JAWABAN DIPERBARUI DI BAWAH INI

Ini adalah jawaban saya yang paling populer, jadi saya memutuskan untuk memperbarui dengan kode yang akan berjalan pada py2 dan py3.

Selain itu, saya melihat banyak jawaban untuk pertanyaan ini yang menunjukkan kurangnya pemahaman tentang Thread.join (). Beberapa gagal menangani timeoutarg. Tetapi ada juga kasus sudut yang harus Anda ketahui tentang instance ketika Anda memiliki (1) fungsi target yang dapat kembali Nonedan (2) Anda juga memberikan timeoutargumen untuk bergabung (). Silakan lihat "TEST 4" untuk memahami kasus sudut ini.

Kelas ThreadWithReturn yang bekerja dengan py2 dan py3:

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479

if sys.version_info >= (3, 0):
    _thread_target_key = '_target'
    _thread_args_key = '_args'
    _thread_kwargs_key = '_kwargs'
else:
    _thread_target_key = '_Thread__target'
    _thread_args_key = '_Thread__args'
    _thread_kwargs_key = '_Thread__kwargs'

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None

    def run(self):
        target = getattr(self, _thread_target_key)
        if not target is None:
            self._return = target(
                *getattr(self, _thread_args_key),
                **getattr(self, _thread_kwargs_key)
            )

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

Beberapa tes sampel ditunjukkan di bawah ini:

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

Bisakah Anda mengidentifikasi kasus sudut yang mungkin kita temui dengan TEST 4?

Masalahnya adalah kita mengharapkan giveMe () mengembalikan None (lihat TEST 2), tetapi kami juga berharap join () mengembalikan None jika itu habis.

returned is None berarti:

(1) itulah yang mengembalikan giveMe (), atau

(2) gabung () habis

Contoh ini sepele karena kita tahu bahwa giveMe () akan selalu mengembalikan None. Tetapi dalam contoh dunia nyata (di mana target dapat secara sah mengembalikan Tidak ada atau sesuatu yang lain) kami ingin secara eksplisit memeriksa apa yang terjadi.

Di bawah ini adalah cara mengatasi casing sudut ini:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()
pengguna2426679
sumber
Apakah Anda tahu padanan _Thread_target untuk Python3? Atribut itu tidak ada di Python3.
GreySage
Saya melihat dalam file threading.py, ternyata itu adalah _target (atribut lain juga bernama).
GreySage
Anda bisa menghindari mengakses variabel pribadi kelas benang, jika Anda menyimpan target, argsdan kwargsargumen untuk init sebagai variabel anggota di kelas Anda.
Tolli
@GreySage Lihat jawaban saya, saya porting blok ini ke python3 di bawah ini
GuySoft
@GreySage menjawab sekarang mendukung py2 dan py3
user2426679
15

Menggunakan Antrian:

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
pengguna341143
sumber
1
Sangat suka solusi ini, pendek dan manis. Jika fungsi Anda membaca antrian input, dan Anda menambahnya, out_queue1Anda harus mengulang out_queue1.get()dan menangkap antrian. Pengecualian ret = [] ; try: ; while True; ret.append(out_queue1.get(block=False)) ; except Queue.Empty: ; pass. Semi-titik dua untuk mensimulasikan garis putus.
sastorsl
6

Solusi saya untuk masalah ini adalah untuk membungkus fungsi dan utas dalam sebuah kelas. Tidak perlu menggunakan kumpulan, antrian, atau variabel lewat tipe c. Ini juga bukan pemblokiran. Anda malah memeriksa status. Lihat contoh cara menggunakannya di akhir kode.

import threading

class ThreadWorker():
    '''
    The basic idea is given a function create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it,check its status,and get data out the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.isAlive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
Peter Lonjers
sumber
bagaimana Anda menangani pengecualian? katakanlah fungsi add diberikan dan int dan str. apakah semua utas gagal atau hanya satu yang gagal?
user1745713
4

joinselalu kembali None, saya pikir Anda harus subclass Threaduntuk menangani kode pengembalian dan sebagainya.

Brainstorm
sumber
4

Mempertimbangkan komentar @iman pada jawaban @JakeBiesinger, saya telah mengomposisikan ulang untuk memiliki sejumlah utas:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()

Bersulang,

Orang.

Guy Avraham
sumber
2

Anda dapat mendefinisikan mutable di atas lingkup fungsi berulir, dan menambahkan hasilnya. (Saya juga memodifikasi kode agar kompatibel dengan python3)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

Ini kembali {'world!': 'foo'}

Jika Anda menggunakan input fungsi sebagai kunci untuk hasil Anda, setiap input unik dijamin untuk memberikan entri dalam hasil

Thijs D
sumber
2

Saya menggunakan pembungkus ini, yang dengan nyaman mengubah fungsi apa pun untuk berjalan dalam Thread- menjaga nilai pengembalian atau pengecualiannya. Itu tidak menambah Queueoverhead.

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

Contoh Penggunaan

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

Catatan tentang threadingmodul

Nilai pengembalian yang nyaman & penanganan perkecualian pada fungsi berulir adalah kebutuhan "Pythonic" yang sering dan seharusnya memang sudah ditawarkan oleh threadingmodul - mungkin langsung di Threadkelas standar . ThreadPoolmemiliki terlalu banyak overhead untuk tugas-tugas sederhana - 3 mengelola utas, banyak birokrasi. Sayangnya Threadtata letak disalin dari Jawa awalnya - yang Anda lihat misalnya dari parameter konstruktor 1st (!) Yang masih tidak berguna group.

kxr
sumber
konstruktor pertama tidak sia-sia, yang disediakan di sana untuk implementasi di masa depan .. dari buku resep pemrograman paralel python
vijay shanker
1

Tentukan target Anda untuk
1) mengambil argumen q
2) ganti pernyataan apa pun return foodenganq.put(foo); return

jadi fungsi

def func(a):
    ans = a * a
    return ans

akan menjadi

def func(a, q):
    ans = a * a
    q.put(ans)
    return

dan kemudian Anda akan melanjutkannya

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

Dan Anda dapat menggunakan fungsi dekorator / pembungkus untuk membuatnya sehingga Anda dapat menggunakan fungsi yang ada targettanpa mengubahnya, tetapi ikuti skema dasar ini.

tscizzle
sumber
Seharusnyaresults = [ans_q.get() for _ in xrange(len(threads))]
Hemant H Kumar
1

Seperti yang disebutkan multiprocessing pool jauh lebih lambat daripada dasar threading. Menggunakan antrian seperti yang diusulkan dalam beberapa jawaban di sini adalah alternatif yang sangat efektif. Saya telah menggunakannya dengan kamus untuk dapat menjalankan banyak utas kecil dan memulihkan beberapa jawaban dengan menggabungkannya dengan kamus:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)
Yves Dorfsman
sumber
1

Ide GuySoft bagus, tapi saya pikir objek tidak harus mewarisi dari Thread dan mulai () dapat dihapus dari antarmuka:

from threading import Thread
import queue
class ThreadWithReturnValue(object):
    def __init__(self, target=None, args=(), **kwargs):
        self._que = queue.Queue()
        self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
                args=(self._que, args, kwargs), )
        self._t.start()

    def join(self):
        self._t.join()
        return self._que.get()


def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

print(twrv.join())   # prints foo
pesolek
sumber
0

Salah satu solusi yang biasa adalah membungkus fungsi Anda foodengan dekorator seperti

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

Maka seluruh kode mungkin terlihat seperti itu

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

Catatan

Satu masalah penting adalah bahwa nilai-nilai kembali mungkin tidak teratur . (Bahkan, return valuetidak harus disimpan ke queue, karena Anda dapat memilih struktur data aman- sewenang - wenang )

Response777
sumber
0

Mengapa tidak menggunakan variabel global saja?

import threading


class myThread(threading.Thread):
    def __init__(self, ind, lock):
        threading.Thread.__init__(self)
        self.ind = ind
        self.lock = lock

    def run(self):
        global results
        with self.lock:
            results.append(self.ind)



results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
Alexey
sumber
0

Jawaban Kindall dalam Python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon)
        self._return = None 

    def run(self):
        try:
            if self._target:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs 

    def join(self,timeout=None):
        Thread.join(self,timeout)
        return self._return
SmartManoj
sumber
-2

Jika hanya Benar atau Salah yang akan divalidasi dari panggilan fungsi, solusi sederhana yang saya temukan adalah memperbarui daftar global.

import threading

lists = {"A":"True", "B":"True"}

def myfunc(name: str, mylist):
    for i in mylist:
        if i == 31:
            lists[name] = "False"
            return False
        else:
            print("name {} : {}".format(name, i))

t1 = threading.Thread(target=myfunc, args=("A", [1, 2, 3, 4, 5, 6], ))
t2 = threading.Thread(target=myfunc, args=("B", [11, 21, 31, 41, 51, 61], ))
t1.start()
t2.start()
t1.join()
t2.join()

for value in lists.values():
    if value == False:
        # Something is suspicious 
        # Take necessary action 

Ini lebih bermanfaat jika Anda ingin mengetahui apakah ada salah satu utas yang mengembalikan status palsu untuk mengambil tindakan yang diperlukan.

Sravya
sumber