Saya ingin membuat ThreadPoolExecutor
sedemikian rupa sehingga ketika telah mencapai ukuran maksimum dan antrian penuh, submit()
metode memblokir ketika mencoba menambahkan tugas baru. Apakah saya perlu menerapkan kustom RejectedExecutionHandler
untuk itu atau adakah cara yang sudah ada untuk melakukan ini menggunakan pustaka Java standar?
java
concurrency
executor
Fixpoint
sumber
sumber
Jawaban:
Salah satu solusi yang mungkin saya temukan:
Apakah ada solusi lain? Saya lebih suka sesuatu yang berdasarkan
RejectedExecutionHandler
karena sepertinya cara standar untuk menangani situasi seperti itu.sumber
throw e;
adalah TIDAK dalam buku. JCIP benar!Anda dapat menggunakan ThreadPoolExecutor dan blockingQueue:
sumber
Anda harus menggunakan
CallerRunsPolicy
, yang menjalankan tugas yang ditolak di thread pemanggil. Dengan cara ini, itu tidak dapat mengirimkan tugas baru apa pun ke pelaksana sampai tugas itu selesai, pada titik mana akan ada beberapa utas kumpulan gratis atau proses akan berulang.http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html
Dari dokumen:
Selain itu, pastikan untuk menggunakan antrean terbatas, seperti ArrayBlockingQueue, saat memanggil
ThreadPoolExecutor
konstruktor. Jika tidak, tidak ada yang akan ditolak.Edit: sebagai tanggapan atas komentar Anda, setel ukuran ArrayBlockingQueue agar sama dengan ukuran maksimal kumpulan utas dan gunakan AbortPolicy.
Edit 2: Oke, saya mengerti apa yang Anda maksud. Bagaimana dengan ini: timpa
beforeExecute()
metode untuk memeriksa yanggetActiveCount()
tidak melebihigetMaximumPoolSize()
, dan jika ya, tidur dan coba lagi?sumber
Hibernate memiliki
BlockPolicy
yang sederhana dan dapat melakukan apa yang Anda inginkan:Lihat: Executors.java
sumber
ThreadPoolExecutor
even secara harfiah mengatakan: "Metode getQueue () memungkinkan akses ke antrian pekerjaan untuk tujuan pemantauan dan debugging. Penggunaan metode ini untuk tujuan lain sangat tidak disarankan.". Bahwa ini tersedia di perpustakaan yang sangat terkenal, sungguh menyedihkan untuk dilihat.The
BoundedExecutor
jawaban yang dikutip di atas dari Jawa Concurrency dalam Praktek hanya bekerja dengan benar jika Anda menggunakan antrian tak terbatas untuk Pelaksana, atau semaphore terikat tidak lebih besar dari ukuran antrian. Semaphore adalah state yang dibagi antara thread pengirim dan thread dalam kumpulan, sehingga memungkinkan untuk memenuhi pelaksana bahkan jika ukuran antrian <terikat <= (ukuran antrian + ukuran kelompok).Penggunaan
CallerRunsPolicy
hanya valid jika tugas Anda tidak berjalan selamanya, dalam hal ini thread pengiriman Anda akan tetap adarejectedExecution
selamanya, dan ide yang buruk jika tugas Anda memakan waktu lama untuk dijalankan, karena thread pengiriman tidak dapat mengirimkan tugas baru atau melakukan hal lain jika menjalankan tugas sendiri.Jika itu tidak dapat diterima, maka saya sarankan untuk memeriksa ukuran antrian terikat pelaksana sebelum mengirimkan tugas. Jika antrian sudah penuh, tunggu beberapa saat sebelum mencoba mengirim lagi. Throughput akan menderita, tetapi saya menyarankan ini adalah solusi yang lebih sederhana daripada banyak solusi lain yang diusulkan dan Anda dijamin tidak ada tugas yang akan ditolak.
sumber
Saya tahu, ini adalah peretasan, tetapi menurut saya peretasan paling bersih di antara yang ditawarkan di sini ;-)
Karena ThreadPoolExecutor menggunakan antrean pemblokiran "offer" daripada "put", mari kita timpa perilaku "offer" dari antrean pemblokiran:
Saya mengujinya dan tampaknya berhasil. Menerapkan beberapa kebijakan batas waktu ditinggalkan sebagai latihan pembaca.
sumber
Kelas berikut membungkus ThreadPoolExecutor dan menggunakan Semaphore untuk memblokir maka antrian pekerjaan penuh:
Kelas pembungkus ini didasarkan pada solusi yang diberikan dalam buku Java Concurrency in Practice oleh Brian Goetz. Solusi dalam buku ini hanya membutuhkan dua parameter konstruktor: an
Executor
dan batas yang digunakan untuk semaphore. Ini ditunjukkan dalam jawaban yang diberikan oleh Fixpoint. Ada masalah dengan pendekatan itu: ia bisa berada dalam keadaan di mana utas kumpulan sibuk, antrean penuh, tetapi semafor baru saja merilis izin. (semaphore.release()
di blok terakhir). Dalam keadaan ini, tugas baru bisa mengambil izin yang baru saja dirilis, tetapi ditolak karena antrean tugas penuh. Tentu saja ini bukanlah sesuatu yang Anda inginkan; yang ingin Anda blokir dalam kasus ini.Untuk mengatasi ini, kita harus menggunakan antrian tak terbatas , seperti yang disebutkan JCiP dengan jelas. Semaphore bertindak sebagai penjaga, memberikan efek ukuran antrian virtual. Ini memiliki efek samping yang memungkinkan bahwa unit dapat berisi
maxPoolSize + virtualQueueSize + maxPoolSize
tugas. Mengapa demikian? Karenasemaphore.release()
di blok akhirnya. Jika semua utas pangkalan memanggil pernyataan ini pada saat yang sama, makamaxPoolSize
izin dilepaskan, memungkinkan jumlah tugas yang sama untuk memasuki unit. Jika kami menggunakan antrian terbatas, itu akan tetap penuh, sehingga tugas ditolak. Sekarang, karena kita tahu bahwa ini hanya terjadi ketika utas kumpulan hampir selesai, ini bukan masalah. Kami tahu bahwa utas kumpulan tidak akan diblokir, jadi tugas akan segera diambil dari antrean.Anda dapat menggunakan antrian terbatas. Pastikan ukurannya sama
virtualQueueSize + maxPoolSize
. Ukuran yang lebih besar tidak berguna, semaphore akan mencegah masuknya lebih banyak item. Ukuran yang lebih kecil akan mengakibatkan tugas ditolak. Kemungkinan tugas ditolak meningkat seiring dengan berkurangnya ukuran. Misalnya, Anda menginginkan eksekutor terikat dengan maxPoolSize = 2 dan virtualQueueSize = 5. Kemudian ambil semaphore dengan 5 + 2 = 7 izin dan ukuran antrian aktual 5 + 2 = 7. Jumlah tugas sebenarnya yang bisa di unit tersebut kemudian 2 + 5 + 2 = 9. Ketika pelaksana penuh (5 tugas dalam antrian, 2 di kumpulan utas, jadi 0 izin tersedia) dan SEMUA utas kumpulan melepaskan izin mereka, maka tepat 2 izin dapat diambil oleh tugas yang masuk.Sekarang solusi dari JCiP agak rumit untuk digunakan karena tidak memberlakukan semua batasan ini (antrian tidak terbatas, atau dibatasi dengan batasan matematika tersebut, dll.). Saya pikir ini hanya berfungsi sebagai contoh yang baik untuk mendemonstrasikan bagaimana Anda dapat membangun kelas aman thread baru berdasarkan bagian-bagian yang sudah tersedia, tetapi bukan sebagai kelas dewasa yang dapat digunakan kembali. Saya tidak berpikir bahwa yang terakhir adalah niat penulisnya.
sumber
Anda dapat menggunakan RejectedExecutionHandler kustom seperti ini
sumber
Buat antrean pemblokiran Anda sendiri untuk digunakan oleh Pelaksana, dengan perilaku pemblokiran yang Anda cari, sambil selalu mengembalikan kapasitas yang tersisa (memastikan pelaksana tidak akan mencoba membuat lebih banyak utas daripada kumpulan intinya, atau memicu penangan penolakan).
Saya yakin ini akan membuat Anda mendapatkan perilaku memblokir yang Anda cari. Penangan penolakan tidak akan pernah sesuai dengan tagihan, karena itu menunjukkan pelaksana tidak dapat melakukan tugas. Apa yang bisa saya bayangkan di sana adalah bahwa Anda mendapatkan semacam 'sibuk menunggu' di pawang. Bukan itu yang Anda inginkan, Anda menginginkan antrian untuk pelaksana yang memblokir pemanggil ...
sumber
ThreadPoolExecutor
menggunakanoffer
metode untuk menambahkan tugas ke antrian. Jika saya membuat kebiasaanBlockingQueue
yang menghalangioffer
, itu akan memutuskanBlockingQueue
kontrak.ThreadPoolExecutor
diimplementasikan untuk digunakanoffer
dan bukanput
(versi pemblokiran)? Selain itu, jika ada cara bagi kode klien untuk menentukan mana yang akan digunakan kapan, banyak orang yang mencoba menggunakan solusi kustom akan merasa legaUntuk menghindari masalah dengan solusi @FixPoint. Seseorang dapat menggunakan ListeningExecutorService dan melepaskan semaphore onSuccess dan onFailure di dalam FutureCallback.
sumber
Runnable
karena metode tersebut masih dipanggil sebelum pekerja membersihkan secara normalThreadPoolExecutor
. Artinya, Anda masih perlu menangani pengecualian penolakan.Baru-baru ini saya menemukan pertanyaan ini mengalami masalah yang sama. OP tidak mengatakannya secara eksplisit, tapi kami tidak ingin menggunakan
RejectedExecutionHandler
yang mengeksekusi tugas di thread pengirim, karena ini akan mengurangi penggunaan thread pekerja jika tugas ini berjalan lama.Membaca semua jawaban dan komentar, khususnya solusi yang cacat dengan semaphore atau menggunakan,
afterExecute
saya telah melihat lebih dekat kode ThreadPoolExecutor untuk melihat apakah ada jalan keluar. Saya kagum melihat ada lebih dari 2000 baris kode (berkomentar), beberapa di antaranya membuat saya pusing . Mengingat persyaratan yang agak sederhana yang sebenarnya saya miliki - satu produsen, beberapa konsumen, biarkan produsen memblokir ketika tidak ada konsumen yang dapat bekerja - saya memutuskan untuk membuat solusi sendiri. Ini bukanExecutorService
hanya sebuahExecutor
. Dan itu tidak menyesuaikan jumlah utas dengan beban kerja, tetapi hanya menampung sejumlah utas, yang juga sesuai dengan kebutuhan saya. Ini kodenya. Jangan ragu untuk mengomel tentang itu :-)sumber
Saya yakin ada cara yang cukup elegan untuk menyelesaikan masalah ini dengan menggunakan
java.util.concurrent.Semaphore
dan mendelegasikan perilakuExecutor.newFixedThreadPool
. Layanan eksekutor baru hanya akan menjalankan tugas baru jika ada utas untuk melakukannya. Pemblokiran dikelola oleh Semaphore dengan jumlah izin sama dengan jumlah utas. Ketika tugas selesai, itu mengembalikan izin.sumber
Saya memiliki kebutuhan yang sama di masa lalu: semacam antrean pemblokiran dengan ukuran tetap untuk setiap klien yang didukung oleh kumpulan utas bersama. Saya akhirnya menulis jenis ThreadPoolExecutor saya sendiri:
UserThreadPoolExecutor (antrian pemblokiran (per klien) + threadpool (dibagikan di antara semua klien))
Lihat: https://github.com/d4rxh4wx/UserThreadPoolExecutor
Setiap UserThreadPoolExecutor diberi jumlah maksimum utas dari ThreadPoolExecutor bersama
Setiap UserThreadPoolExecutor dapat:
sumber
Saya menemukan kebijakan penolakan ini di klien pencarian elastis. Ini memblokir utas pemanggil saat memblokir antrian. Kode di bawah-
sumber
Saya baru-baru ini memiliki kebutuhan untuk mencapai sesuatu yang serupa, tetapi pada a
ScheduledExecutorService
.Saya juga harus memastikan bahwa saya menangani penundaan yang diteruskan pada metode dan memastikan bahwa tugas dikirim untuk dieksekusi pada saat yang diharapkan pemanggil atau hanya gagal sehingga melempar
RejectedExecutionException
.Metode lain
ScheduledThreadPoolExecutor
untuk mengeksekusi atau mengirimkan panggilan tugas secara internal#schedule
yang pada gilirannya akan memanggil metode yang diganti.Saya memiliki kodenya di sini, akan menghargai masukan apa pun. https://github.com/AmitabhAwasthi/BlockingScheduler
sumber
Inilah solusi yang tampaknya bekerja dengan sangat baik. Ini disebut NotifyingBlockingThreadPoolExecutor .
Program demo.
Sunting: Ada masalah dengan kode ini, metode await () bermasalah . Memanggil shutdown () + awaitTermination () tampaknya berfungsi dengan baik.
sumber
Saya tidak selalu menyukai CallerRunsPolicy, terutama karena memungkinkan tugas yang ditolak untuk 'melewati antrian' dan dieksekusi sebelum tugas yang dikirimkan sebelumnya. Selain itu, menjalankan tugas pada thread pemanggil mungkin membutuhkan waktu lebih lama daripada menunggu slot pertama tersedia.
Saya memecahkan masalah ini menggunakan RejectedExecutionHandler kustom, yang hanya memblokir utas panggilan sebentar dan kemudian mencoba mengirimkan tugas lagi:
Kelas ini hanya dapat digunakan dalam eksekutor kumpulan benang sebagai RejectedExecutinHandler seperti yang lain, misalnya:
Satu-satunya downside yang saya lihat adalah bahwa utas panggilan mungkin terkunci sedikit lebih lama dari yang benar-benar diperlukan (hingga 250ms). Selain itu, karena eksekutor ini secara efektif dipanggil secara rekursif, waktu tunggu yang sangat lama hingga utas tersedia (jam) dapat mengakibatkan tumpukan berlebih.
Meskipun demikian, saya pribadi menyukai metode ini. Ringkas, mudah dipahami, dan berfungsi dengan baik.
sumber