Parallel Infinite Java Streaming kehabisan Memori

16

Saya mencoba untuk memahami mengapa program Java berikut memberikan OutOfMemoryError, sedangkan program yang sesuai .parallel()tidak.

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

Saya punya dua pertanyaan:

  1. Apa output yang diinginkan dari program ini?

    Tanpa .parallel()itu tampaknya ini hanya output sum(1+2+3+...)yang berarti bahwa itu hanya "macet" pada aliran pertama di flatMap, yang masuk akal.

    Dengan paralel, saya tidak tahu apakah ada perilaku yang diharapkan, tetapi dugaan saya adalah bahwa entah bagaimana hal itu menyisipkan aliran pertama natau lebih, di mana njumlah pekerja paralel. Bisa juga sedikit berbeda berdasarkan perilaku chunking / buffering.

  2. Apa yang menyebabkannya kehabisan memori? Saya secara khusus mencoba memahami bagaimana aliran ini diterapkan di bawah tenda.

    Saya menduga sesuatu memblokir aliran, sehingga tidak pernah selesai dan dapat menghilangkan nilai yang dihasilkan, tapi saya tidak tahu di mana urutan hal-hal dievaluasi dan di mana buffering terjadi.

Sunting: Dalam kasus yang relevan, saya menggunakan Java 11.

Editt 2: Rupanya hal yang sama terjadi bahkan untuk program yang sederhana IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum(), jadi itu mungkin ada hubungannya dengan kemalasan limitdaripada flatMap.

Thomas Ahle
sumber
parallel () secara internal menggunakan ForkJoinPool. Saya kira ForkJoin Framework ada di Jawa dari Java 7
aravind

Jawaban:

9

Anda mengatakan " tetapi saya tidak tahu di mana urutan hal-hal yang dievaluasi dan di mana buffering terjadi ", yang persisnya adalah apa aliran paralel. Urutan evaluasi tidak ditentukan.

Aspek penting dari contoh Anda adalah .limit(100_000_000). Ini menyiratkan bahwa implementasi tidak bisa hanya menjumlahkan nilai arbitrer, tetapi harus merangkum angka 100.000.000 pertama . Perhatikan bahwa dalam implementasi referensi, .unordered().limit(100_000_000)tidak mengubah hasil, yang menunjukkan bahwa tidak ada implementasi khusus untuk kasus tidak berurutan, tetapi itu adalah detail implementasi.

Sekarang, ketika benang pekerja memproses elemen-elemen tersebut, mereka tidak bisa hanya menjumlahkannya, karena mereka harus tahu elemen mana yang boleh mereka konsumsi, yang tergantung pada berapa banyak elemen yang mendahului beban kerja spesifik mereka. Karena aliran ini tidak mengetahui ukurannya, ini hanya dapat diketahui ketika elemen awalan telah diproses, yang tidak pernah terjadi untuk aliran tanpa batas. Jadi utas pekerja terus buffering untuk saat ini, informasi ini menjadi tersedia.

Pada prinsipnya, ketika sebuah thread pekerja tahu bahwa itu memproses potongan pekerjaan paling kiri¹, itu bisa meringkas elemen segera, menghitungnya, dan memberi sinyal akhir ketika mencapai batas. Jadi Stream dapat berakhir, tetapi ini tergantung pada banyak faktor.

Dalam kasus Anda, skenario yang masuk akal adalah bahwa utas pekerja lainnya lebih cepat dalam mengalokasikan buffer daripada menghitung pekerjaan paling kiri. Dalam skenario ini, perubahan halus pada waktu dapat membuat aliran sesekali kembali dengan nilai.

Saat kami memperlambat semua utas pekerja kecuali yang memproses bungkusan paling kiri, kami dapat membuat aliran berhenti (setidaknya dalam sebagian besar berjalan):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ Saya mengikuti saran dari Stuart Marks untuk menggunakan urutan kiri-ke-kanan ketika berbicara tentang urutan pertemuan daripada urutan pemrosesan.

Holger
sumber
Jawaban yang sangat bagus! Saya bertanya-tanya apakah ada risiko bahwa semua utas mulai menjalankan operasi flatMap, dan tidak ada yang dialokasikan untuk benar-benar mengosongkan buffer (menjumlahkan)? Dalam kasus saya yang sebenarnya, infinite stream malah file yang terlalu besar untuk disimpan dalam memori. Saya bertanya-tanya bagaimana saya dapat menulis ulang aliran untuk menjaga penggunaan memori tetap rendah?
Thomas Ahle
1
Apakah Anda menggunakan Files.lines(…)? Ini telah meningkat secara signifikan di Jawa 9.
Holger
1
Inilah yang dilakukan di Java 8. Di JRE yang lebih baru, masih akan kembali ke BufferedReader.lines()keadaan tertentu (bukan sistem file default, charset khusus, atau ukuran lebih besar dari Integer.MAX_FILES). Jika salah satu dari ini berlaku, solusi khusus dapat membantu. Ini akan bernilai tanya jawab baru ...
Holger
1
Integer.MAX_VALUE, tentu saja ...
Holger
1
Apa aliran luar, aliran file? Apakah ukurannya dapat diprediksi?
Holger
5

