Bagaimana cara menunggu semua utas selesai, menggunakan ExecutorService?

381

Saya perlu menjalankan sejumlah tugas 4 sekaligus, seperti ini:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

Bagaimana saya bisa diberi tahu setelah semuanya selesai? Untuk saat ini saya tidak dapat memikirkan sesuatu yang lebih baik daripada mengatur beberapa penghitung tugas global dan menguranginya di akhir setiap tugas, kemudian monitor di loop tak terbatas penghitung ini menjadi 0; atau dapatkan daftar Futures dan dalam infinite loop monitor isDone untuk mereka semua. Apa solusi yang lebih baik yang tidak melibatkan loop tak terbatas?

Terima kasih.

serg
sumber

Jawaban:

446

Pada dasarnya pada ExecutorServicepanggilan Anda shutdown()dan kemudian awaitTermination():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}
cletus
sumber
9
inilah yang dimaksud shutdown / awaitTermination untuk
matt b
31
Ini adalah pola yang baik jika penanganan tugas ini adalah acara satu kali. Namun, jika ini dilakukan berulang kali selama runtime yang sama, itu tidak optimal, karena Anda akan membuat dan meruntuhkan utas berulang kali setiap kali dijalankan.
sjlee
44
Saya mencari dokumentasi resmi yang Long.MAX_VALUE, TimeUnit.NANOSECONDSsetara dengan tidak memiliki batas waktu.
Sam Harwell
15
Saya tidak percaya bahwa Anda harus menggunakan shutdown untuk bergabung di semua utas saat ini (setelah menggunakan shutdown, Anda tidak dapat menggunakan pelaksana lagi). Sarankan menggunakan daftar Masa Depan sebagai gantinya ...
rogerdpack
20
@SamHarwell lihat dokumentasijava.util.concurrent paket di bawah bagian: Untuk menunggu "selamanya", Anda dapat menggunakan nilaiTimingLong.MAX_VALUE
beluchin
174

Gunakan CountDownLatch :

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

dan dalam tugas Anda (lampirkan di coba / akhirnya)

latch.countDown();
ChssPly76
sumber
3
Tidak ada 4 tugas. Ada "sejumlah tugas" yang dilakukan 4 sekaligus.
cletus
2
Maaf, saya salah paham pertanyaannya. Ya, sejumlah tugas harus menjadi argumen untuk konstruktor
CountDownLatch
3
Saya menemukan solusi ini lebih elegan daripada yang lain, sepertinya dibuat untuk tujuan ini, dan itu sederhana dan mudah.
wvdschel
3
Bagaimana jika Anda tidak tahu jumlah tugas sebelum memulai?
cletus
11
@cletus - maka Anda tidak menggunakan CountDownLatch :-) Ingatlah, saya tidak berpendapat bahwa pendekatan ini lebih baik daripada Anda. Namun, saya menemukan bahwa dalam skenario kehidupan nyata saya tidak tahu jumlah tugas, pengaturan kolam thread yang perlu dikonfigurasi per penyebaran, dan kolam renang dapat digunakan kembali. Jadi saya biasanya menyuruh kolam ulir disuntikkan oleh Spring dan menetapkannya sebagai prototipe dan mematikannya secara manual hanya untuk menunggu ulir selesai tampaknya kurang ideal.
ChssPly76
82

ExecutorService.invokeAll() melakukannya untukmu.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
sjlee
sumber
Kesulitan datang jika / ketika Anda memulai thread "4" satu per satu, sepotong demi sepotong, kemudian bergabung / biarkan selesaikan semua 4 ...
rogerdpack
@rogerdpack: Saya masih mempelajari ini dan sebagainya. Tetapi sebagai tanggapan atas apa yang Anda minta. Haruskah 4 utas sekaligus tidak menjadi bagian dari tugas batch yang dijalankan menggunakan jawaban di atas?
Mukul Goel
3
Metode ini hanya akan berfungsi jika Anda mengetahui jumlah tugas sebelumnya.
Konstantin
3
Saya pikir ketika futuresdikembalikan, tugas belum selesai. Mereka mungkin selesai di masa mendatang dan Anda akan memiliki tautan ke hasilnya. Itu sebabnya disebut a Future. Anda memiliki metode Future.get () , yang akan menunggu tugas selesai untuk mendapatkan hasil.
AlikElzin-kilaka
5
@ AlikElzin-kilaka Kutipan dari JavaDocs (ditautkan dalam jawaban): "Jalankan tugas yang diberikan, kembalikan daftar Futures yang memegang status dan hasil ketika semuanya selesai. Future.isDone () berlaku untuk setiap elemen dari daftar yang dikembalikan. "
Hulk
47

