AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.parallel()
.filter(record -> doSomeOperation())
.findFirst()
Ketika saya menulis ini, saya berasumsi bahwa utas akan memunculkan hanya panggilan peta karena paralel ditempatkan setelah peta. Tetapi beberapa baris dalam file mendapatkan nomor catatan yang berbeda untuk setiap eksekusi.
Saya membaca dokumentasi stream Java resmi dan beberapa situs web untuk memahami bagaimana stream bekerja di bawah tenda.
Beberapa pertanyaan:
Java parallel stream bekerja berdasarkan SplitIterator , yang diimplementasikan oleh setiap koleksi seperti ArrayList, LinkedList dll. Ketika kita membangun aliran paralel dari koleksi-koleksi itu, iterator split yang sesuai akan digunakan untuk membagi dan mengulangi koleksi. Ini menjelaskan mengapa paralelisme terjadi pada tingkat sumber input asli (File lines) bukan pada hasil peta (yaitu Rekam pojo). Apakah pemahaman saya benar?
Dalam kasus saya, inputnya adalah aliran file IO. Iterator split mana yang akan digunakan?
Tidak masalah di mana kita menempatkan
parallel()
pipa. Sumber input asli akan selalu dipecah dan operasi perantara yang tersisa akan diterapkan.Dalam hal ini, Java seharusnya tidak mengizinkan pengguna untuk menempatkan operasi paralel di mana saja dalam pipa kecuali pada sumber aslinya. Karena, ini memberikan pemahaman yang salah bagi mereka yang tidak tahu bagaimana java stream bekerja secara internal. Saya tahu
parallel()
operasi akan ditentukan untuk tipe objek Stream dan karenanya, ia bekerja dengan cara ini. Tetapi, lebih baik memberikan beberapa solusi alternatif.Dalam cuplikan kode di atas, saya mencoba menambahkan nomor baris ke setiap catatan dalam file input dan karenanya harus dipesan. Namun, saya ingin menerapkannya
doSomeOperation()
secara paralel karena ini adalah logika yang berat. Satu-satunya cara untuk mencapai adalah menulis iterator split saya sendiri. Apakah ada cara lain?
sumber
parallel()
tidak lebih dari permintaan pengubah umum yang diterapkan ke objek aliran yang mendasarinya. Ingat bahwa hanya ada satu sumber-aliran jika Anda tidak menerapkan operasi akhir ke pipa, yaitu selama tidak ada yang "dieksekusi". Karena itu, Anda pada dasarnya hanya mempertanyakan pilihan desain Java. Yang berdasarkan opini dan kami tidak bisa membantu dengan itu.Stream
antarmuka secara langsung dan karena cascading yang bagus setiap operasi memberikan kembaliStream
lagi. Bayangkan seseorang ingin memberi AndaStream
tetapi sudah menerapkan beberapa operasi sepertimap
itu. Anda, sebagai pengguna, masih ingin dapat memutuskan apakah akan menjalankannya secara paralel atau tidak. Jadi Anda harusparallel()
tetap menelepon , meskipun streaming sudah ada.flatMap
atau jika Anda menjalankan metode thread-tidak aman atau serupa.Path
ada pada sistem file lokal dan Anda menggunakan JDK baru-baru ini, spliterator akan memiliki kemampuan pemrosesan paralel yang lebih baik daripada kumpulan batch 1024. Tetapi pemisahan yang seimbang mungkin bahkan kontraproduktif dalam beberapafindFirst
skenario ...Jawaban:
Seluruh aliran bersifat paralel atau berurutan. Kami tidak memilih subset operasi untuk dijalankan secara berurutan atau paralel.
Seperti yang Anda sebutkan, stream paralel menggunakan iterator split. Jelas, ini untuk mempartisi data sebelum operasi mulai berjalan.
Melihat sumbernya, saya melihatnya menggunakan
java.nio.file.FileChannelLinesSpliterator
Baik. Anda bahkan dapat menelepon
parallel()
dansequential()
beberapa kali. Yang dipanggil terakhir akan menang. Saat kami meneleponparallel()
, kami mengaturnya untuk aliran yang dikembalikan; dan seperti yang dinyatakan di atas, semua operasi berjalan secara berurutan atau paralel.Ini menjadi masalah pendapat. Saya pikir Zabuza memberikan alasan yang bagus untuk mendukung pilihan desainer JDK.
Ini tergantung pada operasi Anda
findFirst()
operasi terminal Anda sebenarnya, maka Anda bahkan tidak perlu khawatir tentang eksekusi paralel, karena bagaimanapun juga tidak akan ada banyak panggilandoSomething()
(findFirst()
adalah hubungan arus pendek)..parallel()
sebenarnya dapat menyebabkan lebih dari satu elemen diproses, sementarafindFirst()
pada aliran sekuensial akan mencegahnya.Jika operasi terminal Anda tidak menghasilkan banyak data, maka mungkin Anda bisa membuat
Record
objek menggunakan aliran sekuensial, kemudian memproses hasilnya secara paralel:Jika pipa Anda akan memuat banyak data dalam memori (yang mungkin menjadi alasan Anda menggunakan
Files.lines()
), maka mungkin Anda akan memerlukan iterator pemecah kustom. Sebelum saya pergi ke sana, saya akan melihat opsi lain (seperti garis simpanan dengan kolom id untuk memulai - itu hanya pendapat saya).Saya juga mencoba memproses catatan dalam kumpulan yang lebih kecil, seperti ini:
Ini dijalankan
doSomeOperation()
secara paralel tanpa memuat semua data ke dalam memori. Tetapi perhatikan bahwabatchSize
perlu dipikirkan.sumber
Spliterator
Implementasi kustom tidak akan lebih rumit dari ini, sementara memungkinkan pemrosesan paralel yang lebih efisien ...parallelStream
operasi internal Anda memiliki overhead tetap untuk memulai operasi dan menunggu hasil akhirnya, sementara dibatasi pada paralelismebatchSize
. Pertama, Anda perlu kelipatan jumlah inti CPU yang tersedia saat ini untuk menghindari utas menganggur. Kemudian, jumlahnya harus cukup tinggi untuk mengkompensasi overhead tetap, tetapi semakin tinggi jumlahnya, semakin tinggi jeda yang diberlakukan oleh operasi pembacaan sekuensial terjadi sebelum pemrosesan paralel dimulai.Stream.generate
menghasilkan aliran tidak teratur, yang tidak berfungsi dengan kasus penggunaan yang dimaksudkan OP sepertifindFirst()
. Sebaliknya, aliran paralel tunggal dengan spliterator yang mengembalikan bongkahan dalamtrySplit
pekerjaan lurus ke depan dan memungkinkan pekerja thread untuk memproses bongkahan berikutnya tanpa menunggu penyelesaian sebelumnya.findFirst()
operasi hanya akan memproses sejumlah kecil elemen. Kecocokan pertama mungkin masih terjadi setelah memproses 90% dari semua elemen. Selanjutnya, ketika memiliki sepuluh juta baris, bahkan menemukan kecocokan setelah 10% masih membutuhkan pemrosesan satu juta baris.Desain Stream asli menyertakan ide untuk mendukung tahapan pipa berikutnya dengan pengaturan eksekusi paralel yang berbeda, tetapi ide ini telah ditinggalkan. API dapat berasal dari saat ini, tetapi di sisi lain, desain API yang memaksa penelepon untuk membuat keputusan tunggal untuk eksekusi paralel atau berurutan akan jauh lebih rumit.
Yang sebenarnya
Spliterator
digunakan olehFiles.lines(…)
tergantung pada implementasi. Di Java 8 (Oracle atau OpenJDK), Anda selalu mendapatkan yang sama denganBufferedReader.lines()
. Di JDK yang lebih baru, jikaPath
milik filesystem default dan charset adalah salah satu yang didukung untuk fitur ini, Anda mendapatkan Stream denganSpliterator
implementasi khusus , thejava.nio.file.FileChannelLinesSpliterator
. Jika prasyarat tidak terpenuhi, Anda mendapatkan yang sama denganBufferedReader.lines()
, yang masih didasarkan pada yangIterator
diterapkan di dalamBufferedReader
dan dibungkus melaluiSpliterators.spliteratorUnknownSize
.Tugas spesifik Anda paling baik ditangani dengan kebiasaan
Spliterator
yang dapat melakukan penomoran baris langsung pada sumbernya, sebelum pemrosesan paralel, untuk memungkinkan pemrosesan paralel berikutnya tanpa batasan.sumber
Dan berikut ini adalah demonstrasi sederhana ketika penerapan paralel diterapkan. Output dari mengintip dengan jelas menunjukkan perbedaan antara dua contoh. Catatan:
map
Panggilan baru saja dilemparkan untuk menambahkan metode lain sebelumparallel
.sumber