Tangkap pengecualian utas di utas penelepon dengan Python

208

Saya sangat baru dengan Python dan pemrograman multithread secara umum. Pada dasarnya, saya memiliki skrip yang akan menyalin file ke lokasi lain. Saya ingin ini ditempatkan di utas lain sehingga saya dapat menampilkan ....bahwa skrip masih berjalan.

Masalah yang saya alami adalah jika file tidak dapat disalin maka akan menimbulkan pengecualian. Ini ok jika berjalan di utas utama; namun, memiliki kode berikut tidak berfungsi:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

Di kelas utas itu sendiri, saya mencoba melemparkan kembali pengecualian, tetapi tidak berhasil. Saya telah melihat orang-orang di sini mengajukan pertanyaan serupa, tetapi mereka semua tampaknya melakukan sesuatu yang lebih spesifik daripada apa yang saya coba lakukan (dan saya tidak begitu mengerti solusi yang ditawarkan). Saya telah melihat orang menyebutkan penggunaan sys.exc_info(), namun saya tidak tahu di mana atau bagaimana menggunakannya.

Semua bantuan sangat dihargai!

EDIT: Kode untuk kelas utas di bawah:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise
Phanto
sumber
Bisakah Anda memberikan lebih banyak wawasan tentang apa yang terjadi di dalam TheThread? Contoh kode mungkin?
jathanism
Tentu. Saya akan mengedit respons saya di atas untuk memasukkan beberapa detail.
Phanto
1
Sudahkah Anda mempertimbangkan untuk memutarnya sehingga Thread utama adalah bit yang berfungsi dan indikator progres ada di Thread yang muncul?
Dan Head
1
Dan Head, apakah Anda mengacu pada utas utama pertama yang memunculkan fungsi "..." dan kemudian menjalankan fungsi salin? Itu bisa bekerja dan menghindari masalah pengecualian. Tapi, saya masih ingin belajar cara memasang thread python dengan benar.
Phanto

Jawaban:

114

Masalahnya adalah thread_obj.start() segera kembali. Utas anak yang Anda buat dijalankan dalam konteksnya sendiri, dengan tumpukannya sendiri. Setiap pengecualian yang terjadi ada dalam konteks utas anak, dan itu ada di tumpukannya sendiri. Salah satu cara yang dapat saya pikirkan saat ini untuk mengkomunikasikan informasi ini ke utas induk adalah dengan menggunakan semacam pesan yang lewat, sehingga Anda dapat melihatnya.

Coba ini untuk ukuran:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()
Santa
sumber
5
Mengapa tidak bergabung dengan utas alih-alih lingkaran jelek ini? Lihat yang multiprocessingsetara: gist.github.com/2311116
schlamar
1
Mengapa tidak menggunakan pola EventHook stackoverflow.com/questions/1092531/event-system-in-python/… berdasarkan jawaban @Lasse? Daripada hal loop?
Andre Miras
1
Antrian bukan kendaraan terbaik untuk mengkomunikasikan kesalahan kembali, kecuali jika Anda ingin memiliki antrian penuh dari mereka. Konstruk yang jauh lebih baik adalah threading.Event ()
Muposat
1
Ini tampaknya tidak aman bagi saya. Apa yang terjadi ketika utas memunculkan pengecualian tepat setelah bucket.get()kenaikan Queue.Empty? Maka utas join(0.1)akan selesai dan isAlive() is False, dan Anda melewatkan pengecualian Anda.
Steve
1
Queuetidak perlu dalam kasus sederhana ini - Anda bisa menyimpan info pengecualian sebagai properti ExcThreadselama Anda memastikan bahwa run()melengkapi tepat setelah pengecualian (yang dilakukan dalam contoh sederhana ini). Maka Anda cukup mengajukan kembali pengecualian setelah (atau selama) t.join(). Tidak ada masalah sinkronisasi karena join()memastikan utas telah selesai. Lihat jawaban oleh Rok Strniša di bawah stackoverflow.com/a/12223550/126362
ejm
42

The concurrent.futuresModul memudahkan untuk melakukan pekerjaan di thread terpisah (atau proses) dan menangani pengecualian yang dihasilkan:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futuresdisertakan dengan Python 3.2, dan tersedia sebagai modul backportedfutures untuk versi sebelumnya.

Jon-Eric
sumber
5
Meskipun ini tidak melakukan persis apa yang diminta OP, itu persis petunjuk yang saya butuhkan. Terima kasih.
Fisikawan Gila
2
Dan dengan concurrent.futures.as_completed, Anda dapat segera diberi tahu karena pengecualian muncul: stackoverflow.com/questions/2829329/…
Ciro Santilli 郝海东 冠状 病 六四 事件 法轮功
1
Kode ini memblokir utas utama. Bagaimana Anda melakukan ini secara tidak sinkron?
Nikolay Shindarov
40