Anda dapat menggunakan Daftar Futures, juga:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

kemudian ketika Anda ingin bergabung dengan mereka semua, itu pada dasarnya setara dengan bergabung pada masing-masingnya, (dengan manfaat tambahan yang dimunculkan kembali pengecualian dari utas anak ke utama):

for(Future f: this.futures) { f.get(); }

Pada dasarnya triknya adalah dengan memanggil .get () pada masing-masing Masa Depan satu per satu, alih-alih panggilan putaran tak terbatas isDone () on (semua atau masing-masing). Jadi, Anda dijamin "melanjutkan" melalui dan melewati blok ini segera setelah utas terakhir selesai. Peringatannya adalah bahwa sejak panggilan .get () membangkitkan kembali pengecualian, jika salah satu utas mati, Anda akan mengangkat dari ini mungkin sebelum utas lainnya selesai untuk menyelesaikan [untuk menghindari ini, Anda dapat menambahkan catch ExecutionExceptionsekitar panggilan panggil ] Peringatan lainnya adalah ia membuat referensi ke semua utas jadi jika mereka memiliki variabel lokal utas mereka tidak akan dikumpulkan sampai setelah Anda melewati blok ini (meskipun Anda mungkin bisa mengatasi ini, jika itu menjadi masalah, dengan menghapus Masa depan dari ArrayList). Jika Anda ingin tahu Masa Depan mana yang "selesai dulu"https://stackoverflow.com/a/31885029/32453

rogerdpack
sumber
3
Untuk mengetahui "selesai terlebih dahulu", gunakan ExecutorCompletionService.take: stackoverflow.com/a/11872604/199364
ToolmakerSteve
34

Di Java8 Anda bisa melakukannya dengan CompletableFuture :

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();
AdamSkywalker
sumber
3
Ini adalah solusi yang sangat elegan.
mattvonb
ExecutorService es = Executors.newFixedThreadPool(4); List< Future<?>> futures = new ArrayList<>(); for(Runnable task : taskList) { futures.add(es.submit(task)); } for(Future<?> future : futures) { try { future.get(); }catch(Exception e){ // do logging and nothing else } }
user2862544
@AdamSkywalker perlu menungguTerminasi () diperlukan setelah es.shutdown ()?
gaurav
@gaurav saat Anda menelepon shutdown, beberapa tugas mungkin belum selesai. Jadi awaitTermination akan memblokir utas panggilan sampai semuanya selesai. Itu tergantung pada apakah Anda perlu menunggu hasil di utas ini atau tidak.
AdamSkywalker
@AdamSkywalker jawaban yang bagus. masuk akal untuk tidak memanggil awaitTermination () jika saya tidak perlu menunggu hasilnya.
gaurav
26

Hanya dua sen saya. Untuk mengatasi kebutuhan CountDownLatchuntuk mengetahui jumlah tugas sebelumnya, Anda bisa melakukannya dengan cara lama dengan menggunakan yang sederhana Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

Dalam tugas Anda, panggil saja s.release()seperti yang Anda inginkanlatch.countDown();

stryba
sumber
Saat melihat ini, saya pertama kali bertanya-tanya apakah akan menjadi masalah jika beberapa releasepanggilan terjadi sebelum acquirepanggilan, tetapi setelah membaca dokumentasi Semaphore, saya melihat itu tidak masalah.
ToolmakerSteve
13

Agak terlambat untuk pertandingan tapi demi penyelesaian ...

Alih-alih 'menunggu' untuk menyelesaikan semua tugas, Anda dapat berpikir dalam hal prinsip Hollywood, "jangan panggil aku, aku akan memanggilmu" - ketika aku selesai. Saya pikir kode yang dihasilkan lebih elegan ...

Jambu biji menawarkan beberapa alat yang menarik untuk mencapai hal ini.

Sebuah contoh ::

Bungkus ExecutorService ke dalam ListeningExecutorService ::

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Kirim koleksi perlengkapan untuk eksekusi ::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

Sekarang bagian penting:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

Lampirkan panggilan balik ke ListenableFuture, yang dapat Anda gunakan untuk diberi tahu ketika semua masa depan selesai ::

        Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
        @Override
        public void onSuccess(List<Integer> result) {
            log.info("@@ finished processing {} elements", Iterables.size(result));
            // do something with all the results
        }

        @Override
        public void onFailure(Throwable t) {
            log.info("@@ failed because of :: {}", t);
        }
    });

Ini juga menawarkan keuntungan bahwa Anda dapat mengumpulkan semua hasil di satu tempat setelah pemrosesan selesai ...

