Apakah mungkin untuk menentukan kumpulan utas khusus untuk aliran paralel Java 8 ? Saya tidak dapat menemukannya di mana pun.
Bayangkan saya memiliki aplikasi server dan saya ingin menggunakan aliran paralel. Tetapi aplikasinya besar dan multi-utas jadi saya ingin mengelompokkannya. Saya tidak ingin tugas berjalan lambat dalam satu modul tugas aplikasiblock dari modul lain.
Jika saya tidak dapat menggunakan kumpulan utas yang berbeda untuk modul yang berbeda, itu berarti saya tidak dapat menggunakan aliran paralel secara aman di sebagian besar situasi dunia nyata.
Coba contoh berikut ini. Ada beberapa tugas intensif CPU yang dijalankan dalam utas terpisah. Tugas memanfaatkan aliran paralel. Tugas pertama rusak, sehingga setiap langkah membutuhkan waktu 1 detik (disimulasikan oleh thread sleep). Masalahnya adalah bahwa utas lainnya macet dan menunggu tugas yang rusak selesai. Ini adalah contoh yang dibuat-buat, tetapi bayangkan aplikasi servlet dan seseorang yang mengirimkan tugas yang sudah berjalan lama ke pool bersama garpu gabungan.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
Jawaban:
Sebenarnya ada trik bagaimana menjalankan operasi paralel di kumpulan fork-join tertentu. Jika Anda menjalankannya sebagai tugas di kumpulan garpu-bergabung, itu tetap di sana dan tidak menggunakan yang umum.
Trik ini didasarkan pada ForkJoinTask.fork yang menetapkan: "Mengatur untuk menjalankan tugas ini secara serempak di kumpulan tugas yang sedang berjalan, jika ada, atau menggunakan ForkJoinPool.commonPool () jika tidak diForkJoinPool ()"
sumber
ForkJoinPool
atau apakah itu detail implementasi? Tautan ke dokumentasi akan lebih baik.ForkJoinPool
instance harusshutdown()
ketika tidak diperlukan lagi untuk menghindari kebocoran thread. (contoh)Aliran paralel menggunakan default
ForkJoinPool.commonPool
yang secara default memiliki satu utas lebih sedikit saat Anda memiliki prosesor , seperti yang dikembalikan olehRuntime.getRuntime().availableProcessors()
(Ini berarti aliran paralel menggunakan semua prosesor Anda karena mereka juga menggunakan utas utama):Ini juga berarti jika Anda memiliki stream paralel bersarang atau beberapa stream paralel mulai bersamaan, mereka semua akan berbagi kumpulan yang sama. Keuntungan: Anda tidak akan pernah menggunakan lebih dari standar (jumlah prosesor yang tersedia). Kerugian: Anda mungkin tidak mendapatkan "semua prosesor" yang ditetapkan untuk setiap aliran paralel yang Anda mulai (jika Anda memiliki lebih dari satu). (Rupanya Anda dapat menggunakan ManagedBlocker untuk mengelak dari itu.)
Untuk mengubah cara aliran paralel dieksekusi, Anda bisa
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
atauSystem.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
untuk paralelisme target 20 utas. Namun, ini tidak lagi berfungsi setelah tambalan backported https://bugs.openjdk.java.net/browse/JDK-8190974 .Contoh yang terakhir pada mesin saya yang memiliki 8 prosesor. Jika saya menjalankan program berikut:
Outputnya adalah:
Jadi, Anda dapat melihat bahwa aliran paralel memproses 8 item sekaligus, yaitu menggunakan 8 utas. Namun, jika saya batalkan komentar pada baris komentar, hasilnya adalah:
Kali ini, aliran paralel telah menggunakan 20 utas dan semua 20 elemen dalam aliran telah diproses secara bersamaan.
sumber
commonPool
memiliki sebenarnya salah satu kurang dariavailableProcessors
, sehingga total paralelisme sama untukavailableProcessors
karena jumlah benang memanggil sebagai salah satu.ForkJoinTask
. Untuk meniruparallel()
get()
diperlukan:stream.parallel().forEach(soSomething)).get();
ForkJoinPool.submit(() -> stream.forEach(...))
akan menjalankan tindakan Stream saya dengan yang diberikanForkJoinPool
. Saya berharap bahwa seluruh Stream-Action dijalankan di ForJoinPool sebagai SATU tindakan, tetapi secara internal masih menggunakan default / umum ForkJoinPool. Di mana Anda melihat, bahwa ForkJoinPool.submit () akan melakukan apa yang Anda katakan?Atau dengan trik memicu komputasi paralel di dalam forkJoinPool Anda sendiri, Anda juga dapat meneruskan kumpulan itu ke metode CompletableFuture.supplyAsync seperti di:
sumber
Solusi asli (pengaturan properti paralelisme umum ForkJoinPool) tidak lagi berfungsi. Melihat tautan dalam jawaban asli, pembaruan yang memecah ini telah kembali porting ke Java 8. Seperti yang disebutkan dalam utas yang tertaut, solusi ini tidak dijamin berfungsi selamanya. Berdasarkan itu, solusinya adalah forkjoinpool.submit dengan .get solusi yang dibahas dalam jawaban yang diterima. Saya pikir backport memperbaiki tidak dapat diandalkannya solusi ini juga.
sumber
ForkJoinPool.commonPool().getParallelism()
dalam mode debug.unreported exception InterruptedException; must be caught or declared to be thrown
bahkan dengan semuacatch
pengecualian di loop.Kami dapat mengubah paralelisme default menggunakan properti berikut:
yang dapat mengatur untuk menggunakan lebih banyak paralelisme.
sumber
Untuk mengukur jumlah utas yang sebenarnya digunakan, Anda dapat memeriksa
Thread.activeCount()
:Ini dapat menghasilkan output CPU 4-core seperti:
Tanpa
.parallel()
itu memberi:sumber
Sampai sekarang, saya menggunakan solusi yang dijelaskan dalam jawaban dari pertanyaan ini. Sekarang, saya datang dengan perpustakaan kecil bernama Parallel Stream Support untuk itu:
Tetapi seperti yang ditunjukkan oleh @PabloMatiasGomez dalam komentar, ada kelemahan terkait mekanisme pemisahan aliran paralel yang sangat bergantung pada ukuran kumpulan umum. Lihat Aliran paralel dari HashSet tidak berjalan secara paralel .
Saya menggunakan solusi ini hanya untuk memiliki kolam yang terpisah untuk berbagai jenis pekerjaan tetapi saya tidak dapat mengatur ukuran kolam yang umum menjadi 1 bahkan jika saya tidak menggunakannya.
sumber
Catatan: Tampaknya ada perbaikan yang diterapkan di JDK 10 yang memastikan Custom Thread Pool menggunakan jumlah utas yang diharapkan.
Eksekusi aliran paralel dalam ForkJoinPool khusus harus mematuhi paralelisme https://bugs.openjdk.java.net/browse/JDK-8190974
sumber
Saya mencoba ForkJoinPool kustom sebagai berikut untuk menyesuaikan ukuran kolam:
Ini adalah output yang mengatakan bahwa pool menggunakan lebih banyak thread daripada default 4 .
Tetapi sebenarnya ada yang aneh , ketika saya mencoba untuk mencapai hasil yang sama menggunakan
ThreadPoolExecutor
sebagai berikut:tapi saya gagal.
Ini hanya akan memulai parallelStream di utas baru dan kemudian yang lainnya sama, yang sekali lagi membuktikan bahwa
parallelStream
akan menggunakan ForkJoinPool untuk memulai utas anaknya.sumber
Pergi untuk mendapatkan AbacusUtil . Nomor utas dapat ditentukan untuk aliran paralel. Berikut ini contoh kode:
Pengungkapan : Saya pengembang AbacusUtil.
sumber
Jika Anda tidak ingin mengandalkan hacks implementasi, selalu ada cara untuk mencapai hal yang sama dengan menerapkan kolektor khusus yang akan menggabungkan
map
dancollect
semantik ... dan Anda tidak akan terbatas pada ForkJoinPool:Untungnya, sudah selesai di sini dan tersedia di Maven Central: http://github.com/pivovarit/parallel-collectors
Penafian: Saya menulisnya dan bertanggung jawab untuk itu.
sumber
Jika Anda tidak keberatan menggunakan perpustakaan pihak ketiga, dengan cyclops-react Anda dapat mencampur Streaming berurutan dan paralel dalam pipa yang sama dan memberikan ForkJoinPools khusus. Sebagai contoh
Atau jika kami ingin melanjutkan pemrosesan dalam aliran berurutan
[Pengungkapan Saya adalah pengembang utama cyclops-react]
sumber
Jika Anda tidak memerlukan ThreadPool khusus tetapi Anda ingin membatasi jumlah tugas bersamaan, Anda dapat menggunakan:
(Pertanyaan rangkap yang meminta ini dikunci, jadi tolong bawa saya ke sini)
sumber
Anda dapat mencoba mengimplementasikan ForkJoinWorkerThreadFactory ini dan menyuntikkannya ke kelas Fork-Join.
Anda dapat menggunakan konstruktor kolam Fork-Join ini untuk melakukan ini.
Catatan: - 1. jika Anda menggunakan ini, pertimbangkan bahwa berdasarkan implementasi Anda dari thread baru, penjadwalan dari JVM akan terpengaruh, yang umumnya menjadwalkan fork-join threads ke core yang berbeda (diperlakukan sebagai utas komputasi). 2. penjadwalan tugas dengan garpu-gabung ke utas tidak akan terpengaruh. 3. Belum mengetahui bagaimana stream paralel memetik thread dari fork-join (tidak dapat menemukan dokumentasi yang tepat di dalamnya), jadi coba gunakan pabrik threadNaming yang berbeda untuk memastikan, jika thread dalam stream paralel diambil dari customThreadFactory yang Anda berikan. 4. commonThreadPool tidak akan menggunakan customThreadFactory ini.
sumber