Bagaimana cara agar ThreadPoolExecutor meningkatkan utas menjadi maksimal sebelum mengantri?

100

Saya merasa frustrasi selama beberapa waktu dengan perilaku default ThreadPoolExecutoryang mendukung ExecutorServicekumpulan thread yang digunakan oleh banyak dari kita. Mengutip dari Javadocs:

Jika ada lebih dari corePoolSize tetapi kurang dari maximumPoolSize utas yang berjalan, utas baru akan dibuat hanya jika antrian penuh .

Artinya, jika Anda menentukan kumpulan utas dengan kode berikut, itu tidak akan pernah memulai utas ke-2 karena LinkedBlockingQueuetidak dibatasi.

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Hanya jika Anda memiliki antrian terbatas dan antrian penuh , barulah utas di atas nomor inti dimulai. Saya menduga sejumlah besar programmer multithread Java junior tidak menyadari perilaku file ThreadPoolExecutor.

Sekarang saya memiliki kasus penggunaan khusus di mana ini tidak optimal. Saya mencari cara, tanpa menulis kelas TPE saya sendiri, untuk mengatasinya.

Persyaratan saya adalah untuk layanan web yang membuat panggilan balik ke pihak ke-3 yang mungkin tidak dapat diandalkan.

  • Saya tidak ingin melakukan panggilan balik secara sinkron dengan permintaan web, jadi saya ingin menggunakan kumpulan utas.
  • Saya biasanya mendapatkan beberapa ini sebentar jadi saya tidak ingin memiliki newFixedThreadPool(...)sejumlah besar utas yang sebagian besar tidak aktif.
  • Seringkali saya mendapatkan lonjakan lalu lintas ini dan saya ingin meningkatkan jumlah utas ke beberapa nilai maksimal (katakanlah 50).
  • Saya perlu melakukan upaya terbaik untuk melakukan semua panggilan balik jadi saya ingin mengantri panggilan tambahan di atas 50. Saya tidak ingin membebani server web saya yang lain dengan menggunakan file newCachedThreadPool().

Bagaimana saya bisa mengatasi batasan ini di ThreadPoolExecutormana antrian perlu dibatasi dan penuh sebelum lebih banyak utas akan dimulai? Bagaimana saya bisa membuatnya memulai lebih banyak utas sebelum mengantri tugas?

Edit:

@Flavio membuat poin yang baik tentang menggunakan ThreadPoolExecutor.allowCoreThreadTimeOut(true)agar utas inti batas waktu dan keluar. Saya mempertimbangkannya tetapi saya masih menginginkan fitur core-threads. Saya tidak ingin jumlah utas di kolam turun di bawah ukuran inti jika memungkinkan.

Abu-abu
sumber
1
Mengingat bahwa contoh Anda membuat maksimal 10 utas, apakah ada penghematan nyata dalam menggunakan sesuatu yang tumbuh / menyusut di atas kumpulan utas berukuran tetap?
bstempi
Poin bagus @bempi. Jumlahnya agak sewenang-wenang. Saya telah meningkatkannya dalam pertanyaan menjadi 50. Tidak yakin berapa banyak utas bersamaan yang sebenarnya ingin saya gunakan sekarang karena saya memiliki solusi ini.
Gray
1
Sial! 10 suara positif jika saya bisa di sini, posisi yang persis sama dengan saya.
Eugene

Jawaban:

51

Bagaimana saya bisa mengatasi batasan ini di ThreadPoolExecutormana antrian harus dibatasi dan penuh sebelum lebih banyak utas akan dimulai.

Saya yakin saya akhirnya menemukan solusi yang agak elegan (mungkin sedikit hacky) untuk batasan ini dengan ThreadPoolExecutor. Ini melibatkan memperluas LinkedBlockingQueueuntuk memilikinya kembali falseuntuk queue.offer(...)saat ini sudah ada beberapa tugas antri. Jika utas saat ini tidak mengikuti tugas yang diantrekan, TPE akan menambahkan utas tambahan. Jika kumpulan sudah di utas maksimal, maka RejectedExecutionHandlerakan dipanggil. Ini adalah pawang yang kemudian memasukkan put(...)ke dalam antrian.

