Bagaimana cara menunggu sejumlah utas selesai?

109

Apa cara untuk menunggu semua proses berulir selesai? Misalnya, saya punya:

public class DoSomethingInAThread implements Runnable{

    public static void main(String[] args) {
        for (int n=0; n<1000; n++) {
            Thread t = new Thread(new DoSomethingInAThread());
            t.start();
        }
        // wait for all threads' run() methods to complete before continuing
    }

    public void run() {
        // do something here
    }


}

Bagaimana cara mengubah ini sehingga main()metode berhenti di komentar sampai semua run()metode utas keluar? Terima kasih!

DivideByHero
sumber

Jawaban:

163

Anda meletakkan semua utas dalam sebuah array, memulai semuanya, dan kemudian memiliki loop

for(i = 0; i < threads.length; i++)
  threads[i].join();

Setiap gabungan akan diblokir hingga utas masing-masing selesai. Utas mungkin selesai dalam urutan yang berbeda dari saat Anda menggabungkannya, tetapi itu bukan masalah: ketika loop keluar, semua utas selesai.

Martin v. Löwis
sumber
1
@Mykola: apa sebenarnya keuntungan menggunakan grup utas? Hanya karena API ada di sana, bukan berarti Anda harus menggunakannya ...
Martin v. Löwis
2
Lihat: "Grup utas mewakili sekumpulan utas." Ini semantik benar untuk kasus penggunaan ini! Dan: "Sebuah utas diizinkan untuk mengakses informasi tentang grup utasnya sendiri"
Martin K.
4
Buku “Java Efektif” merekomendasikan untuk menghindari grup utas (item 73).
Bastien Léonard
2
Bug yang disebutkan di Java Efektif seharusnya sudah diperbaiki di Java 6. Jika versi java yang lebih baru bukan batasan, lebih baik menggunakan Futures untuk memecahkan masalah utas. Martin v. Löwis: Anda benar. Ini tidak relevan untuk masalah itu, tetapi bagus untuk mendapatkan informasi lebih lanjut tentang thread yang berjalan dari satu Object (seperti ExecutorService). Saya pikir bagus menggunakan fitur yang diberikan untuk memecahkan masalah; mungkin Anda akan membutuhkan lebih banyak fleksibilitas (informasi utas) di masa mendatang. Juga tepat untuk menyebutkan kelas buggy lama di JDK lama.
Martin K.
5
ThreadGroup tidak mengimplementasikan gabungan tingkat grup, jadi mengapa orang-orang mendorong ThreadGroup agak membingungkan. Apakah orang benar-benar menggunakan kunci putar & menanyakan activeCount grup? Anda akan kesulitan untuk meyakinkan saya bahwa melakukannya lebih baik dengan cara apa pun jika dibandingkan dengan hanya memanggil bergabung di semua utas.
41

Salah satu cara untuk membuat Listdari Threads, membuat dan meluncurkan setiap thread, sambil menambahkan ke dalam daftar. Setelah semuanya diluncurkan, putar kembali daftar dan panggil join()masing-masing. Tidak peduli apa urutan utas selesai dieksekusi, yang perlu Anda ketahui adalah bahwa pada saat loop kedua selesai dieksekusi, setiap utas akan selesai.

Pendekatan yang lebih baik adalah dengan menggunakan ExecutorService dan metode terkaitnya:

List<Callable> callables = ... // assemble list of Callables here
                               // Like Runnable but can return a value
ExecutorService execSvc = Executors.newCachedThreadPool();
List<Future<?>> results = execSvc.invokeAll(callables);
// Note: You may not care about the return values, in which case don't
//       bother saving them

Menggunakan ExecutorService (dan semua hal baru dari utilitas konkurensi Java 5 ) sangat fleksibel, dan contoh di atas bahkan hampir tidak menyentuh permukaan.

