Java executors: bagaimana cara diberitahu, tanpa memblokir, ketika tugas selesai?

154

Katakanlah saya memiliki antrian penuh tugas yang harus saya serahkan ke layanan pelaksana. Saya ingin mereka diproses satu per satu. Cara paling sederhana yang bisa saya pikirkan adalah:

  1. Ambil tugas dari antrian
  2. Kirimkan ke pelaksana
  3. Panggil. Dapatkan di Masa Depan yang dikembalikan dan blokir hingga hasilnya tersedia
  4. Ambil tugas lain dari antrian ...

Namun, saya berusaha menghindari pemblokiran sepenuhnya. Jika saya memiliki 10.000 antrian seperti itu, yang membutuhkan tugas mereka diproses satu per satu, saya akan kehabisan ruang stack karena kebanyakan dari mereka akan berpegangan pada utas yang diblokir.

Yang saya inginkan adalah mengirimkan tugas dan memberikan panggilan balik yang dipanggil saat tugas selesai. Saya akan menggunakan notifikasi panggil itu sebagai bendera untuk mengirim tugas selanjutnya. (functionaljava dan jetlang tampaknya menggunakan algoritma non-blocking seperti itu, tetapi saya tidak dapat memahami kode mereka)

Bagaimana saya bisa melakukan itu menggunakan java.util.concurrent JDK, singkat menulis layanan pelaksana saya sendiri?

(antrian yang memberi saya tugas-tugas ini mungkin sendiri diblokir, tapi itu masalah yang harus ditangani kemudian)

Shahbaz
sumber

Jawaban:

146

Tetapkan antarmuka panggilan balik untuk menerima parameter apa pun yang ingin Anda sampaikan dalam pemberitahuan penyelesaian. Kemudian panggil itu di akhir tugas.

Anda bahkan bisa menulis pembungkus umum untuk tugas-tugas Runnable, dan mengirimkannya ke ExecutorService. Atau, lihat di bawah untuk mekanisme yang dibangun ke Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

Dengan CompletableFuture, Java 8 termasuk cara yang lebih rumit untuk menyusun jaringan pipa di mana proses dapat diselesaikan secara tidak sinkron dan kondisional. Berikut adalah contoh notifikasi yang dibuat tetapi lengkap.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}
erickson
sumber
1
Tiga jawaban dalam sekejap mata! Saya suka CallbackTask, solusi yang sederhana dan lurus ke depan. Terlihat jelas dalam retrospeksi. Terima kasih. Mengenai komentar orang lain tentang SingleThreadedExecutor: Saya mungkin memiliki ribuan antrian yang mungkin memiliki ribuan tugas. Masing-masing dari mereka perlu memproses tugas mereka satu per satu, tetapi antrian yang berbeda dapat beroperasi secara paralel. Itu sebabnya saya menggunakan threadpool global tunggal. Saya baru mengenal pelaksana, jadi tolong beri tahu saya jika saya salah.
Shahbaz
5
Pola yang baik, namun saya akan menggunakan API masa depan yang dapat didengar Guava yang menyediakan implementasi yang sangat baik.
Pierre-Henri
tidakkah ini mengalahkan tujuan menggunakan Masa Depan?
takecare
2
@ Zelphir Itu adalah Callbackantarmuka yang Anda nyatakan; bukan dari perpustakaan. Saat ini saya mungkin hanya menggunakan Runnable,, Consumeratau BiConsumer, tergantung pada apa yang saya perlukan untuk beralih dari tugas ke pendengar.
erickson
1
@Bhargav Ini adalah khas dari callback — entitas eksternal "panggilan balik" ke entitas pengendali. Apakah Anda ingin utas yang membuat tugas diblokir hingga tugas selesai? Lalu tujuan apa yang ada dalam menjalankan tugas di utas kedua? Jika Anda membiarkan utas untuk melanjutkan, itu perlu berulang kali memeriksa beberapa status bersama (mungkin dalam satu lingkaran, tetapi tergantung pada program Anda) sampai pemberitahuan pembaruan (bendera boolean, item baru dalam antrian, dll.) Dibuat oleh true panggilan balik seperti yang dijelaskan dalam jawaban ini. Kemudian dapat melakukan beberapa pekerjaan tambahan.
erickson
52

Di Java 8 Anda bisa menggunakan CompletableFuture . Berikut adalah contoh yang saya miliki dalam kode saya di mana saya menggunakannya untuk menjemput pengguna dari layanan pengguna saya, memetakannya ke objek tampilan saya dan kemudian memperbarui tampilan saya atau menampilkan dialog kesalahan (ini adalah aplikasi GUI):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

Ini dijalankan secara tidak sinkron. Saya menggunakan dua metode pribadi: mapUsersToUserViewsdan updateView.

Mat
sumber
Bagaimana cara menggunakan CompletableFuture dengan pelaksana? (untuk membatasi jumlah instance paralel / paralel) Apakah ini akan menjadi petunjuk: cf: mengirimkan-futuretasks-kepada-pelaksana-mengapa-tidak-bekerja- ?
user1767316
47

Gunakan API masa depan yang dapat didengar Guava dan tambahkan callback. Lih dari situs web:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});
Pierre-Henri
sumber
hai, tetapi jika saya ingin berhenti setelah onSuccess Thread ini bagaimana saya bisa melakukannya?
DevOps85
24

Anda bisa memperluas FutureTaskkelas, dan mengganti done()metode, lalu menambahkan FutureTaskobjek ke ExecutorService, sehingga done()metode akan memanggil ketika FutureTaskselesai dengan segera.