Memang aneh menulis antrian di mana offer(...)bisa kembali falsedan put()tidak pernah diblokir jadi itu bagian hacknya. Tapi ini bekerja dengan baik dengan penggunaan antrian TPE jadi saya tidak melihat ada masalah dengan melakukan ini.

Berikut kodenya:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

Dengan mekanisme ini, ketika saya mengirimkan tugas ke antrian, ThreadPoolExecutorkeinginan:

  1. Skala jumlah utas hingga ukuran inti awalnya (di sini 1).
  2. Tawarkan ke antrean. Jika antrian kosong maka antrian akan ditangani oleh thread yang ada.
  3. Jika antrian sudah memiliki 1 atau lebih elemen, offer(...)akan mengembalikan false.
  4. Jika false dikembalikan, naikkan jumlah utas di kumpulan hingga mencapai jumlah maksimal (di sini 50).
  5. Jika di maks maka itu memanggil RejectedExecutionHandler
  6. The RejectedExecutionHandlermenempatkan maka tugas ke dalam antrian untuk diproses oleh thread pertama yang tersedia dalam rangka FIFO.

Meskipun dalam kode contoh saya di atas, antrian tidak dibatasi, Anda juga dapat mendefinisikannya sebagai antrian berbatas. Misalnya, jika Anda menambahkan kapasitas 1000 LinkedBlockingQueuemaka itu akan:

  1. skala utas hingga maks
  2. lalu antri hingga penuh dengan 1000 tugas
  3. kemudian blokir penelepon hingga tersedia ruang untuk antrian.

Juga, jika Anda perlu menggunakan offer(...)dalam RejectedExecutionHandlermaka Anda bisa menggunakan offer(E, long, TimeUnit)metode sebagai gantinya dengan Long.MAX_VALUEsebagai batas waktu.

Peringatan:

Jika Anda mengharapkan tugas ditambahkan ke eksekutor setelah dimatikan, maka Anda mungkin ingin lebih pintar membuang RejectedExecutionExceptionkebiasaan kami RejectedExecutionHandlerketika layanan-pelaksana telah dimatikan. Terima kasih kepada @RaduToader karena telah menunjukkan hal ini.

Edit:

Perubahan lain untuk jawaban ini adalah menanyakan TPE jika ada utas yang tidak digunakan dan hanya mengantrekan item jika ada. Anda harus membuat kelas yang benar untuk ini dan menambahkan ourQueue.setThreadPoolExecutor(tpe);metode di atasnya.

Maka offer(...)metode Anda mungkin terlihat seperti:

  1. Periksa untuk melihat apakah tpe.getPoolSize() == tpe.getMaximumPoolSize()dalam hal ini panggil saja super.offer(...).
  2. Lain jika tpe.getPoolSize() > tpe.getActiveCount()kemudian panggil super.offer(...)karena tampaknya ada utas yang menganggur.
  3. Jika tidak kembali falseke garpu utas lain.

Mungkin ini:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Perhatikan bahwa metode get pada TPE mahal karena mereka mengakses volatilebidang atau (dalam kasus getActiveCount()) mengunci TPE dan menjalankan daftar utas. Selain itu, ada kondisi balapan di sini yang dapat menyebabkan tugas diantrekan dengan tidak semestinya atau utas lain bercabang ketika ada utas yang tidak aktif.