Adam Batkin
sumber
ThreadGroup adalah caranya! Dengan Daftar yang bisa berubah Anda akan mendapat masalah (sinkronisasi)
Martin K.
3
Apa? Bagaimana Anda bisa mendapat masalah? Ini hanya bisa berubah (hanya dapat dibaca) oleh utas yang melakukan peluncuran, jadi selama itu tidak mengubah daftar saat mengulanginya, tidak apa-apa.
Adam Batkin
Itu tergantung bagaimana Anda menggunakannya. Jika Anda akan menggunakan kelas pemanggil di utas, Anda akan mengalami masalah.
Martin K.
27
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DoSomethingInAThread implements Runnable
{
   public static void main(String[] args) throws ExecutionException, InterruptedException
   {
      //limit the number of actual threads
      int poolSize = 10;
      ExecutorService service = Executors.newFixedThreadPool(poolSize);
      List<Future<Runnable>> futures = new ArrayList<Future<Runnable>>();

      for (int n = 0; n < 1000; n++)
      {
         Future f = service.submit(new DoSomethingInAThread());
         futures.add(f);
      }

      // wait for all tasks to complete before continuing
      for (Future<Runnable> f : futures)
      {
         f.get();
      }

      //shut down the executor service so that this thread can exit
      service.shutdownNow();
   }

   public void run()
   {
      // do something here
   }
}
jt.
sumber
bekerja seperti pesona ... saya memiliki dua rangkaian utas yang seharusnya tidak berjalan secara bersamaan karena masalah pada beberapa cookie. Saya menggunakan contoh Anda untuk menjalankan rangkaian utas satu per satu .. terima kasih telah membagikan pengetahuan Anda ...
arn-arn
@Dantalian - Di kelas Runnable Anda (kemungkinan dalam metode run), Anda ingin menangkap pengecualian apa pun yang terjadi dan menyimpannya secara lokal (atau menyimpan pesan / kondisi kesalahan). Dalam contoh, f.get () mengembalikan objek yang Anda kirimkan ke ExecutorService. Objek Anda mungkin memiliki metode untuk mengambil pengecualian / kondisi kesalahan apa pun. Bergantung pada bagaimana Anda memodifikasi contoh yang diberikan, Anda mungkin perlu mentransmisikan objek yang diubah oleh f.get () ke tipe yang Anda harapkan.
jt.
12

alih-alih join(), yang merupakan API lama, Anda bisa menggunakan CountDownLatch . Saya telah mengubah kode Anda seperti di bawah ini untuk memenuhi kebutuhan Anda.