Informasi lebih lanjut di sini

Răzvan Petruescu
sumber
2
Sangat bersih. Apakah bekerja dengan sempurna bahkan di Android. Hanya harus menggunakan runOnUiThread()di onSuccess().
DSchmidt
12

Kelas CyclicBarrier di Java 5 dan yang lebih baru dirancang untuk hal semacam ini.

Pekka Enberg
sumber
7
Keren, tidak pernah bisa mengingat nama struktur data ini. Namun, hanya cocok jika Anda tahu sebelumnya jumlah tugas yang akan di antri.
ᆼ ᆺ ᆼ
ya Anda akan berpikir Anda akan bisa mengenai penghalang dengan utas saat ini, dan semua utas anak, maka ketika Anda melewatinya Anda akan tahu bahwa
ulas
Sebenarnya itu jawaban yang salah. CyclicBarrier dirancang untuk porsi. CountDownLatch dirancang untuk acara tunggu
gstackoverflow
7

Ikuti salah satu pendekatan di bawah ini.

  1. Iterate melalui semua tugas Masa Depan , kembali dari submitaktif ExecutorServicedan periksa status dengan memblokir panggilan get()pada Futureobjek seperti yang disarankan olehKiran
  2. Gunakan invokeAll()pada ExecutorService
  3. CountDownLatch
  4. ForkJoinPool atau Executors.html # newWorkStealingPool
  5. Gunakan shutdown, awaitTermination, shutdownNowAPI ThreadPoolExecutor dalam urutan yang benar

Pertanyaan SE terkait:

Bagaimana CountDownLatch digunakan dalam Java Multithreading?

Cara mematikan java ExecutorService dengan benar

Ravindra babu
sumber
6

di sini ada dua pilihan, hanya sedikit bingung mana yang terbaik untuk pergi.

Pilihan 1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

Pilihan 2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

Di sini menempatkan future.get (); di coba tangkap adalah ide bagus bukan?

pengguna2862544
sumber
5

Anda bisa membungkus tugas Anda di runnable lain, yang akan mengirim pemberitahuan:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});
Zed
sumber
1
Butuh waktu beberapa saat untuk melihat bagaimana ini akan menyelesaikan pertanyaan OP. Pertama, perhatikan bahwa pembungkus ini adalah untuk setiap tugas, bukan dari kode yang memulai semua tugas. Agaknya, setiap awal akan menambah penghitung, dan setiap selesai akan mengurangi penghitung itu, atau akan menambah completedpenghitung. Jadi setelah memulai semuanya, pada setiap notifikasi, dapat menentukan apakah semua tugas telah selesai. Perhatikan bahwa sangat penting untuk digunakan try/finallysehingga pemberitahuan selesai (atau pemberitahuan alternatif di catchblok) diberikan bahkan jika tugas gagal. Kalau tidak, akan menunggu selamanya.
ToolmakerSteve
3

Saya baru saja menulis contoh program yang memecahkan masalah Anda. Tidak ada implementasi singkat yang diberikan, jadi saya akan menambahkannya. Meskipun Anda dapat menggunakan executor.shutdown()dan executor.awaitTermination(), ini bukan praktik terbaik karena waktu yang diambil oleh utas yang berbeda tidak dapat diprediksi.

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
Kiran
sumber
Adalah baik bahwa Anda menunjukkan penggunaan future.get - alternatif yang baik untuk diketahui. Tetapi mengapa Anda menganggap lebih baik menunggu selamanya , daripada menetapkan batas waktu maksimum yang dapat diterima? Lebih penting lagi, tidak ada alasan untuk melakukan semua logika ini, ketika seseorang dapat memberikan waktu yang sangat, sangat lama untuk menunggu, jika Anda ingin menunggu (pada dasarnya selamanya) sampai semua tugas selesai.
ToolmakerSteve
Ini tidak berbeda dengan solusi yang sudah disajikan di sini. Solusi Anda yang adil sama dengan yang disajikan oleh @sjlee
pulp_fiction
Tidak yakin mengapa Anda perlu memeriksa untuk dilakukan ketika menurut oracle doc, memohonSemua akan kembali hanya "ketika semua selesai atau batas waktu berakhir, mana yang terjadi terlebih dahulu"
Mashrur
3

Hanya untuk memberikan lebih banyak alternatif di sini berbeda dengan menggunakan kait / penghalang. Anda juga bisa mendapatkan hasil parsial sampai semuanya selesai menggunakan CompletionService .

