Saya mencoba untuk memahami mengapa program Java berikut memberikan OutOfMemoryError
, sedangkan program yang sesuai .parallel()
tidak.
System.out.println(Stream
.iterate(1, i -> i+1)
.parallel()
.flatMap(n -> Stream.iterate(n, i -> i+n))
.mapToInt(Integer::intValue)
.limit(100_000_000)
.sum()
);
Saya punya dua pertanyaan:
Apa output yang diinginkan dari program ini?
Tanpa
.parallel()
itu tampaknya ini hanya outputsum(1+2+3+...)
yang berarti bahwa itu hanya "macet" pada aliran pertama di flatMap, yang masuk akal.Dengan paralel, saya tidak tahu apakah ada perilaku yang diharapkan, tetapi dugaan saya adalah bahwa entah bagaimana hal itu menyisipkan aliran pertama
n
atau lebih, di manan
jumlah pekerja paralel. Bisa juga sedikit berbeda berdasarkan perilaku chunking / buffering.Apa yang menyebabkannya kehabisan memori? Saya secara khusus mencoba memahami bagaimana aliran ini diterapkan di bawah tenda.
Saya menduga sesuatu memblokir aliran, sehingga tidak pernah selesai dan dapat menghilangkan nilai yang dihasilkan, tapi saya tidak tahu di mana urutan hal-hal dievaluasi dan di mana buffering terjadi.
Sunting: Dalam kasus yang relevan, saya menggunakan Java 11.
Editt 2: Rupanya hal yang sama terjadi bahkan untuk program yang sederhana IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()
, jadi itu mungkin ada hubungannya dengan kemalasan limit
daripada flatMap
.
sumber
Jawaban:
Anda mengatakan " tetapi saya tidak tahu di mana urutan hal-hal yang dievaluasi dan di mana buffering terjadi ", yang persisnya adalah apa aliran paralel. Urutan evaluasi tidak ditentukan.
Aspek penting dari contoh Anda adalah
.limit(100_000_000)
. Ini menyiratkan bahwa implementasi tidak bisa hanya menjumlahkan nilai arbitrer, tetapi harus merangkum angka 100.000.000 pertama . Perhatikan bahwa dalam implementasi referensi,.unordered().limit(100_000_000)
tidak mengubah hasil, yang menunjukkan bahwa tidak ada implementasi khusus untuk kasus tidak berurutan, tetapi itu adalah detail implementasi.Sekarang, ketika benang pekerja memproses elemen-elemen tersebut, mereka tidak bisa hanya menjumlahkannya, karena mereka harus tahu elemen mana yang boleh mereka konsumsi, yang tergantung pada berapa banyak elemen yang mendahului beban kerja spesifik mereka. Karena aliran ini tidak mengetahui ukurannya, ini hanya dapat diketahui ketika elemen awalan telah diproses, yang tidak pernah terjadi untuk aliran tanpa batas. Jadi utas pekerja terus buffering untuk saat ini, informasi ini menjadi tersedia.
Pada prinsipnya, ketika sebuah thread pekerja tahu bahwa itu memproses potongan pekerjaan paling kiri¹, itu bisa meringkas elemen segera, menghitungnya, dan memberi sinyal akhir ketika mencapai batas. Jadi Stream dapat berakhir, tetapi ini tergantung pada banyak faktor.
Dalam kasus Anda, skenario yang masuk akal adalah bahwa utas pekerja lainnya lebih cepat dalam mengalokasikan buffer daripada menghitung pekerjaan paling kiri. Dalam skenario ini, perubahan halus pada waktu dapat membuat aliran sesekali kembali dengan nilai.
Saat kami memperlambat semua utas pekerja kecuali yang memproses bungkusan paling kiri, kami dapat membuat aliran berhenti (setidaknya dalam sebagian besar berjalan):
¹ Saya mengikuti saran dari Stuart Marks untuk menggunakan urutan kiri-ke-kanan ketika berbicara tentang urutan pertemuan daripada urutan pemrosesan.
sumber
Files.lines(…)
? Ini telah meningkat secara signifikan di Jawa 9.BufferedReader.lines()
keadaan tertentu (bukan sistem file default, charset khusus, atau ukuran lebih besar dariInteger.MAX_FILES
). Jika salah satu dari ini berlaku, solusi khusus dapat membantu. Ini akan bernilai tanya jawab baru ...Integer.MAX_VALUE
, tentu saja ...Dugaan terbaik saya adalah menambahkan
parallel()
perubahan perilaku internalflatMap()
yang sudah memiliki masalah dievaluasi malas sebelumnya .The
OutOfMemoryError
kesalahan yang Anda memperoleh dilaporkan dalam [JDK-8202307] Mendapatkan java.lang.OutOfMemoryError:. Tumpukan ruang Java saat memanggil Stream.iterator () berikutnya () pada aliran yang menggunakan tak terbatas / Streaming sangat besar di flatMap . Jika Anda melihat tiketnya kurang lebih jejak tumpukan yang sama yang Anda dapatkan. Tiket ditutup sebagai Won't Fix dengan alasan berikut:sumber
OOME disebabkan bukan oleh aliran yang tak terbatas, tetapi oleh fakta bahwa itu bukan .
Yaitu, jika Anda berkomentar
.limit(...)
, itu tidak akan pernah kehabisan memori - tetapi tentu saja, itu tidak akan pernah berakhir juga.Setelah dipisah, aliran hanya dapat melacak jumlah elemen jika mereka terakumulasi di dalam setiap utas (sepertinya akumulator sebenarnya
Spliterators$ArraySpliterator#array
).Sepertinya Anda dapat mereproduksinya tanpa
flatMap
, jalankan saja yang berikut ini-Xmx128m
:Namun, setelah berkomentar
limit()
, itu akan berjalan dengan baik sampai Anda memutuskan untuk menyimpan laptop Anda.Selain detail implementasi aktual, inilah yang saya pikir sedang terjadi:
Dengan
limit
,sum
peredam ingin elemen X pertama untuk dijumlahkan, jadi tidak ada utas yang dapat memancarkan jumlah parsial. Setiap "irisan" (utas) harus mengumpulkan elemen dan meneruskannya. Tanpa batas, tidak ada kendala seperti itu sehingga setiap "irisan" hanya akan menghitung jumlah parsial dari elemen yang didapatnya (selamanya), dengan asumsi itu akan memancarkan hasilnya pada akhirnya.sumber
parallel()
akan digunakan secaraForkJoinPool
internal untuk mencapai paralelisme. TheSpliterator
akan digunakan untuk pekerjaan assign ke masing-masingForkJoin
tugas, saya kira kita dapat memanggil unit kerja di sini sebagai "split".Integer.sum()
, yang digunakan olehIntStream.sum
peredam. Anda akan melihat bahwa panggilan versi tanpa batas berfungsi sepanjang waktu, sementara versi terbatas tidak pernah memanggilnya sebelum OOM.