Ada banyak jawaban rumit yang sangat aneh untuk pertanyaan ini. Apakah saya terlalu menyederhanakan ini, karena ini tampaknya cukup untuk sebagian besar hal bagi saya.

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

Jika Anda yakin Anda hanya akan menjalankan satu atau versi lain dari Python, Anda bisa mengurangi run()metode menjadi hanya versi yang rusak (jika Anda hanya akan berjalan pada versi Python sebelum 3), atau hanya versi bersih (jika Anda hanya akan menjalankan versi Python dimulai dengan 3).

Contoh penggunaan:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

Dan Anda akan melihat pengecualian yang muncul di utas lainnya saat Anda bergabung.

Jika Anda menggunakan sixatau hanya pada Python 3, Anda bisa meningkatkan informasi jejak tumpukan yang Anda dapatkan ketika pengecualian kembali. Alih-alih hanya tumpukan di titik gabungan, Anda bisa membungkus pengecualian dalam dengan pengecualian luar baru, dan mendapatkan kedua jejak tumpukan dengan

six.raise_from(RuntimeError('Exception in thread'),self.exc)

atau

raise RuntimeError('Exception in thread') from self.exc
ArtOfWarfare
sumber
1
Saya tidak yakin mengapa jawaban ini juga tidak lebih populer. Ada orang lain di sini yang melakukan propagasi sederhana juga, tetapi membutuhkan perluasan kelas dan overriding. Yang ini hanya melakukan apa yang diharapkan banyak orang, dan hanya membutuhkan perubahan dari Thread ke ProagatingThread. Dan 4 tab spasi sehingga salinan / tempel saya sepele :-) ... satu-satunya peningkatan yang saya sarankan adalah menggunakan six.raise_from () sehingga Anda mendapatkan kumpulan jejak tumpukan bersarang yang bagus, bukan hanya tumpukan untuk situs reraise.
aggieNick02
Terima kasih banyak. Solusi yang sangat sederhana.
sonulohani
Masalah saya adalah bahwa saya memiliki banyak utas anak. Gabungan dieksekusi dalam urutan, dan pengecualian mungkin dimunculkan dari utas yang bergabung kemudian. Apakah ada solusi sederhana untuk masalah saya? jalankan gabung secara bersamaan?
chuan
Terima kasih, itu bekerja dengan sempurna! Tidak yakin mengapa itu tidak ditangani langsung oleh python tho ...
GG.
Ini adalah jawaban yang paling berguna, sulution ini jauh lebih umum daripada yang lain namun sederhana. Akan menggunakannya dalam proyek!
Konstantin Sekeresh
30

Meskipun tidak mungkin untuk secara langsung menangkap pengecualian yang dilemparkan ke utas yang berbeda, berikut adalah kode untuk mendapatkan sesuatu yang sangat dekat dengan fungsi ini secara transparan. Utas anak Anda harus subkelas ExThreadkelas alih-alih threading.Threaddan utas induk harus memanggil child_thread.join_with_exception()metode alih-alih child_thread.join()saat menunggu utas menyelesaikan tugasnya.

Detail teknis dari implementasi ini: ketika utas anak melempar pengecualian, ia diteruskan ke induk melalui Queuedan dilemparkan lagi di utas induk. Perhatikan bahwa tidak ada kesibukan menunggu dalam pendekatan ini.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()
Mateusz Kobos
sumber
1
Tidakkah Anda ingin menangkap BaseException, bukan Exception? Yang Anda lakukan hanyalah menyebarkan pengecualian dari satu Threadke yang lain. Saat ini, IE, a KeyboardInterrupt, akan diabaikan secara diam-diam jika diangkat di utas latar belakang.
ArtOfWarfare
join_with_exceptionhang tanpa batas waktu jika dipanggil untuk kedua kalinya pada utas mati. Perbaiki: github.com/fraserharris/threading-extensions/blob/master/…
Fraser Harris
Saya pikir Queueitu tidak perlu; lihat komentar saya untuk jawaban @ Santa. Anda dapat menyederhanakannya menjadi sesuatu seperti jawaban Rok Strniša di bawah stackoverflow.com/a/12223550/126362
ejm
22

Jika pengecualian terjadi di utas, cara terbaik adalah menaikkannya di utas penelepon selama join. Anda bisa mendapatkan informasi tentang pengecualian yang sedang ditangani menggunakan sys.exc_info()fungsi ini. Informasi ini dapat dengan mudah disimpan sebagai properti dari objek thread sampai joindipanggil, pada titik mana ia dapat diangkat kembali.