Abu-abu
sumber
Saya juga berjuang dengan masalah yang sama, berakhir dengan mengesampingkan metode eksekusi. Tapi ini benar-benar solusi yang bagus. :)
Batty
Meskipun saya tidak suka gagasan melanggar kontrak Queueuntuk mencapai ini, Anda tentu tidak sendirian dalam gagasan Anda: groovy-programming.com/post/26923146865
bstempi
3
Tidakkah Anda mendapatkan keanehan di sini karena beberapa tugas pertama akan diantrekan, dan hanya setelah utas baru itu muncul? Misalnya, jika satu utas inti Anda sibuk dengan satu tugas yang berjalan lama, dan Anda memanggil execute(runnable), maka runnablehanya ditambahkan ke antrian. Jika Anda menelepon execute(secondRunnable), kemudian secondRunnableditambahkan ke antrian. Tapi sekarang jika Anda menelepon execute(thirdRunnable), maka thirdRunnableakan dijalankan di thread baru. The runnabledan secondRunnablehanya berjalan sekali thirdRunnable(atau asli lama berjalan tugas) selesai.
Robert Tupelo-Schneck
1
Ya, Robert benar, dalam lingkungan yang memiliki banyak multithread, antrean terkadang bertambah sementara ada utas gratis untuk digunakan. Solusi di bawah ini yang memperluas TPE - bekerja jauh lebih baik. Saya pikir saran Robert harus ditandai sebagai jawaban, meskipun peretasan di atas menarik
Ingin Tahu Semua
1
The "RejectedExecutionHandler" membantu eksekutor saat mematikan. Sekarang Anda dipaksa untuk menggunakan shutdownNow () karena shutdown () tidak mencegah tugas baru ditambahkan (karena reque)
Radu Toader
28

Tetapkan ukuran inti dan ukuran maksimal ke nilai yang sama, dan izinkan utas inti dilepas dari kumpulan allowCoreThreadTimeOut(true).

Flavio
sumber
+1 Ya, saya memikirkannya tetapi saya masih ingin memiliki fitur inti-utas. Saya tidak ingin kumpulan utas beralih ke 0 utas selama periode tidak aktif. Saya akan mengedit pertanyaan saya untuk menunjukkannya. Tapi poin yang bagus.
Gray
Terima kasih! Itu hanya cara termudah untuk melakukan itu.
Dmitry Ovchinnikov
28

Saya sudah mendapat dua jawaban lain untuk pertanyaan ini, tetapi saya rasa yang ini adalah yang terbaik.

Ini berdasarkan teknik jawaban yang diterima saat ini , yaitu:

  1. Ganti metode antrian offer()untuk (terkadang) mengembalikan false,
  2. yang menyebabkan ThreadPoolExecutormunculnya utas baru atau menolak tugas, dan
  3. setel RejectedExecutionHandleruntuk benar - benar mengantrekan tugas pada penolakan.

Masalahnya adalah kapan offer()harus mengembalikan false. Jawaban yang diterima saat ini mengembalikan false ketika antrian memiliki beberapa tugas di atasnya, tetapi seperti yang saya tunjukkan dalam komentar saya di sana, ini menyebabkan efek yang tidak diinginkan. Bergantian, jika Anda selalu mengembalikan false, Anda akan terus memunculkan utas baru bahkan saat utas menunggu di antrean.

Solusinya adalah dengan menggunakan Java 7 LinkedTransferQueuedan memiliki offer()panggilan tryTransfer(). Ketika ada utas konsumen yang menunggu, tugas hanya akan diteruskan ke utas itu. Jika tidak, offer()akan mengembalikan false dan ThreadPoolExecutorakan menelurkan utas baru.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
Robert Tupelo-Schneck
sumber
Saya harus setuju, ini terlihat paling bersih bagi saya. Satu-satunya downside dengan solusi ini adalah bahwa LinkedTransferQueue tidak dibatasi sehingga Anda tidak mendapatkan antrian tugas yang dibatasi kapasitas tanpa pekerjaan tambahan.
Yeroc
Ada masalah saat kolam tumbuh ke ukuran maksimum. Katakanlah kumpulan ditingkatkan ke ukuran maksimum dan setiap utas saat ini menjalankan tugas, ketika runnable yang dikirimkan, penawaran ini akan mengembalikan false dan ThreadPoolExecutor mencoba menambahkan utas addWorker, tetapi kumpulan sudah mencapai maksimum sehingga runnable akan ditolak. Sesuai yang ditolakExceHandler yang Anda tulis, itu akan ditawarkan ke antrean lagi sehingga tarian monyet ini terjadi dari awal lagi.
Sudheera
1
@Sudheera Saya yakin Anda salah. queue.offer(), karena sebenarnya memanggil LinkedTransferQueue.tryTransfer(), akan mengembalikan false dan tidak mengantrekan tugas. Namun RejectedExecutionHandlerpanggilan queue.put(), yang tidak gagal dan mengantrekan tugas.
Robert Tupelo-Schneck
1
@ RobertTupelo-Schneck sangat berguna dan menyenangkan!
Eugene
1
@ RobertTupelo-Schneck Bekerja seperti pesona! Saya tidak tahu mengapa tidak ada yang seperti itu di luar kotak di java
Georgi Peev
7

