Menangani pengecualian dari tugas Java ExecutorService

213

Saya mencoba menggunakan ThreadPoolExecutorkelas Java untuk menjalankan sejumlah besar tugas berat dengan sejumlah utas. Setiap tugas memiliki banyak tempat di mana mungkin gagal karena pengecualian.

Saya telah mensubklasifikasikan ThreadPoolExecutordan mengganti afterExecutemetode yang seharusnya memberikan pengecualian tanpa tertangkap saat menjalankan tugas. Namun, sepertinya saya tidak bisa membuatnya bekerja.

Sebagai contoh:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

Output dari program ini adalah "Semuanya baik-baik saja - situasi normal!" meskipun satu-satunya Runnable yang dikirimkan ke thread pool memberikan pengecualian. Adakah petunjuk tentang apa yang terjadi di sini?

Terima kasih!

Tom
sumber
Anda tidak pernah mempertanyakan Masa Depan tugas, apa yang terjadi di sana. Seluruh pelaksana atau program layanan tidak akan mogok. Pengecualian ditangkap dan dibungkus dalam ExecutionException. Dan apakah dia akan dipekerjakan kembali jika Anda memanggil future.get (). PS: Masa depan. IsDone () [Harap baca nama api asli] akan kembali benar, bahkan ketika runnable selesai dengan keliru. Karena tugas itu dilakukan dengan nyata.
Jai Pandit

Jawaban:

156

Dari dokumen :

Catatan: Ketika tindakan terlampir dalam tugas (seperti FutureTask) baik secara eksplisit atau melalui metode seperti mengirimkan, objek tugas ini menangkap dan memelihara pengecualian komputasi, dan sehingga mereka tidak menyebabkan penghentian mendadak, dan pengecualian internal tidak diteruskan ke metode ini .

Saat Anda mengirim Runnable, itu akan dibungkus dalam Masa Depan.

AfterExecute Anda harus seperti ini:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}
tidak
sumber
7
Terima kasih, saya akhirnya menggunakan solusi ini. Selain itu, kalau-kalau ada yang tertarik: yang lain menyarankan untuk tidak mensubklasifikasi Layanan Eksekutif, tetapi saya tetap melakukannya karena saya ingin memantau tugas yang selesai daripada menunggu semuanya berakhir dan kemudian memanggil get () pada semua Futures yang dikembalikan. .
Tom
1
Pendekatan lain untuk subklas pelaksana adalah dengan subklas FutureTask dan mengganti metode 'selesai'
nos
1
Tom >> Bisakah Anda memposting kode cuplikan sampel di mana Anda subclassed ExecutorService untuk memantau tugas saat selesai ...
jagamot
1
Jawaban ini tidak akan berfungsi jika Anda menggunakan ComplableFuture.runAsync sebagai afterExecute akan berisi objek yang paket pribadi dan tidak ada cara untuk mengakses throwable. Saya mengatasinya dengan menutup telepon. Lihat jawaban saya di bawah ini.
mmm
2
Apakah kita harus memeriksa apakah masa depan selesai menggunakan future.isDone()? Karena afterExecutedijalankan setelah Runnableselesai, saya menganggap future.isDone()selalu kembali true.
Searene
247

PERINGATAN : Perlu dicatat bahwa solusi ini akan memblokir utas panggilan.


Jika Anda ingin memproses pengecualian yang dilemparkan oleh tugas, maka umumnya lebih baik digunakan Callable daripada Runnable.

Callable.call() diizinkan untuk melemparkan pengecualian yang diperiksa, dan ini dapat disebarkan kembali ke utas panggilan:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

Jika Callable.call()melempar pengecualian, ini akan dibungkus dengan ExecutionExceptiondan dibuangFuture.get() .

Ini mungkin lebih disukai daripada subklasifikasi ThreadPoolExecutor. Ini juga memberi Anda kesempatan untuk mengirimkan kembali tugas jika pengecualiannya dapat dipulihkan.

