Java ThreadPoolExecutor: Memperbarui ukuran kumpulan inti secara dinamis menolak tugas yang masuk sebentar-sebentar

13

Saya mengalami masalah di mana jika saya mencoba untuk mengubah ukuran ukuran ThreadPoolExecutorinti kolam ke nomor yang berbeda setelah kolam telah dibuat, maka sebentar-sebentar, beberapa tugas ditolak dengan RejectedExecutionExceptionmeskipun saya tidak pernah mengirimkan lebih dari queueSize + maxPoolSizejumlah tugas.

Masalah yang saya coba pecahkan adalah untuk memperluas ThreadPoolExecutoryang mengubah ukuran utas inti berdasarkan eksekusi yang tertunda dalam antrian kumpulan thread. Saya memerlukan ini karena secara default a ThreadPoolExecutorakan membuat yang baru Threadhanya jika antrian penuh.

Berikut ini adalah program Java 8 mandiri kecil mandiri yang menunjukkan masalah.

import static java.lang.Math.max;
import static java.lang.Math.min;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolResizeTest {

    public static void main(String[] args) throws Exception {
        // increase the number of iterations if unable to reproduce
        // for me 100 iterations have been enough
        int numberOfExecutions = 100;

        for (int i = 1; i <= numberOfExecutions; i++) {
            executeOnce();
        }
    }

    private static void executeOnce() throws Exception {
        int minThreads = 1;
        int maxThreads = 5;
        int queueCapacity = 10;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minThreads, maxThreads,
                0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()
        );

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
                0, 10, TimeUnit.MILLISECONDS);
        CompletableFuture<Void> taskBlocker = new CompletableFuture<>();

        try {
            int totalTasksToSubmit = queueCapacity + maxThreads;

            for (int i = 1; i <= totalTasksToSubmit; i++) {
                // following line sometimes throws a RejectedExecutionException
                pool.submit(() -> {
                    // block the thread and prevent it from completing the task
                    taskBlocker.join();
                });
                // Thread.sleep(10); //enabling even a small sleep makes the problem go away
            }
        } finally {
            taskBlocker.complete(null);
            scheduler.shutdown();
            pool.shutdown();
        }
    }

    /**
     * Resize the thread pool if the number of pending tasks are non-zero.
     */
    private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
        int pendingExecutions = pool.getQueue().size();
        int approximateRunningExecutions = pool.getActiveCount();

        /*
         * New core thread count should be the sum of pending and currently executing tasks
         * with an upper bound of maxThreads and a lower bound of minThreads.
         */
        int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));

        pool.setCorePoolSize(newThreadCount);
        pool.prestartAllCoreThreads();
    }
}

Mengapa kumpulan ini harus pernah melempar RejectedExecutionException jika saya tidak pernah mengirimkan lebih dari queueCapacity + maxThreads. Saya tidak pernah mengubah utas maksimal, jadi menurut definisi ThreadPoolExecutor, ia harus mengakomodasi tugas dalam Utas atau ke antrian.

Tentu saja, jika saya tidak pernah mengubah ukuran pool, maka pool thread tidak pernah menolak pengiriman. Ini juga sulit untuk di-debug karena menambahkan segala jenis keterlambatan dalam pengiriman membuat masalah hilang.

Adakah petunjuk tentang cara memperbaiki RejectedExecutionException?

Swaranga Sarma
sumber
Mengapa tidak menyediakan implementasi Anda sendiri ExecutorServicedengan membungkus yang sudah ada, yang mengirimkan kembali tugas-tugas yang gagal diajukan karena pengubahan ukuran?
daniu
@aniu itu solusinya. Inti dari pertanyaan adalah mengapa kolam harus pernah melemparkan RejectedExecutionException jika saya tidak pernah mengirimkan lebih dari queueCapacity + maxThreads. Saya tidak pernah mengubah utas maksimal, jadi menurut definisi ThreadPoolExecutor, ia harus mengakomodasi tugas dalam Utas atau ke antrian.
Swaranga Sarma
Oke, sepertinya saya salah mengerti pertanyaan Anda. Apa itu? Apakah Anda ingin tahu mengapa perilaku itu terjadi atau bagaimana Anda menyiasatinya menyebabkan masalah bagi Anda?
daniu
Ya, mengubah implementasi saya ke layanan pelaksana tidak layak karena banyak kode mengacu pada ThreadPoolExecutor. Jadi jika saya masih ingin memiliki ThreadPoolExecutor yang dapat diubah ukurannya, saya perlu tahu bagaimana cara memperbaikinya. Semoga cara yang tepat untuk melakukan hal seperti ini adalah untuk memperpanjang ThreadPoolExecutor dan mendapatkan akses ke beberapa variabel yang dilindungi dan memperbarui ukuran kumpulan dalam blok yang disinkronkan pada kunci bersama oleh kelas super.
Swaranga Sarma
Memperluas ThreadPoolExecutormungkin adalah ide yang buruk, dan tidakkah Anda perlu mengubah kode yang ada dalam kasus ini juga? Sebaiknya Anda memberikan beberapa contoh bagaimana kode aktual Anda mengakses pelaksana. Saya akan terkejut jika menggunakan banyak metode khusus ThreadPoolExecutor(yaitu tidak di ExecutorService).
daniu

Jawaban:

5

Berikut ini adalah skenario mengapa ini terjadi:

Dalam contoh saya, saya menggunakan minThreads = 0, maxThreads = 2 dan queueCapacity = 2 untuk membuatnya lebih pendek. Perintah pertama dikirimkan, ini dilakukan dalam metode eksekusi:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

untuk perintah ini workQueue.offer (perintah) daripada addWorker (null, false) dieksekusi. Utas pekerja pertama-tama mengeluarkan perintah ini dari antrian dalam metode ulir, jadi saat ini antrian masih memiliki satu perintah,

Perintah kedua dikirimkan kali ini workQueue.offer (perintah) dijalankan. Sekarang antrian sudah penuh

Sekarang ScheduledExecutorService menjalankan metode resizeThreadPool yang memanggil setCorePoolSize dengan maxThreads. Berikut ini metode setCorePoolSize:

 public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

Metode ini menambahkan satu pekerja menggunakan addWorker (null, true). Tidak ada 2 antrian pekerja berjalan, maksimum dan antrian penuh.

Perintah ketiga dikirimkan dan gagal karena workQueue.offer (perintah) dan addWorker (perintah, false) gagal, yang mengarah ke Pengecualian:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at ThreadPoolResizeTest.executeOnce(ThreadPoolResizeTest.java:60)
at ThreadPoolResizeTest.runTest(ThreadPoolResizeTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)

Saya pikir untuk menyelesaikan masalah ini Anda harus mengatur kapasitas antrian ke maksimum perintah yang ingin Anda jalankan.

Thomas Krieger
sumber
Benar. Saya dapat repro dengan menyalin kode ke kelas saya sendiri dan menambahkan penebang. Pada dasarnya, ketika antrian penuh, dan saya mengirimkan tugas baru, itu akan mencoba untuk membuat Pekerja baru. Sementara itu jika pada saat itu, resizer saya juga memanggil setCorePoolSize ke 2, yang juga membuat Pekerja baru. Pada titik ini, dua Pekerja bersaing untuk ditambahkan tetapi mereka berdua tidak bisa karena itu akan melanggar batasan ukuran max-pool-size sehingga pengajuan tugas baru ditolak. Saya pikir ini adalah kondisi lomba dan saya mengajukan laporan bug ke OpenJDK. Ayo lihat. Tapi Anda menjawab pertanyaan saya sehingga Anda mendapatkan hadiahnya. Terima kasih.
Swaranga Sarma
2

Tidak yakin apakah ini memenuhi syarat sebagai bug. Ini adalah perilaku ketika utas pekerja tambahan dibuat setelah antrian penuh tetapi ini telah dicatat dalam java docs bahwa pemanggil harus berurusan dengan tugas yang ditolak.

Dokumen Java

Pabrik untuk utas baru. Semua utas dibuat menggunakan pabrik ini (melalui metode addWorker). Semua penelepon harus siap agar addWorker gagal, yang mungkin mencerminkan sistem atau kebijakan pengguna yang membatasi jumlah utas. Meskipun tidak diperlakukan sebagai kesalahan, kegagalan untuk membuat utas dapat mengakibatkan tugas baru ditolak atau yang sudah ada terjebak dalam antrian.

Saat Anda mengubah ukuran ukuran kumpulan inti, katakanlah tambah, pekerja tambahan dibuat ( addWorkermetode masuk setCorePoolSize) dan panggilan untuk membuat pekerjaan tambahan (addWorker metode dari execute) ditolak ketika addWorkerpengembalian salah ( add Workercuplikan kode terakhir) karena cukup banyak pekerja tambahan sudah dibuat oleh setCorePoolSize tetapi belum berjalan untuk mencerminkan pembaruan dalam antrian .

Bagian yang relevan

Membandingkan

public void setCorePoolSize(int corePoolSize) {
    ....
    int k = Math.min(delta, workQueue.size());
    while (k-- > 0 && addWorker(null, true)) {
        if (workQueue.isEmpty())
             break;
    }
}

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
....
   if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
     return false;             
}

Gunakan pengendali eksekusi ulangan kustom coba ulang (ini harus bekerja untuk kasus Anda karena Anda memiliki batas atas sebagai ukuran kumpulan max). Harap sesuaikan sesuai kebutuhan.

public static class RetryRejectionPolicy implements RejectedExecutionHandler {
    public RetryRejectionPolicy () {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
           while(true)
            if(e.getQueue().offer(r)) break;
        }
    }
}

ThreadPoolExecutor pool = new ThreadPoolExecutor(
      minThreads, maxThreads,
      0, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(queueCapacity),
      new ThreadPoolResizeTest.RetryRejectionPolicy()
 );

Perhatikan juga bahwa penggunaan shutdown Anda tidak benar karena ini tidak akan menunggu tugas yang diajukan untuk menyelesaikan eksekusi tetapi digunakan bersama awaitTermination.

Sagar Veeram
sumber
Saya pikir shutdown menunggu tugas yang sudah diserahkan, menurut JavaDoc: shutdown () Memulai shutdown tertib di mana tugas yang sebelumnya dikirimkan dieksekusi, tetapi tidak ada tugas baru yang akan diterima.
Thomas Krieger
@ThomasKrieger - Ini akan menjalankan tugas yang sudah diserahkan tetapi tidak akan menunggu sampai selesai - dari docs docs.oracle.com/javase/7/docs/api/java/util/concurrent/… - Metode ini tidak menunggu untuk yang sebelumnya dikirimkan tugas untuk menyelesaikan eksekusi. GunakanTerminasi menunggu untuk melakukan itu.
Sagar Veeram