Catatan: Sekarang saya lebih suka dan merekomendasikan jawaban saya yang lain .

Berikut adalah versi yang menurut saya jauh lebih mudah: Tingkatkan corePoolSize (hingga batas maximumPoolSize) setiap kali tugas baru dijalankan, lalu kurangi corePoolSize (turun ke batas "ukuran kumpulan inti" yang ditentukan pengguna) setiap kali tugas selesai.

Dengan kata lain, lacak jumlah tugas yang berjalan atau diantrekan, dan pastikan corePoolSize sama dengan jumlah tugas selama berada di antara "ukuran kumpulan inti" yang ditentukan pengguna dan maximumPoolSize.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Seperti yang tertulis, kelas tidak mendukung pengubahan corePoolSize atau maximumPoolSize yang ditentukan pengguna setelah konstruksi, dan tidak mendukung manipulasi antrean pekerjaan secara langsung atau melalui remove()atau purge().

Robert Tupelo-Schneck
sumber
Saya suka kecuali synchronizedbloknya. Bisakah Anda menelepon ke antrian untuk mendapatkan jumlah tugas. Atau mungkin menggunakan AtomicInteger?
Gray
Saya ingin menghindari mereka, tetapi masalahnya adalah ini. Jika ada sejumlah execute()panggilan di utas terpisah, masing-masing akan (1) mencari tahu berapa banyak utas yang dibutuhkan, (2) setCorePoolSizeke nomor itu, dan (3) panggilan super.execute(). Jika langkah (1) dan (2) tidak disinkronkan, saya tidak yakin bagaimana mencegah pengurutan yang tidak menguntungkan di mana Anda mengatur ukuran kumpulan inti ke angka yang lebih rendah setelah angka yang lebih tinggi. Dengan akses langsung ke bidang superclass ini bisa dilakukan dengan menggunakan bandingkan-dan-set, tapi saya tidak melihat cara yang bersih untuk melakukannya di subclass tanpa sinkronisasi.
Robert Tupelo-Schneck
Saya pikir penalti untuk kondisi balapan itu relatif rendah selama taskCountlapangannya valid (mis. A AtomicInteger). Jika dua utas menghitung ulang ukuran kumpulan segera setelah satu sama lain, itu harus mendapatkan nilai yang tepat. Jika yang kedua menyusutkan utas inti maka itu pasti melihat penurunan dalam antrian atau sesuatu.
Gray
1
Sayangnya saya pikir itu lebih buruk dari itu. Misalkan tugas 10 dan 11 memanggil execute(). Masing-masing akan menelepon atomicTaskCount.incrementAndGet()dan mereka akan mendapatkan 10 dan 11 masing-masing. Tetapi tanpa sinkronisasi (lebih dari mendapatkan jumlah tugas dan mengatur ukuran kumpulan inti), Anda kemudian bisa mendapatkan (1) tugas 11 menetapkan ukuran kumpulan inti menjadi 11, (2) tugas 10 menetapkan ukuran kumpulan inti menjadi 10, (3) tugas 10 panggilan super.execute(), (4) tugas 11 panggilan super.execute()dan antrean.
Robert Tupelo-Schneck
2
Saya memberikan solusi ini beberapa pengujian serius dan ini jelas yang terbaik. Dalam lingkungan yang sangat multithread, terkadang masih akan mengantrekan ketika ada utas gratis (karena sifat TPE.execute untaian bebas), tetapi ini jarang terjadi, berlawanan dengan solusi yang ditandai sebagai jawaban, di mana kondisi balapan memiliki lebih banyak peluang untuk terjadi, jadi ini terjadi cukup banyak pada setiap proses multi-thread.
Ingin Tahu Semua
6