skaffman
sumber
5
> Callable.call () diizinkan untuk melempar pengecualian yang diperiksa, dan ini dapat disebarkan kembali ke utas panggilan: Perhatikan bahwa pengecualian yang dilemparkan akan merambat ke utas panggilan hanya jika future.get()atau versi kelebihannya dipanggil.
nhylated
16
Ini sempurna, tetapi apa yang harus dilakukan jika saya menjalankan tugas secara paralel dan tidak ingin memblokir eksekusi?
Grigory Kislin
44
Jangan gunakan solusi ini, karena itu merusak seluruh tujuan menggunakan ExecutorService. ExecutorService adalah mekanisme eksekusi asinkron yang mampu menjalankan tugas di latar belakang. Jika Anda memanggil future.get () setelah menjalankannya, ia akan memblokir utas panggilan sampai tugas selesai.
user1801374
2
Solusi ini seharusnya tidak dinilai tinggi. Future.get () bekerja secara serempak dan akan bertindak sebagai pemblokir sampai Runnable atau Callable telah dieksekusi dan sebagaimana dinyatakan di atas mengalahkan tujuan menggunakan Layanan Executor
Super Hans
2
Seperti yang ditunjukkan #nhylated, ini pantas BUG jdk. Jika Future.get () tidak dipanggil, setiap pengecualian tanpa panggilan dari Callable diabaikan secara diam-diam. Desain yang sangat buruk .... hanya menghabiskan 1+ hari untuk mencari tahu perpustakaan menggunakan ini dan jdk diam-diam mengabaikan pengecualian. Dan, ini masih ada di jdk12.
Ben Jiang
18

Penjelasan untuk perilaku ini tepat di javadoc untuk afterExecute :

Catatan: Ketika tindakan terlampir dalam tugas (seperti FutureTask) baik secara eksplisit atau melalui metode seperti mengirimkan, objek tugas ini menangkap dan memelihara pengecualian komputasi, dan sehingga mereka tidak menyebabkan penghentian mendadak, dan pengecualian internal tidak diteruskan ke metode ini .

Drew Wills
sumber
10

Saya mengatasinya dengan membungkus runnable yang disediakan yang diserahkan kepada pelaksana.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);
mmm
sumber
3
Anda dapat meningkatkan keterbacaan dengan menggunakan whenComplete()metode CompletableFuture.
Eduard Wirch
@EduardWirch ini berfungsi tetapi Anda tidak dapat membuang kembali pengecualian dari whenComplete ()
Akshat
7

Saya menggunakan VerboseRunnablekelas dari jcabi-log , yang menelan semua pengecualian dan mencatatnya. Sangat mudah, misalnya:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);
yegor256
sumber
3

Solusi lain adalah dengan menggunakan ManagedTask dan ManagedTaskListener .

Anda memerlukan Callable atau Runnable yang mengimplementasikan antarmuka ManagedTask .

Metode getManagedTaskListenermengembalikan contoh yang Anda inginkan.