Dugaan terbaik saya adalah menambahkan parallel()perubahan perilaku internal flatMap()yang sudah memiliki masalah dievaluasi malas sebelumnya .

The OutOfMemoryErrorkesalahan yang Anda memperoleh dilaporkan dalam [JDK-8202307] Mendapatkan java.lang.OutOfMemoryError:. Tumpukan ruang Java saat memanggil Stream.iterator () berikutnya () pada aliran yang menggunakan tak terbatas / Streaming sangat besar di flatMap . Jika Anda melihat tiketnya kurang lebih jejak tumpukan yang sama yang Anda dapatkan. Tiket ditutup sebagai Won't Fix dengan alasan berikut:

Metode iterator()dan spliterator()adalah "pintu darurat" untuk digunakan ketika tidak mungkin menggunakan operasi lain. Mereka memiliki beberapa keterbatasan karena mereka mengubah apa yang merupakan model pendorong implementasi aliran menjadi model tarikan. Transisi semacam itu membutuhkan buffering dalam kasus-kasus tertentu, seperti ketika suatu elemen (datar) dipetakan ke dua atau lebih elemen . Ini akan secara signifikan mempersulit implementasi aliran, kemungkinan dengan mengorbankan kasus-kasus umum, untuk mendukung gagasan tekanan balik untuk mengkomunikasikan berapa banyak elemen untuk menarik lapisan berlapis dari produksi elemen.

Karol Dowbecki
sumber
Ini sangat menarik! Masuk akal bahwa transisi push / pull memerlukan buffering yang mungkin menghabiskan memori. Namun dalam kasus saya tampaknya hanya menggunakan push harus berfungsi dengan baik dan hanya membuang elemen yang tersisa saat muncul? Atau mungkin Anda mengatakan bahwa flapmap menyebabkan iterator dibuat?
Thomas Ahle
3

OOME disebabkan bukan oleh aliran yang tak terbatas, tetapi oleh fakta bahwa itu bukan .

Yaitu, jika Anda berkomentar .limit(...), itu tidak akan pernah kehabisan memori - tetapi tentu saja, itu tidak akan pernah berakhir juga.

Setelah dipisah, aliran hanya dapat melacak jumlah elemen jika mereka terakumulasi di dalam setiap utas (sepertinya akumulator sebenarnya Spliterators$ArraySpliterator#array).

Sepertinya Anda dapat mereproduksinya tanpa flatMap, jalankan saja yang berikut ini-Xmx128m :

    System.out.println(Stream
            .iterate(1, i -> i + 1)
            .parallel()
      //    .flatMap(n -> Stream.iterate(n, i -> i+n))
            .mapToInt(Integer::intValue)
            .limit(100_000_000)
            .sum()
    );

Namun, setelah berkomentar limit(), itu akan berjalan dengan baik sampai Anda memutuskan untuk menyimpan laptop Anda.

Selain detail implementasi aktual, inilah yang saya pikir sedang terjadi:

Dengan limit, sumperedam ingin elemen X pertama untuk dijumlahkan, jadi tidak ada utas yang dapat memancarkan jumlah parsial. Setiap "irisan" (utas) harus mengumpulkan elemen dan meneruskannya. Tanpa batas, tidak ada kendala seperti itu sehingga setiap "irisan" hanya akan menghitung jumlah parsial dari elemen yang didapatnya (selamanya), dengan asumsi itu akan memancarkan hasilnya pada akhirnya.

Costi Ciudatu
sumber
Apa maksudmu "setelah itu terpecah"? Apakah batas membaginya?
Thomas Ahle
@ThomasAhle parallel()akan digunakan secara ForkJoinPoolinternal untuk mencapai paralelisme. The Spliteratorakan digunakan untuk pekerjaan assign ke masing-masing ForkJointugas, saya kira kita dapat memanggil unit kerja di sini sebagai "split".
Karol Dowbecki
Tetapi mengapa itu hanya terjadi dengan batas?
Thomas Ahle
@ Thomas, saya mengedit jawabannya dengan dua sen.
Costi Ciudatu
1
@ Thomas mengatur breakpoint Integer.sum(), yang digunakan oleh IntStream.sumperedam. Anda akan melihat bahwa panggilan versi tanpa batas berfungsi sepanjang waktu, sementara versi terbatas tidak pernah memanggilnya sebelum OOM.
Costi Ciudatu