Auguste
sumber
then add the FutureTask object to the ExecutorService, bisakah Anda memberi tahu saya cara melakukan ini?
Gary Gauh
@GaryGauh melihat ini untuk info lebih lanjut yang dapat Anda perpanjang FutureTask, kami dapat menyebutnya MyFutureTask. Kemudian gunakan ExcutorService untuk mengirimkan MyFutureTask, maka metode menjalankan MyFutureTask akan berjalan, ketika MyFutureTask selesai metode yang Anda lakukan akan dipanggil. Berikut adalah sesuatu yang membingungkan adalah FutureTask dua, dan pada kenyataannya MyFutureTask adalah Runnable yang normal.
Chaojun Zhong
15

ThreadPoolExecutorjuga memiliki beforeExecutedan afterExecutemengaitkan metode yang dapat Anda timpa dan manfaatkan. Berikut adalah deskripsi dari ThreadPoolExecutor's Javadocs .

Metode kait

Kelas ini menyediakan metode yang tidak dapat ditimpa beforeExecute(java.lang.Thread, java.lang.Runnable)dan afterExecute(java.lang.Runnable, java.lang.Throwable)metode yang dipanggil sebelum dan sesudah eksekusi setiap tugas. Ini dapat digunakan untuk memanipulasi lingkungan eksekusi; misalnya, menginisialisasi ulang ThreadLocals, mengumpulkan statistik, atau menambahkan entri log. Selain itu, metode terminated()dapat diganti untuk melakukan pemrosesan khusus yang perlu dilakukan setelah Executorsepenuhnya dihentikan. Jika metode kait atau panggilan balik mengeluarkan pengecualian, utas pekerja internal dapat pada gilirannya gagal dan tiba-tiba berakhir.

Cem Catikkas
sumber
6

Gunakan a CountDownLatch.

Ini dari java.util.concurrentdan ini persis cara untuk menunggu beberapa utas untuk menyelesaikan eksekusi sebelum melanjutkan.

Untuk mencapai efek panggilan balik yang Anda cari, itu memang membutuhkan sedikit kerja ekstra. Yaitu, menangani ini sendiri di utas terpisah yang menggunakan CountDownLatchdan tidak menunggu di atasnya, kemudian lanjutkan memberitahukan apa pun yang perlu Anda beri tahu. Tidak ada dukungan asli untuk panggilan balik, atau apa pun yang mirip dengan efek itu.


EDIT: sekarang saya semakin memahami pertanyaan Anda, saya pikir Anda menjangkau terlalu jauh, tidak perlu. Jika Anda mengambil rutin SingleThreadExecutor, berikan semua tugas, dan itu akan melakukan antrian secara asli.

Yuval Adam
sumber
Menggunakan SingleThreadExecutor apa cara terbaik untuk mengetahui bahwa semua utas telah selesai? Saya melihat sebuah contoh yang menggunakan sementara waktu! Eksekutor. Dikuatkan tetapi ini tampaknya tidak terlalu elegan. Saya menerapkan fitur panggilan balik untuk setiap pekerja dan menambah hitungan yang berfungsi.
Beruang
5

Jika Anda ingin memastikan bahwa tidak ada tugas yang akan berjalan pada saat yang sama maka gunakan SingleThreadedExecutor . Tugas-tugas akan diproses sesuai urutan yang diajukan. Anda bahkan tidak perlu memegang tugas, cukup kirimkan ke eksekutif.

basszero
sumber
2

Kode sederhana untuk menerapkan Callbackmekanisme menggunakanExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

keluaran:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Catatan kunci:

  1. Jika Anda ingin memproses tugas secara berurutan dalam urutan FIFO, ganti newFixedThreadPool(5) dengannewFixedThreadPool(1)
  2. Jika Anda ingin memproses tugas berikutnya setelah menganalisis hasil dari callbacktugas sebelumnya, hapus komentar di bawah ini

    //System.out.println("future status:"+future.get()+":"+future.isDone());
  3. Anda dapat mengganti newFixedThreadPool()dengan salah satu dari

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor

    tergantung pada kasus penggunaan Anda.

  4. Jika Anda ingin menangani metode panggilan balik secara tidak sinkron

    Sebuah. Berikan ExecutorService or ThreadPoolExecutortugas yang dibagikan kepada Callable

    b. Ubah Callablemetode Anda menjadi Callable/Runnabletugas

    c. Dorong tugas panggilan balik ke ExecutorService or ThreadPoolExecutor

Ravindra babu
sumber
1

Sekadar menambah jawaban Matt, yang membantu, berikut adalah contoh yang lebih lengkap untuk menunjukkan penggunaan panggilan balik.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

Outputnya adalah:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito
Old Jack
sumber
1

Anda dapat menggunakan implementasi Callable sedemikian rupa sehingga

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

di mana CallbackInterface adalah sesuatu yang sangat mendasar

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

dan sekarang kelas utama akan terlihat seperti ini

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
Deepika Anand
sumber
1

Ini adalah ekstensi untuk jawaban Pache menggunakan Guava ListenableFuture.

Secara khusus, Futures.transform()pengembalian ListenableFuturejadi dapat digunakan untuk melakukan panggilan async. Futures.addCallback()kembali void, jadi tidak dapat digunakan untuk rantai, tetapi bagus untuk menangani keberhasilan / kegagalan pada penyelesaian async.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

CATATAN: Selain merantai tugas async, Anda Futures.transform()juga dapat menjadwalkan setiap tugas pada pelaksana yang terpisah (Tidak ditampilkan dalam contoh ini).

bcorso
sumber
Ini sepertinya cukup bagus.
kaiser