public ManagedTaskListener getManagedTaskListener() {

Dan Anda menerapkan di ManagedTaskListener yang taskDonemetode:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

Lebih detail tentang siklus hidup dan pendengar tugas yang dikelola .

CSchulz
sumber
2

Ini bekerja

  • Ini berasal dari SingleThreadExecutor, tetapi Anda dapat mengadaptasinya dengan mudah
  • Kode 8 lamdas Java, tetapi mudah diperbaiki

Ini akan membuat Executor dengan utas tunggal, yang bisa mendapatkan banyak tugas; dan akan menunggu saat ini untuk mengakhiri eksekusi untuk memulai dengan yang berikutnya

Jika terjadi kesalahan atau pengecualian uncaugth, uncaughtExceptionHandler akan menangkapnya

kelas final publik SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions (Utas terakhir.UncaughtExceptionHandler uncaughtExceptionHandler) {

        Pabrik ThreadFactory = (Runnable runnable) -> {
            final Thread newThread = new Thread (runnable, "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler ((final Thread caugthThread, final Throwable throwable) -> {
                uncaughtExceptionHandler.uncaughtException (caugthThread, throwable);
            });
            return newThread;
        };
        kembalikan FinalizableDelegatedExecutorService baru
                (ThreadPoolExecutor baru (1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        LinkedBlockingQueue () baru,
                        pabrik){


                    void afterExecute yang dilindungi (Runnable runnable, Throwable throwable) {
                        super.afterExecute (runnable, throwable);
                        if (throwable == null && runnable instance of Future) {
                            coba {
                                Future future = (Future) runnable;
                                if (future.isDone ()) {
                                    future.get ();
                                }
                            } catch (PembatalanException ce) {
                                throwable = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause ();
                            } catch (mis. InterruptedException) {
                                Thread.currentThread (). Interrupt (); // abaikan / atur ulang
                            }
                        }
                        if (throwable! = null) {
                            uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), dapat dibuang);
                        }
                    }
                });
    }



    kelas privat statis FinalizableDelegatedExecutorService
            memperpanjang DelegatedExecutorService {
        FinalizableDelegatedExecutorService (pelaksana ExecutorService) {
            super (pelaksana);
        }
        batal diselesaikan finalisasi () {
            super.shutdown ();
        }
    }

    / **
     * Kelas pembungkus yang memaparkan hanya metode ExecutorService
     * dari implementasi ExecutorService.
     * /
    kelas statis pribadi DelegatedExecutorService memperluas AbstractExecutorService {
        Layanan ExecutorService final pribadi;
        DelegatedExecutorService (ExecutorService executor) {e = pelaksana; }
        public void execute (perintah Runnable) {e.execute (perintah); }
        public void shutdown () {e.shutdown (); }
        Daftar publik shutdownNow () {return e.shutdownNow (); }
        public boolean isShutdown () {return e.isShutdown (); }
        boolean publik isTerminated () {return e.isTerminated (); }
        boolean publik menungguTerminasi (batas waktu yang lama, unit TimeUnit)
                melempar InterruptedException {
            mengembalikan e.awaitTermination (batas waktu, unit);
        }
        publik yang akan datang (Tugas yang dapat dijalankan) {
            mengembalikan e.submit (tugas);
        }
        Public Future submit (Tugas yang dapat dipanggil) {
            mengembalikan e.submit (tugas);
        }
        Public Future submit (Tugas yang dapat dijalankan, hasil T) {
            mengembalikan e.submit (tugas, hasil);
        }
        Daftar publik> invokeAll (Koleksi> tugas)
                melempar InterruptedException {
            return e.invokeAll (tugas);
        }
        Daftar publik> invokeAll (Koleksi> tugas,
                                             batas waktu lama, unit TimeUnit)
                melempar InterruptedException {
            mengembalikan e.invokeAll (tugas, batas waktu, unit);
        }
        publik T invokeAny (Koleksi> tugas)
                melempar InterruptedException, ExecutionException {
            return e.invokeAny (tugas);
        }
        publik T invokeAny (Koleksi> tugas,
                               batas waktu lama, unit TimeUnit)
                melempar InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny (tugas, batas waktu, unit);
        }
    }



    private SingleThreadExecutorWithExceptions () {}
}
obesga_tirant
sumber
Sayangnya, menggunakan finalisasi agak tidak stabil, karena itu hanya akan disebut "nanti ketika pengumpul sampah mengumpulkannya" (atau mungkin tidak dalam kasus Thread, tidak tahu) ...
rogerdpack
1

Jika Anda ingin memantau pelaksanaan tugas, Anda dapat memutar 1 atau 2 utas (mungkin lebih tergantung pada beban) dan menggunakannya untuk mengambil tugas dari bungkus ExecutionCompletionService.

Cristian Botiza
sumber
0

Jika Anda ExecutorServiceberasal dari sumber eksternal (yaitu tidak mungkin untuk subkelas ThreadPoolExecutordan menimpanya afterExecute()), Anda dapat menggunakan proxy dinamis untuk mencapai perilaku yang diinginkan:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}
Bas
sumber
0

Ini karena AbstractExecutorService :: submitsedang membungkus Anda runnableke RunnableFuture(tidak lain kecuali FutureTask) seperti di bawah ini

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Kemudian executeakan meneruskannya ke Workerdan Worker.run()akan memanggil di bawah ini.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Akhirnya task.run();dalam kode di atas panggilan akan dipanggil FutureTask.run(). Berikut adalah kode penangan pengecualian, karena ini Anda TIDAK mendapatkan pengecualian yang diharapkan.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
Kanagavelu Sugumar
sumber
0

Ini mirip dengan solusi mmm, tetapi sedikit lebih bisa dimengerti. Mintalah tugas Anda memperluas kelas abstrak yang membungkus metode run ().

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}
ccleve
sumber
-4

Alih-alih mensubklasifikasikan ThreadPoolExecutor, saya akan menyediakannya dengan instance ThreadFactory yang membuat Thread baru dan memberi mereka UncaughtExceptionHandler

Kevin
sumber
3
Saya mencoba ini juga, tetapi metode uncaughtException tampaknya tidak pernah dipanggil. Saya percaya ini karena thread pekerja di kelas ThreadPoolExecutor menangkap pengecualian.
Tom
5
Metode uncaughtException tidak dipanggil karena metode pengiriman ExecutorService membungkus Callable / Runnable di Masa Depan; pengecualian sedang ditangkap di sana.
Emil Duduk