import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable{
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch){
        this.latch = latch;
    } 
    public void run() {
        try{
            System.out.println("Do some thing");
            latch.countDown();
        }catch(Exception err){
            err.printStackTrace();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try{
            CountDownLatch latch = new CountDownLatch(1000);
            for (int n=0; n<1000; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of 1000 threads");
        }catch(Exception err){
            err.printStackTrace();
        }
    }
}

Penjelasan :

  1. CountDownLatch telah diinisialisasi dengan hitungan 1000 sesuai kebutuhan Anda.

  2. Setiap thread pekerja DoSomethingInAThread akan mengurangi CountDownLatch, yang telah diteruskan dalam konstruktor.

  3. Utas utama CountDownLatchDemo await()sampai hitungannya menjadi nol. Setelah hitungan menjadi nol, Anda akan mendapatkan baris di bawah ini dalam keluaran.

    In Main thread after completion of 1000 threads

Info lebih lanjut dari halaman dokumentasi oracle

public void await()
           throws InterruptedException

Menyebabkan utas saat ini menunggu hingga kait menghitung mundur ke nol, kecuali utas terputus.

Lihat pertanyaan SE terkait untuk opsi lain:

tunggu sampai semua thread menyelesaikan pekerjaannya di java

Ravindra babu
sumber
8

Hindari kelas Thread sama sekali dan sebagai gantinya gunakan abstraksi yang lebih tinggi yang disediakan di java.util.concurrent

Kelas ExecutorService menyediakan metode invokeAll yang tampaknya melakukan apa yang Anda inginkan.

henrik
sumber
6

Pertimbangkan untuk menggunakan java.util.concurrent.CountDownLatch. Contoh di javadocs

Pablo Cavalieri
sumber
Adalah kait untuk utas, kunci kait bekerja dengan hitungan mundur. Dalam metode run () utas Anda secara eksplisit menyatakan untuk menunggu CountDownLatch mencapai hitungan mundurnya ke 0. Anda bisa menggunakan CountDownLatch yang sama di lebih dari satu utas untuk merilisnya secara bersamaan. Saya tidak tahu apakah itu yang Anda butuhkan, hanya ingin menyebutkannya karena berguna saat bekerja di lingkungan multithread.
Pablo Cavalieri
Mungkin Anda harus meletakkan penjelasan itu di tubuh jawaban Anda?
Aaron Hall
Contoh di Javadoc sangat deskriptif, itulah mengapa saya tidak menambahkannya. docs.oracle.com/javase/7/docs/api/java/util/concurrent/… . Dalam contoh pertama, semua utas Pekerja adalah rilis secara bersamaan karena mereka menunggu CountdownLatch startSignal mencapai nol, yang terjadi di startSignal.countDown (). Kemudian, thread mian menunggu hingga semua pekerjaan selesai menggunakan instruksi doneSignal.await (). doneSignal menurunkan nilainya di setiap pekerja.
Pablo Cavalieri
6

Seperti yang disarankan Martin K java.util.concurrent.CountDownLatchtampaknya menjadi solusi yang lebih baik untuk ini. Hanya menambahkan contoh yang sama

     public class CountDownLatchDemo
{

    public static void main (String[] args)
    {
        int noOfThreads = 5;
        // Declare the count down latch based on the number of threads you need
        // to wait on
        final CountDownLatch executionCompleted = new CountDownLatch(noOfThreads);
        for (int i = 0; i < noOfThreads; i++)
        {
            new Thread()
            {

                @Override
                public void run ()
                {

                    System.out.println("I am executed by :" + Thread.currentThread().getName());
                    try
                    {
                        // Dummy sleep
                        Thread.sleep(3000);
                        // One thread has completed its job
                        executionCompleted.countDown();
                    }
                    catch (InterruptedException e)
                    {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            }.start();
        }

        try
        {
            // Wait till the count down latch opens.In the given case till five
            // times countDown method is invoked
            executionCompleted.await();
            System.out.println("All over");
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }

}
Freaky Thommi
sumber
4

Bergantung pada kebutuhan Anda, Anda mungkin juga ingin memeriksa kelas CountDownLatch dan CyclicBarrier dalam paket java.util.concurrent. Mereka dapat berguna jika Anda ingin agar thread Anda menunggu satu sama lain, atau jika Anda ingin kontrol yang lebih cermat atas cara thread Anda dieksekusi (misalnya, menunggu dalam eksekusi internalnya untuk thread lain untuk menyetel beberapa status). Anda juga dapat menggunakan CountDownLatch untuk memberi sinyal semua utas Anda untuk memulai pada waktu yang sama, daripada memulainya satu per satu saat Anda mengulang melalui loop Anda. Dokumen API standar memiliki contohnya, ditambah menggunakan CountDownLatch lain untuk menunggu semua utas menyelesaikan eksekusinya.

Jeff
sumber
1

Buat objek utas di dalam loop for pertama.

for (int i = 0; i < threads.length; i++) {
     threads[i] = new Thread(new Runnable() {
         public void run() {
             // some code to run in parallel
         }
     });
     threads[i].start();
 }

Dan kemudian apa yang dikatakan semua orang di sini.

for(i = 0; i < threads.length; i++)
  threads[i].join();
optimus0127
sumber
0

Anda dapat melakukannya dengan Object "ThreadGroup" dan parameternya activeCount :

Martin K.
sumber
Tidak yakin bagaimana tepatnya Anda mengusulkan untuk melakukannya. Jika Anda mengusulkan untuk melakukan polling activeCount dalam satu putaran: itu buruk, karena ini sibuk-tunggu (bahkan jika Anda tidur di antara polling - Anda kemudian mendapatkan tradeoff antara bisnis dan daya tanggap).
Martin v. Löwis
@Martin v. Löwis: "Bergabung hanya akan menunggu satu utas. Solusi yang lebih baik mungkin adalah java.util.concurrent.CountDownLatch. Cukup inisialisasi kait dengan jumlah yang disetel ke jumlah utas pekerja. Setiap utas pekerja harus memanggil countDown () tepat sebelum keluar, dan utas utama hanya memanggil await (), yang akan memblokir hingga penghitung mencapai nol. Masalah dengan join () adalah juga bahwa Anda tidak dapat mulai menambahkan lebih banyak utas secara dinamis. Daftar akan meledak dengan Modifikasi Bersamaan. " Solusi Anda berfungsi dengan baik untuk Masalah tetapi tidak untuk tujuan umum.
Martin K.
0

Sebagai alternatif dari CountDownLatch, Anda juga dapat menggunakan CyclicBarrier mis

public class ThreadWaitEx {
    static CyclicBarrier barrier = new CyclicBarrier(100, new Runnable(){
        public void run(){
            System.out.println("clean up job after all tasks are done.");
        }
    });
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(new MyCallable(barrier));
            t.start();
        }       
    }

}    

class MyCallable implements Runnable{
    private CyclicBarrier b = null;
    public MyCallable(CyclicBarrier b){
        this.b = b;
    }
    @Override
    public void run(){
        try {
            //do something
            System.out.println(Thread.currentThread().getName()+" is waiting for barrier after completing his job.");
            b.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }       
}

Untuk menggunakan CyclicBarrier dalam hal ini barrier.await () harus menjadi pernyataan terakhir yaitu saat utas Anda selesai dengan tugasnya. CyclicBarrier dapat digunakan lagi dengan metode reset (). Mengutip javadocs:

CyclicBarrier mendukung perintah Runnable opsional yang dijalankan satu kali per titik penghalang, setelah utas terakhir dalam pesta tiba, tetapi sebelum utas apa pun dilepaskan. Tindakan penghalang ini berguna untuk memperbarui keadaan bersama sebelum salah satu pihak melanjutkan.

shailendra 1118
sumber
Saya rasa itu bukan contoh yang baik untuk CyclicBarrier. Mengapa Anda menggunakan panggilan Thread.sleep ()?
Guenther
@Guenther - ya, saya mengubah kode agar sesuai dengan persyaratan.
shailendra1118
CyclicBarrier bukanlah alternatif dari CountDownLatch. Ketika utas harus berulang kali menghitung mundur, Anda harus membuat CyclicBarrier, jika tidak default ke CountDownLatch (kecuali jika memerlukan abstraksi Eksekusi tambahan, di mana Anda harus melihat ke tingkat yang lebih tinggi, Layanan).
Elysiumplain
0

Itu join()tidak membantu saya. lihat contoh ini di Kotlin:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
        }
    })

