Saya mengalami masalah di mana jika saya mencoba untuk mengubah ukuran ukuran ThreadPoolExecutor
inti kolam ke nomor yang berbeda setelah kolam telah dibuat, maka sebentar-sebentar, beberapa tugas ditolak dengan RejectedExecutionException
meskipun saya tidak pernah mengirimkan lebih dari queueSize + maxPoolSize
jumlah tugas.
Masalah yang saya coba pecahkan adalah untuk memperluas ThreadPoolExecutor
yang mengubah ukuran utas inti berdasarkan eksekusi yang tertunda dalam antrian kumpulan thread. Saya memerlukan ini karena secara default a ThreadPoolExecutor
akan membuat yang baru Thread
hanya jika antrian penuh.
Berikut ini adalah program Java 8 mandiri kecil mandiri yang menunjukkan masalah.
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolResizeTest {
public static void main(String[] args) throws Exception {
// increase the number of iterations if unable to reproduce
// for me 100 iterations have been enough
int numberOfExecutions = 100;
for (int i = 1; i <= numberOfExecutions; i++) {
executeOnce();
}
}
private static void executeOnce() throws Exception {
int minThreads = 1;
int maxThreads = 5;
int queueCapacity = 10;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
minThreads, maxThreads,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new ThreadPoolExecutor.AbortPolicy()
);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
0, 10, TimeUnit.MILLISECONDS);
CompletableFuture<Void> taskBlocker = new CompletableFuture<>();
try {
int totalTasksToSubmit = queueCapacity + maxThreads;
for (int i = 1; i <= totalTasksToSubmit; i++) {
// following line sometimes throws a RejectedExecutionException
pool.submit(() -> {
// block the thread and prevent it from completing the task
taskBlocker.join();
});
// Thread.sleep(10); //enabling even a small sleep makes the problem go away
}
} finally {
taskBlocker.complete(null);
scheduler.shutdown();
pool.shutdown();
}
}
/**
* Resize the thread pool if the number of pending tasks are non-zero.
*/
private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
int pendingExecutions = pool.getQueue().size();
int approximateRunningExecutions = pool.getActiveCount();
/*
* New core thread count should be the sum of pending and currently executing tasks
* with an upper bound of maxThreads and a lower bound of minThreads.
*/
int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));
pool.setCorePoolSize(newThreadCount);
pool.prestartAllCoreThreads();
}
}
Mengapa kumpulan ini harus pernah melempar RejectedExecutionException jika saya tidak pernah mengirimkan lebih dari queueCapacity + maxThreads. Saya tidak pernah mengubah utas maksimal, jadi menurut definisi ThreadPoolExecutor, ia harus mengakomodasi tugas dalam Utas atau ke antrian.
Tentu saja, jika saya tidak pernah mengubah ukuran pool, maka pool thread tidak pernah menolak pengiriman. Ini juga sulit untuk di-debug karena menambahkan segala jenis keterlambatan dalam pengiriman membuat masalah hilang.
Adakah petunjuk tentang cara memperbaiki RejectedExecutionException?
sumber
ExecutorService
dengan membungkus yang sudah ada, yang mengirimkan kembali tugas-tugas yang gagal diajukan karena pengubahan ukuran?ThreadPoolExecutor
mungkin adalah ide yang buruk, dan tidakkah Anda perlu mengubah kode yang ada dalam kasus ini juga? Sebaiknya Anda memberikan beberapa contoh bagaimana kode aktual Anda mengakses pelaksana. Saya akan terkejut jika menggunakan banyak metode khususThreadPoolExecutor
(yaitu tidak diExecutorService
).Jawaban:
Berikut ini adalah skenario mengapa ini terjadi:
Dalam contoh saya, saya menggunakan minThreads = 0, maxThreads = 2 dan queueCapacity = 2 untuk membuatnya lebih pendek. Perintah pertama dikirimkan, ini dilakukan dalam metode eksekusi:
untuk perintah ini workQueue.offer (perintah) daripada addWorker (null, false) dieksekusi. Utas pekerja pertama-tama mengeluarkan perintah ini dari antrian dalam metode ulir, jadi saat ini antrian masih memiliki satu perintah,
Perintah kedua dikirimkan kali ini workQueue.offer (perintah) dijalankan. Sekarang antrian sudah penuh
Sekarang ScheduledExecutorService menjalankan metode resizeThreadPool yang memanggil setCorePoolSize dengan maxThreads. Berikut ini metode setCorePoolSize:
Metode ini menambahkan satu pekerja menggunakan addWorker (null, true). Tidak ada 2 antrian pekerja berjalan, maksimum dan antrian penuh.
Perintah ketiga dikirimkan dan gagal karena workQueue.offer (perintah) dan addWorker (perintah, false) gagal, yang mengarah ke Pengecualian:
Saya pikir untuk menyelesaikan masalah ini Anda harus mengatur kapasitas antrian ke maksimum perintah yang ingin Anda jalankan.
sumber
Tidak yakin apakah ini memenuhi syarat sebagai bug. Ini adalah perilaku ketika utas pekerja tambahan dibuat setelah antrian penuh tetapi ini telah dicatat dalam java docs bahwa pemanggil harus berurusan dengan tugas yang ditolak.
Dokumen Java
Saat Anda mengubah ukuran ukuran kumpulan inti, katakanlah tambah, pekerja tambahan dibuat (
addWorker
metode masuksetCorePoolSize
) dan panggilan untuk membuat pekerjaan tambahan (addWorker
metode dariexecute
) ditolak ketikaaddWorker
pengembalian salah (add Worker
cuplikan kode terakhir) karena cukup banyak pekerja tambahan sudah dibuat olehsetCorePoolSize
tetapi belum berjalan untuk mencerminkan pembaruan dalam antrian .Bagian yang relevan
Membandingkan
Gunakan pengendali eksekusi ulangan kustom coba ulang (ini harus bekerja untuk kasus Anda karena Anda memiliki batas atas sebagai ukuran kumpulan max). Harap sesuaikan sesuai kebutuhan.
Perhatikan juga bahwa penggunaan shutdown Anda tidak benar karena ini tidak akan menunggu tugas yang diajukan untuk menyelesaikan eksekusi tetapi digunakan bersama
awaitTermination
.sumber