Kami memiliki subclass ThreadPoolExecutoryang membutuhkan tambahan creationThresholddan penggantian execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

mungkin itu membantu juga, tapi milikmu tentu saja terlihat lebih berseni ...

Ralf H.
sumber
Menarik. Terima kasih untuk ini. Saya sebenarnya tidak tahu bahwa ukuran inti bisa berubah.
Gray
Sekarang setelah saya memikirkannya lagi, solusi ini lebih baik daripada solusi saya dalam hal memeriksa ukuran antrian. Saya telah mengubah jawaban saya agar offer(...)metode hanya mengembalikan secara falsebersyarat. Terima kasih!
Gray
4

Jawaban yang disarankan hanya menyelesaikan satu (1) masalah dengan kumpulan utas JDK:

  1. Kumpulan benang JDK bias terhadap antrian. Jadi, alih-alih menelurkan utas baru, mereka akan mengantrekan tugas. Hanya jika antrian mencapai batasnya, kumpulan utas akan menelurkan utas baru.

  2. Penarikan benang tidak terjadi saat beban berkurang. Misalnya jika kita memiliki ledakan pekerjaan yang mengenai kumpulan yang menyebabkan kumpulan menjadi maksimal, diikuti dengan beban ringan maks 2 tugas sekaligus, kumpulan akan menggunakan semua utas untuk melayani beban ringan yang mencegah penarikan utas. (hanya dibutuhkan 2 utas ...)

Tidak senang dengan perilaku di atas, saya melanjutkan dan menerapkan kumpulan untuk mengatasi kekurangan di atas.

Untuk mengatasi 2) Menggunakan penjadwalan Lifo menyelesaikan masalah. Ide ini dipresentasikan oleh Ben Maurer pada konferensi aplikatif ACM 2015: Skala Sistem @ Facebook

Maka lahirlah implementasi baru:

LifoThreadPoolExecutorSQP

Sejauh ini, implementasi ini meningkatkan kinerja eksekusi asinkron untuk ZEL .

Implementasinya spin mampu mengurangi overhead pengalih konteks, menghasilkan kinerja yang unggul untuk kasus penggunaan tertentu.

Semoga membantu ...

PS: JDK Fork Join Pool mengimplementasikan ExecutorService dan berfungsi sebagai kumpulan thread "normal", Implementasinya berkinerja baik, Menggunakan penjadwalan LIFO Thread, namun tidak ada kontrol atas ukuran antrian internal, batas waktu pensiun ..., dan yang terpenting tugas tidak dapat dilakukan terputus saat membatalkannya

pengguna2179737
sumber
1
Sayang sekali implementasi ini memiliki begitu banyak dependensi eksternal. Membuatnya tidak berguna bagi saya: - /
Martin L.
1
Ini poin yang sangat bagus (ke-2). Sayangnya, implementasinya tidak jelas dari dependensi eksternal, tetapi masih dapat diadopsi jika Anda mau.
Alexey Vlasov
1

Catatan: Sekarang saya lebih suka dan merekomendasikan jawaban saya yang lain .

Saya punya proposal lain, mengikuti ide asli untuk mengubah antrian menjadi salah. Dalam hal ini semua tugas dapat memasuki antrean, tetapi setiap kali tugas diantrekan setelahnya execute(), kami mengikutinya dengan tugas no-op sentinel yang ditolak antrian, menyebabkan utas baru muncul, yang akan mengeksekusi no-op segera diikuti oleh sesuatu dari antrian.

Karena thread pekerja mungkin sedang melakukan polling LinkedBlockingQueueuntuk tugas baru, tugas mungkin akan diantrekan bahkan saat ada thread yang tersedia. Untuk menghindari pemijahan utas baru bahkan ketika ada utas yang tersedia, kita perlu melacak berapa banyak utas yang menunggu tugas baru di antrean, dan hanya menelurkan utas baru ketika ada lebih banyak tugas di antrean daripada utas menunggu.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});
Robert Tupelo-Schneck
sumber
0