Hasil:

Thread-5|a=5
Thread-1|a=1
Thread-3|a=3
Thread-2|a=2
Thread-4|a=4
Thread-2|2*1=2
Thread-3|3*1=3
Thread-1|1*1=1
Thread-5|5*1=5
Thread-4|4*1=4
Thread-1|2*2=2
Thread-5|10*2=10
Thread-3|6*2=6
Thread-4|8*2=8
Thread-2|4*2=4
Thread-3|18*3=18
Thread-1|6*3=6
Thread-5|30*3=30
Thread-2|12*3=12
Thread-4|24*3=24
Thread-4|96*4=96
Thread-2|48*4=48
Thread-5|120*4=120
Thread-1|24*4=24
Thread-3|72*4=72
Thread-5|600*5=600
Thread-4|480*5=480
Thread-3|360*5=360
Thread-1|120*5=120
Thread-2|240*5=240
Thread-1|TaskDurationInMillis = 765
Thread-3|TaskDurationInMillis = 765
Thread-4|TaskDurationInMillis = 765
Thread-5|TaskDurationInMillis = 765
Thread-2|TaskDurationInMillis = 765

Sekarang izinkan saya menggunakan join()for utas:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
            t.join()
        }
    })

Dan hasilnya:

Thread-1|a=1
Thread-1|1*1=1
Thread-1|2*2=2
Thread-1|6*3=6
Thread-1|24*4=24
Thread-1|120*5=120
Thread-1|TaskDurationInMillis = 815
Thread-2|a=2
Thread-2|2*1=2
Thread-2|4*2=4
Thread-2|12*3=12
Thread-2|48*4=48
Thread-2|240*5=240
Thread-2|TaskDurationInMillis = 1568
Thread-3|a=3
Thread-3|3*1=3
Thread-3|6*2=6
Thread-3|18*3=18
Thread-3|72*4=72
Thread-3|360*5=360
Thread-3|TaskDurationInMillis = 2323
Thread-4|a=4
Thread-4|4*1=4
Thread-4|8*2=8
Thread-4|24*3=24
Thread-4|96*4=96
Thread-4|480*5=480
Thread-4|TaskDurationInMillis = 3078
Thread-5|a=5
Thread-5|5*1=5
Thread-5|10*2=10
Thread-5|30*3=30
Thread-5|120*4=120
Thread-5|600*5=600
Thread-5|TaskDurationInMillis = 3833

Seperti yang jelas saat kami menggunakan join:

  1. Utas berjalan secara berurutan.
  2. Sampel pertama membutuhkan waktu 765 Milidetik sedangkan sampel kedua membutuhkan waktu 3833 Milidetik.

Solusi kami untuk mencegah pemblokiran utas lain adalah membuat ArrayList:

val threads = ArrayList<Thread>()

Sekarang ketika kita ingin memulai utas baru, kita paling menambahkannya ke ArrayList:

addThreadToArray(
    ThreadUtils.startNewThread(Runnable {
        ...
    })
)

The addThreadToArrayFungsi:

@Synchronized
fun addThreadToArray(th: Thread) {
    threads.add(th)
}

The startNewThreadfunstion:

fun startNewThread(runnable: Runnable) : Thread {
    val th = Thread(runnable)
    th.isDaemon = false
    th.priority = Thread.MAX_PRIORITY
    th.start()
    return th
}

Periksa penyelesaian utas seperti di bawah ini di mana pun itu diperlukan:

val notAliveThreads = ArrayList<Thread>()
for (t in threads)
    if (!t.isAlive)
        notAliveThreads.add(t)
threads.removeAll(notAliveThreads)
if (threads.size == 0){
    // The size is 0 -> there is no alive threads.
}
Misagh
sumber