Saya merasa frustrasi selama beberapa waktu dengan perilaku default ThreadPoolExecutor
yang mendukung ExecutorService
kumpulan thread yang digunakan oleh banyak dari kita. Mengutip dari Javadocs:
Jika ada lebih dari corePoolSize tetapi kurang dari maximumPoolSize utas yang berjalan, utas baru akan dibuat hanya jika antrian penuh .
Artinya, jika Anda menentukan kumpulan utas dengan kode berikut, itu tidak akan pernah memulai utas ke-2 karena LinkedBlockingQueue
tidak dibatasi.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Hanya jika Anda memiliki antrian terbatas dan antrian penuh , barulah utas di atas nomor inti dimulai. Saya menduga sejumlah besar programmer multithread Java junior tidak menyadari perilaku file ThreadPoolExecutor
.
Sekarang saya memiliki kasus penggunaan khusus di mana ini tidak optimal. Saya mencari cara, tanpa menulis kelas TPE saya sendiri, untuk mengatasinya.
Persyaratan saya adalah untuk layanan web yang membuat panggilan balik ke pihak ke-3 yang mungkin tidak dapat diandalkan.
- Saya tidak ingin melakukan panggilan balik secara sinkron dengan permintaan web, jadi saya ingin menggunakan kumpulan utas.
- Saya biasanya mendapatkan beberapa ini sebentar jadi saya tidak ingin memiliki
newFixedThreadPool(...)
sejumlah besar utas yang sebagian besar tidak aktif. - Seringkali saya mendapatkan lonjakan lalu lintas ini dan saya ingin meningkatkan jumlah utas ke beberapa nilai maksimal (katakanlah 50).
- Saya perlu melakukan upaya terbaik untuk melakukan semua panggilan balik jadi saya ingin mengantri panggilan tambahan di atas 50. Saya tidak ingin membebani server web saya yang lain dengan menggunakan file
newCachedThreadPool()
.
Bagaimana saya bisa mengatasi batasan ini di ThreadPoolExecutor
mana antrian perlu dibatasi dan penuh sebelum lebih banyak utas akan dimulai? Bagaimana saya bisa membuatnya memulai lebih banyak utas sebelum mengantri tugas?
Edit:
@Flavio membuat poin yang baik tentang menggunakan ThreadPoolExecutor.allowCoreThreadTimeOut(true)
agar utas inti batas waktu dan keluar. Saya mempertimbangkannya tetapi saya masih menginginkan fitur core-threads. Saya tidak ingin jumlah utas di kolam turun di bawah ukuran inti jika memungkinkan.
Jawaban:
Saya yakin saya akhirnya menemukan solusi yang agak elegan (mungkin sedikit hacky) untuk batasan ini dengan
ThreadPoolExecutor
. Ini melibatkan memperluasLinkedBlockingQueue
untuk memilikinya kembalifalse
untukqueue.offer(...)
saat ini sudah ada beberapa tugas antri. Jika utas saat ini tidak mengikuti tugas yang diantrekan, TPE akan menambahkan utas tambahan. Jika kumpulan sudah di utas maksimal, makaRejectedExecutionHandler
akan dipanggil. Ini adalah pawang yang kemudian memasukkanput(...)
ke dalam antrian.Memang aneh menulis antrian di mana
offer(...)
bisa kembalifalse
danput()
tidak pernah diblokir jadi itu bagian hacknya. Tapi ini bekerja dengan baik dengan penggunaan antrian TPE jadi saya tidak melihat ada masalah dengan melakukan ini.Berikut kodenya:
Dengan mekanisme ini, ketika saya mengirimkan tugas ke antrian,
ThreadPoolExecutor
keinginan:offer(...)
akan mengembalikan false.RejectedExecutionHandler
RejectedExecutionHandler
menempatkan maka tugas ke dalam antrian untuk diproses oleh thread pertama yang tersedia dalam rangka FIFO.Meskipun dalam kode contoh saya di atas, antrian tidak dibatasi, Anda juga dapat mendefinisikannya sebagai antrian berbatas. Misalnya, jika Anda menambahkan kapasitas 1000
LinkedBlockingQueue
maka itu akan:Juga, jika Anda perlu menggunakan
offer(...)
dalamRejectedExecutionHandler
maka Anda bisa menggunakanoffer(E, long, TimeUnit)
metode sebagai gantinya denganLong.MAX_VALUE
sebagai batas waktu.Peringatan:
Jika Anda mengharapkan tugas ditambahkan ke eksekutor setelah dimatikan, maka Anda mungkin ingin lebih pintar membuang
RejectedExecutionException
kebiasaan kamiRejectedExecutionHandler
ketika layanan-pelaksana telah dimatikan. Terima kasih kepada @RaduToader karena telah menunjukkan hal ini.Edit:
Perubahan lain untuk jawaban ini adalah menanyakan TPE jika ada utas yang tidak digunakan dan hanya mengantrekan item jika ada. Anda harus membuat kelas yang benar untuk ini dan menambahkan
ourQueue.setThreadPoolExecutor(tpe);
metode di atasnya.Maka
offer(...)
metode Anda mungkin terlihat seperti:tpe.getPoolSize() == tpe.getMaximumPoolSize()
dalam hal ini panggil sajasuper.offer(...)
.tpe.getPoolSize() > tpe.getActiveCount()
kemudian panggilsuper.offer(...)
karena tampaknya ada utas yang menganggur.false
ke garpu utas lain.Mungkin ini:
Perhatikan bahwa metode get pada TPE mahal karena mereka mengakses
volatile
bidang atau (dalam kasusgetActiveCount()
) mengunci TPE dan menjalankan daftar utas. Selain itu, ada kondisi balapan di sini yang dapat menyebabkan tugas diantrekan dengan tidak semestinya atau utas lain bercabang ketika ada utas yang tidak aktif.sumber
Queue
untuk mencapai ini, Anda tentu tidak sendirian dalam gagasan Anda: groovy-programming.com/post/26923146865execute(runnable)
, makarunnable
hanya ditambahkan ke antrian. Jika Anda meneleponexecute(secondRunnable)
, kemudiansecondRunnable
ditambahkan ke antrian. Tapi sekarang jika Anda meneleponexecute(thirdRunnable)
, makathirdRunnable
akan dijalankan di thread baru. Therunnable
dansecondRunnable
hanya berjalan sekalithirdRunnable
(atau asli lama berjalan tugas) selesai.Tetapkan ukuran inti dan ukuran maksimal ke nilai yang sama, dan izinkan utas inti dilepas dari kumpulan
allowCoreThreadTimeOut(true)
.sumber
Saya sudah mendapat dua jawaban lain untuk pertanyaan ini, tetapi saya rasa yang ini adalah yang terbaik.
Ini berdasarkan teknik jawaban yang diterima saat ini , yaitu:
offer()
untuk (terkadang) mengembalikan false,ThreadPoolExecutor
munculnya utas baru atau menolak tugas, danRejectedExecutionHandler
untuk benar - benar mengantrekan tugas pada penolakan.Masalahnya adalah kapan
offer()
harus mengembalikan false. Jawaban yang diterima saat ini mengembalikan false ketika antrian memiliki beberapa tugas di atasnya, tetapi seperti yang saya tunjukkan dalam komentar saya di sana, ini menyebabkan efek yang tidak diinginkan. Bergantian, jika Anda selalu mengembalikan false, Anda akan terus memunculkan utas baru bahkan saat utas menunggu di antrean.Solusinya adalah dengan menggunakan Java 7
LinkedTransferQueue
dan memilikioffer()
panggilantryTransfer()
. Ketika ada utas konsumen yang menunggu, tugas hanya akan diteruskan ke utas itu. Jika tidak,offer()
akan mengembalikan false danThreadPoolExecutor
akan menelurkan utas baru.sumber
queue.offer()
, karena sebenarnya memanggilLinkedTransferQueue.tryTransfer()
, akan mengembalikan false dan tidak mengantrekan tugas. NamunRejectedExecutionHandler
panggilanqueue.put()
, yang tidak gagal dan mengantrekan tugas.Catatan: Sekarang saya lebih suka dan merekomendasikan jawaban saya yang lain .
Berikut adalah versi yang menurut saya jauh lebih mudah: Tingkatkan corePoolSize (hingga batas maximumPoolSize) setiap kali tugas baru dijalankan, lalu kurangi corePoolSize (turun ke batas "ukuran kumpulan inti" yang ditentukan pengguna) setiap kali tugas selesai.
Dengan kata lain, lacak jumlah tugas yang berjalan atau diantrekan, dan pastikan corePoolSize sama dengan jumlah tugas selama berada di antara "ukuran kumpulan inti" yang ditentukan pengguna dan maximumPoolSize.
Seperti yang tertulis, kelas tidak mendukung pengubahan corePoolSize atau maximumPoolSize yang ditentukan pengguna setelah konstruksi, dan tidak mendukung manipulasi antrean pekerjaan secara langsung atau melalui
remove()
ataupurge()
.sumber
synchronized
bloknya. Bisakah Anda menelepon ke antrian untuk mendapatkan jumlah tugas. Atau mungkin menggunakanAtomicInteger
?execute()
panggilan di utas terpisah, masing-masing akan (1) mencari tahu berapa banyak utas yang dibutuhkan, (2)setCorePoolSize
ke nomor itu, dan (3) panggilansuper.execute()
. Jika langkah (1) dan (2) tidak disinkronkan, saya tidak yakin bagaimana mencegah pengurutan yang tidak menguntungkan di mana Anda mengatur ukuran kumpulan inti ke angka yang lebih rendah setelah angka yang lebih tinggi. Dengan akses langsung ke bidang superclass ini bisa dilakukan dengan menggunakan bandingkan-dan-set, tapi saya tidak melihat cara yang bersih untuk melakukannya di subclass tanpa sinkronisasi.taskCount
lapangannya valid (mis. AAtomicInteger
). Jika dua utas menghitung ulang ukuran kumpulan segera setelah satu sama lain, itu harus mendapatkan nilai yang tepat. Jika yang kedua menyusutkan utas inti maka itu pasti melihat penurunan dalam antrian atau sesuatu.execute()
. Masing-masing akan meneleponatomicTaskCount.incrementAndGet()
dan mereka akan mendapatkan 10 dan 11 masing-masing. Tetapi tanpa sinkronisasi (lebih dari mendapatkan jumlah tugas dan mengatur ukuran kumpulan inti), Anda kemudian bisa mendapatkan (1) tugas 11 menetapkan ukuran kumpulan inti menjadi 11, (2) tugas 10 menetapkan ukuran kumpulan inti menjadi 10, (3) tugas 10 panggilansuper.execute()
, (4) tugas 11 panggilansuper.execute()
dan antrean.Kami memiliki subclass
ThreadPoolExecutor
yang membutuhkan tambahancreationThreshold
dan penggantianexecute
.mungkin itu membantu juga, tapi milikmu tentu saja terlihat lebih berseni ...
sumber
offer(...)
metode hanya mengembalikan secarafalse
bersyarat. Terima kasih!Jawaban yang disarankan hanya menyelesaikan satu (1) masalah dengan kumpulan utas JDK:
Kumpulan benang JDK bias terhadap antrian. Jadi, alih-alih menelurkan utas baru, mereka akan mengantrekan tugas. Hanya jika antrian mencapai batasnya, kumpulan utas akan menelurkan utas baru.
Penarikan benang tidak terjadi saat beban berkurang. Misalnya jika kita memiliki ledakan pekerjaan yang mengenai kumpulan yang menyebabkan kumpulan menjadi maksimal, diikuti dengan beban ringan maks 2 tugas sekaligus, kumpulan akan menggunakan semua utas untuk melayani beban ringan yang mencegah penarikan utas. (hanya dibutuhkan 2 utas ...)
Tidak senang dengan perilaku di atas, saya melanjutkan dan menerapkan kumpulan untuk mengatasi kekurangan di atas.
Untuk mengatasi 2) Menggunakan penjadwalan Lifo menyelesaikan masalah. Ide ini dipresentasikan oleh Ben Maurer pada konferensi aplikatif ACM 2015: Skala Sistem @ Facebook
Maka lahirlah implementasi baru:
LifoThreadPoolExecutorSQP
Sejauh ini, implementasi ini meningkatkan kinerja eksekusi asinkron untuk ZEL .
Implementasinya spin mampu mengurangi overhead pengalih konteks, menghasilkan kinerja yang unggul untuk kasus penggunaan tertentu.
Semoga membantu ...
PS: JDK Fork Join Pool mengimplementasikan ExecutorService dan berfungsi sebagai kumpulan thread "normal", Implementasinya berkinerja baik, Menggunakan penjadwalan LIFO Thread, namun tidak ada kontrol atas ukuran antrian internal, batas waktu pensiun ..., dan yang terpenting tugas tidak dapat dilakukan terputus saat membatalkannya
sumber
Catatan: Sekarang saya lebih suka dan merekomendasikan jawaban saya yang lain .
Saya punya proposal lain, mengikuti ide asli untuk mengubah antrian menjadi salah. Dalam hal ini semua tugas dapat memasuki antrean, tetapi setiap kali tugas diantrekan setelahnya
execute()
, kami mengikutinya dengan tugas no-op sentinel yang ditolak antrian, menyebabkan utas baru muncul, yang akan mengeksekusi no-op segera diikuti oleh sesuatu dari antrian.Karena thread pekerja mungkin sedang melakukan polling
LinkedBlockingQueue
untuk tugas baru, tugas mungkin akan diantrekan bahkan saat ada thread yang tersedia. Untuk menghindari pemijahan utas baru bahkan ketika ada utas yang tersedia, kita perlu melacak berapa banyak utas yang menunggu tugas baru di antrean, dan hanya menelurkan utas baru ketika ada lebih banyak tugas di antrean daripada utas menunggu.sumber
Solusi terbaik yang dapat saya pikirkan adalah memperpanjang.
ThreadPoolExecutor
menawarkan beberapa metode hook:beforeExecute
danafterExecute
. Dalam ekstensi Anda, Anda dapat mempertahankan menggunakan antrian terbatas untuk memasukkan tugas dan antrian tak terbatas kedua untuk menangani overflow. Saat seseorang meneleponsubmit
, Anda dapat mencoba menempatkan permintaan ke dalam antrian terbatas. Jika Anda bertemu dengan pengecualian, Anda cukup menempelkan tugas di antrian overflow Anda. Anda kemudian dapat menggunakanafterExecute
hook untuk melihat apakah ada sesuatu dalam antrian overflow setelah menyelesaikan tugas. Dengan cara ini, pelaksana akan menangani hal-hal yang ada di antrean terbatasnya terlebih dahulu, dan secara otomatis menarik dari antrean tak terbatas ini jika waktu mengizinkan.Sepertinya lebih banyak pekerjaan daripada solusi Anda, tetapi setidaknya ini tidak melibatkan pemberian perilaku yang tidak terduga. Saya juga membayangkan bahwa ada cara yang lebih baik untuk memeriksa status antrian dan utas daripada mengandalkan pengecualian, yang cukup lambat untuk dilempar.
sumber
Catatan: Untuk JDK ThreadPoolExecutor saat Anda memiliki antrean terbatas, Anda hanya membuat utas baru saat penawaran mengembalikan false. Anda mungkin mendapatkan sesuatu yang berguna dengan CallerRunsPolicy yang membuat sedikit BackPressure dan panggilan langsung dijalankan di thread pemanggil.
Saya membutuhkan tugas untuk dieksekusi dari utas yang dibuat oleh kolam dan memiliki antrian di mana-mana untuk penjadwalan, sementara jumlah utas di dalam kumpulan dapat bertambah atau menyusut antara corePoolSize dan maximumPoolSize jadi ...
Saya akhirnya melakukan copy paste penuh dari ThreadPoolExecutor dan mengubah sedikit metode eksekusi karena sayangnya ini tidak dapat dilakukan dengan ekstensi (ini disebut metode pribadi).
Saya tidak ingin menelurkan utas baru segera ketika permintaan baru tiba dan semua utas sibuk (karena saya memiliki tugas umum yang berumur pendek). Saya telah menambahkan ambang tetapi jangan ragu untuk mengubahnya sesuai kebutuhan Anda (mungkin untuk sebagian besar IO lebih baik menghapus ambang ini)
sumber