Perhatikan bahwa Queue.Queue(seperti yang disarankan dalam jawaban lain) tidak diperlukan dalam kasus sederhana ini di mana utas melemparkan paling banyak 1 pengecualian dan selesai tepat setelah melemparkan pengecualian . Kami menghindari kondisi balapan dengan hanya menunggu utas selesai.

Misalnya, tambah ExcThread(di bawah), excRunganti (bukan run).

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

Bentuk argumen 3 untuk raisehilang dalam Python 3, jadi ubah baris terakhir ke:

raise new_exc.with_traceback(self.exc[2])
Rok Strniša
sumber
2
Mengapa Anda menggunakan threading.Thread.join (self) alih-alih super (ExcThread, self) .join ()?
Richard Möhn
9

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

Solusi berikut:

  • kembali ke utas utama segera ketika pengecualian dipanggil
  • tidak memerlukan kelas tambahan yang ditentukan pengguna karena tidak perlu:
    • eksplisit Queue
    • untuk menambahkan kecuali yang lain di sekitar utas kerja Anda

Sumber:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

Output yang mungkin:

0
0
1
1
2
2
0
Exception()
1
2
None

Sayangnya tidak mungkin untuk membunuh berjangka untuk membatalkan yang lain karena salah satu gagal:

Jika Anda melakukan sesuatu seperti:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

kemudian withmenangkapnya, dan menunggu utas kedua selesai sebelum melanjutkan. Berikut ini berperilaku serupa:

for future in concurrent.futures.as_completed(futures):
    future.result()

karena future.result()kembali memunculkan pengecualian jika terjadi.

Jika Anda ingin keluar dari seluruh proses Python, Anda mungkin lolos os._exit(0), tetapi ini berarti Anda membutuhkan refactor.

Kelas khusus dengan semantik pengecualian sempurna

Saya akhirnya coding antarmuka yang sempurna untuk diri saya sendiri di: Cara yang tepat untuk membatasi jumlah maksimum utas berjalan sekaligus? bagian "Contoh antrian dengan penanganan kesalahan". Kelas itu bertujuan untuk nyaman, dan memberi Anda kontrol total atas pengiriman dan penanganan hasil / kesalahan.

Diuji pada Python 3.6.7, Ubuntu 18.04.

Ciro Santilli 新疆 改造 中心 996ICU 六四 事件
sumber
4

Ini adalah masalah kecil yang tidak menyenangkan, dan saya ingin memasukkan solusi saya. Beberapa solusi lain yang saya temukan (async.io misalnya) terlihat menjanjikan tetapi juga memberikan sedikit kotak hitam. Pendekatan loop antrian / acara mengaitkan Anda dengan implementasi tertentu. Namun, kode sumber berjangka berjangka hanya sekitar 1000 baris, dan mudah dipahami . Itu memungkinkan saya untuk dengan mudah menyelesaikan masalah saya: membuat utas pekerja ad-hoc tanpa banyak pengaturan, dan untuk dapat menangkap pengecualian di utas utama.

Solusi saya menggunakan API berjangka bersamaan dan API threading. Ini memungkinkan Anda untuk membuat pekerja yang memberi Anda utas dan masa depan. Dengan begitu, Anda dapat bergabung dengan utas untuk menunggu hasilnya:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

... atau Anda dapat membiarkan pekerja hanya mengirim panggilan balik ketika selesai:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

... atau Anda dapat mengulang sampai acara selesai:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

Berikut kodenya:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

... dan fungsi tes:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')
Calvin Froedge
sumber
2

Sebagai noobie untuk Threading, saya butuh waktu lama untuk memahami bagaimana menerapkan kode Mateusz Kobos (di atas). Berikut adalah versi yang diklarifikasi untuk membantu memahami cara menggunakannya.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException
BurningKrome
sumber
2

Cara yang sama seperti RickardSjogren's tanpa Antrian, sys dll. Tetapi juga tanpa beberapa pendengar sinyal: mengeksekusi langsung pengecualian handler yang sesuai dengan blok kecuali.

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

Hanya self._callback dan kecuali blok yang sedang dijalankan () yang ditambahkan ke threading normal.

rev Chickenmarkus
sumber
2

Saya tahu saya agak terlambat ke pesta di sini, tetapi saya memiliki masalah yang sangat mirip tetapi itu termasuk menggunakan tkinter sebagai GUI, dan mainloop membuatnya tidak mungkin untuk menggunakan solusi yang bergantung pada .join (). Karena itu saya mengadaptasi solusi yang diberikan dalam EDIT dari pertanyaan awal, tetapi membuatnya lebih umum untuk membuatnya lebih mudah dipahami orang lain.