Dari Java Concurrency dalam praktiknya: "Jika Anda memiliki banyak perhitungan untuk diserahkan kepada Pelaksana dan Anda ingin mengambil hasilnya saat tersedia, Anda dapat mempertahankan Masa Depan yang terkait dengan setiap tugas dan berulang kali memilih penyelesaian dengan menelepon memanggil dengan batas waktu nol. Ini mungkin, tetapi membosankan . Untungnya ada cara yang lebih baik : layanan penyelesaian. "

Di sini implementasinya

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}
Alberto Gurrion
sumber
3

Ini adalah solusi saya, berbasis di tip "AdamSkywalker", dan berhasil

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}
frss-soft.com
sumber
2

Anda bisa menggunakan kode ini:

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
Tuan Pham
sumber
2

Saya membuat contoh kerja berikut. Idenya adalah memiliki cara untuk memproses kumpulan tugas (saya menggunakan antrian sebagai contoh) dengan banyak Thread (ditentukan secara terprogram oleh numberOfTasks / threshold), dan tunggu sampai semua Thread selesai untuk melanjutkan dengan beberapa pemrosesan lainnya.

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

Semoga ini bisa membantu!

Fernando Gil
sumber
1

Anda dapat menggunakan subclass Anda sendiri dari ExecutorCompletionService untuk membungkus taskExecutor, dan implementasi Anda sendiri BlockingQueue untuk mendapatkan informasi ketika setiap tugas menyelesaikan dan melakukan panggilan balik atau tindakan apa pun yang Anda inginkan ketika jumlah tugas yang diselesaikan mencapai tujuan yang Anda inginkan.

Alex Martelli
sumber
1

Anda harus menggunakan executorService.shutdown()dan executorService.awaitTerminationmetode.

Contohnya sebagai berikut:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}
Rollen Holt
sumber
sedang menungguTerminasi () diperlukan setelah shutdown () /
gaurav
1

Jadi saya posting jawaban saya dari pertanyaan terkait di sini, memetikan seseorang ingin cara yang lebih sederhana untuk melakukan ini

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
Mạnh Quyết Nguyễn
sumber
0

Java 8 - Kita bisa menggunakan stream API untuk memproses stream. Silakan lihat cuplikan di bawah ini

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();
Vlad
sumber
2
Hai Vlad, selamat datang di StackOverflow. Bisakah Anda mengedit jawaban Anda untuk menjelaskan bagaimana ini menjawab pertanyaan, dan apa kodenya? Kode hanya jawaban tidak disarankan di sini. Terima kasih!
Tim Malone
Posting ini berbicara tentang konkurensi. Parallelism! = Concurrency
GabrielBB
0

ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // doSomething();
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

Jika doSomething()membuang beberapa pengecualian lain, latch.countDown()sepertinya tidak akan mengeksekusi, jadi apa yang harus saya lakukan?

Pengfei Zhan
sumber
0

jika Anda menggunakan lebih banyak utas, ExecutionServices SEQUENTIALLY dan ingin menunggu SETIAP EXECUTIONSERVICE selesai. Cara terbaik adalah seperti di bawah ini;

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
   executer1.execute(new Runnable() {
            @Override
            public void run() {
                ...
            }
        });
} 
executer1.shutdown();

try{
   executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

   ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
   for (true) {
      executer2.execute(new Runnable() {
            @Override
            public void run() {
                 ...
            }
        });
   } 
   executer2.shutdown();
} catch (Exception e){
 ...
}
X
sumber
-1

Ini mungkin bisa membantu

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }
Amol Desai
sumber
-1

Anda bisa memanggil waitTillDone () di kelas Runner ini :

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

Anda dapat menggunakan kembali kelas ini dan memanggil waitTillDone () sebanyak yang Anda inginkan sebelum memanggil shutdown (), ditambah kode Anda sangat sederhana . Juga Anda tidak harus tahu dengan jumlah tugas dimuka.

Untuk menggunakannya cukup tambahkan compile 'com.github.matejtymes:javafixes:1.3.1'dependensi gradle / maven ini ke proyek Anda.

Lebih detail dapat ditemukan di sini:

https://github.com/MatejTymes/JavaFixes

Matej Tymes
sumber
-2

Ada metode dalam pelaksana getActiveCount()- yang memberikan jumlah utas aktif.

Setelah merentangkan utas, kita dapat memeriksa apakah activeCount()nilainya 0. Setelah nilainya nol, artinya tidak ada utas aktif yang sedang berjalan yang berarti tugas selesai:

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}
pengguna
sumber
3
Bukan ide yang bagus, lihat stackoverflow.com/a/7271685/1166992 dan javadoc: "Mengembalikan perkiraan jumlah utas yang secara aktif menjalankan tugas."
Olivier Faucheux