Solusi terbaik yang dapat saya pikirkan adalah memperpanjang.

ThreadPoolExecutormenawarkan beberapa metode hook: beforeExecutedan afterExecute. Dalam ekstensi Anda, Anda dapat mempertahankan menggunakan antrian terbatas untuk memasukkan tugas dan antrian tak terbatas kedua untuk menangani overflow. Saat seseorang menelepon submit, Anda dapat mencoba menempatkan permintaan ke dalam antrian terbatas. Jika Anda bertemu dengan pengecualian, Anda cukup menempelkan tugas di antrian overflow Anda. Anda kemudian dapat menggunakan afterExecutehook untuk melihat apakah ada sesuatu dalam antrian overflow setelah menyelesaikan tugas. Dengan cara ini, pelaksana akan menangani hal-hal yang ada di antrean terbatasnya terlebih dahulu, dan secara otomatis menarik dari antrean tak terbatas ini jika waktu mengizinkan.

Sepertinya lebih banyak pekerjaan daripada solusi Anda, tetapi setidaknya ini tidak melibatkan pemberian perilaku yang tidak terduga. Saya juga membayangkan bahwa ada cara yang lebih baik untuk memeriksa status antrian dan utas daripada mengandalkan pengecualian, yang cukup lambat untuk dilempar.

bstempi
sumber
Saya tidak menyukai solusi ini. Saya cukup yakin bahwa ThreadPoolExecutor tidak dirancang untuk warisan.
scottb
Sebenarnya ada contoh ekstensi tepat di JavaDoc. Mereka menyatakan bahwa sebagian besar mungkin hanya akan menerapkan metode hook, tetapi mereka memberi tahu Anda apa lagi yang perlu Anda perhatikan saat memperluas.
bstempi
0

Catatan: Untuk JDK ThreadPoolExecutor saat Anda memiliki antrean terbatas, Anda hanya membuat utas baru saat penawaran mengembalikan false. Anda mungkin mendapatkan sesuatu yang berguna dengan CallerRunsPolicy yang membuat sedikit BackPressure dan panggilan langsung dijalankan di thread pemanggil.

Saya membutuhkan tugas untuk dieksekusi dari utas yang dibuat oleh kolam dan memiliki antrian di mana-mana untuk penjadwalan, sementara jumlah utas di dalam kumpulan dapat bertambah atau menyusut antara corePoolSize dan maximumPoolSize jadi ...

Saya akhirnya melakukan copy paste penuh dari ThreadPoolExecutor dan mengubah sedikit metode eksekusi karena sayangnya ini tidak dapat dilakukan dengan ekstensi (ini disebut metode pribadi).

Saya tidak ingin menelurkan utas baru segera ketika permintaan baru tiba dan semua utas sibuk (karena saya memiliki tugas umum yang berumur pendek). Saya telah menambahkan ambang tetapi jangan ragu untuk mengubahnya sesuai kebutuhan Anda (mungkin untuk sebagian besar IO lebih baik menghapus ambang ini)

private final AtomicInteger activeWorkers = new AtomicInteger(0);
private volatile double threshold = 0.7d;

protected void beforeExecute(Thread t, Runnable r) {
    activeWorkers.incrementAndGet();
}
protected void afterExecute(Runnable r, Throwable t) {
    activeWorkers.decrementAndGet();
}
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        if (isRunning(c) && this.workQueue.offer(command)) {
            int recheck = this.ctl.get();
            if (!isRunning(recheck) && this.remove(command)) {
                this.reject(command);
            } else if (workerCountOf(recheck) == 0) {
                this.addWorker((Runnable) null, false);
            }
            //>>change start
            else if (workerCountOf(recheck) < maximumPoolSize //
                && (activeWorkers.get() > workerCountOf(recheck) * threshold
                    || workQueue.size() > workerCountOf(recheck) * threshold)) {
                this.addWorker((Runnable) null, false);
            }
            //<<change end
        } else if (!this.addWorker(command, false)) {
            this.reject(command);
        }
    }
Radu Toader
sumber