Inilah kelas utas baru yang sedang beraksi:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

Tentu saja Anda selalu dapat membuatnya menangani pengecualian dengan cara lain dari pencatatan, seperti mencetaknya, atau mengeluarkannya ke konsol.

Ini memungkinkan Anda untuk menggunakan kelas ExceptionThread persis seperti yang Anda lakukan pada kelas Thread, tanpa modifikasi khusus.

Firo
sumber
1

Salah satu metode yang saya sukai didasarkan pada pola pengamat . Saya mendefinisikan kelas sinyal yang menggunakan utas saya untuk memancarkan pengecualian untuk pendengar. Itu juga dapat digunakan untuk mengembalikan nilai dari utas. Contoh:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

Saya tidak memiliki cukup pengalaman bekerja dengan utas untuk mengklaim bahwa ini adalah metode yang sepenuhnya aman. Tapi itu berhasil bagi saya dan saya suka fleksibilitasnya.

RickardJogren
sumber
Anda putuskan sambungan setelah bergabung ()?
ealeon
Saya tidak, tapi saya kira itu adalah ide yang baik sehingga Anda tidak memiliki referensi untuk hal-hal yang tidak terpakai.
RickardSjogren
saya perhatikan bahwa "handle_exception" masih merupakan bagian dari utas anak. perlu cara untuk menyebarkannya ke pemanggil utas
ealeon
1

Menggunakan pengecualian telanjang bukanlah praktik yang baik karena Anda biasanya menangkap lebih banyak daripada yang Anda tawar-menawar.

Saya sarankan memodifikasi exceptHANYA untuk pengecualian yang ingin Anda tangani. Saya tidak berpikir bahwa meningkatkannya memiliki efek yang diinginkan, karena ketika Anda pergi ke instantiate TheThreaddi bagian luar try, jika itu menimbulkan pengecualian, tugas itu tidak akan pernah terjadi.

Alih-alih, Anda mungkin ingin hanya mengingatkan dan melanjutkan, seperti:

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

Lalu ketika pengecualian itu tertangkap, Anda bisa menanganinya di sana. Kemudian ketika bagian luar trymenangkap pengecualian TheThread, Anda tahu itu bukan yang sudah Anda tangani, dan akan membantu Anda mengisolasi aliran proses Anda.

jathanism
sumber
1
Nah, jika ada kesalahan di utas itu sama sekali, saya ingin program lengkap untuk memberi tahu pengguna bahwa ada masalah dan berakhir dengan anggun. Untuk alasan itu, saya ingin utas utama menangkap dan menangani semua pengecualian. Namun, masalah masih ada di mana jika TheThread melempar pengecualian, utas utama mencoba / kecuali masih tidak akan menangkapnya. Saya dapat meminta utas mendeteksi pengecualian dan mengembalikan false yang menunjukkan bahwa operasi tidak berhasil. Itu akan mencapai hasil yang diinginkan yang sama, tetapi saya masih ingin tahu cara menangkap pengecualian sub-thread dengan benar.
Phanto
1

Cara sederhana menangkap pengecualian utas dan berkomunikasi kembali ke metode pemanggil bisa dengan melewati kamus atau daftar ke workermetode.

Contoh (meneruskan kamus ke metode pekerja):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])
rado stoyanov
sumber
1

Bungkus Utas dengan penyimpanan pengecualian.

import threading
import sys
class ExcThread(threading.Thread):

    def __init__(self, target, args = None):
        self.args = args if args else []
        self.target = target
        self.exc = None
        threading.Thread.__init__(self)

    def run(self):
        try:
            self.target(*self.args)
            raise Exception('An error occured here.')
        except Exception:
            self.exc=sys.exc_info()

def main():
    def hello(name):
        print(!"Hello, {name}!")
    thread_obj = ExcThread(target=hello, args=("Jack"))
    thread_obj.start()

    thread_obj.join()
    exc = thread_obj.exc
    if exc:
        exc_type, exc_obj, exc_trace = exc
        print(exc_type, ':',exc_obj, ":", exc_trace)

main()
ahuigo
sumber
0

pygolang menyediakan sync.WorkGroup yang, khususnya, menyebarkan pengecualian dari utas pekerja yang bertelur ke utas utama. Sebagai contoh:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

memberikan yang berikut saat dijalankan:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

Kode asli dari pertanyaan adalah:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shul.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()
kirr
sumber