Saya ingin menduplikasi aliran Java 8 sehingga saya bisa mengatasinya dua kali. Saya bisa collect
sebagai daftar dan mendapatkan aliran baru dari itu;
// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff
Tapi menurut saya harus ada cara yang lebih efisien / elegan.
Adakah cara untuk menyalin aliran tanpa mengubahnya menjadi koleksi?
Saya sebenarnya bekerja dengan aliran Either
s, jadi ingin memproses proyeksi kiri dengan satu cara sebelum beralih ke proyeksi kanan dan menangani dengan cara lain. Jenis seperti ini (yang, sejauh ini, saya terpaksa menggunakan toList
trik dengan).
List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());
Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );
Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
java
lambda
java-8
java-stream
Toby
sumber
sumber
Jawaban:
Saya pikir asumsi Anda tentang efisiensi agak mundur. Anda akan mendapatkan pengembalian efisiensi yang sangat besar ini jika Anda hanya akan menggunakan data sekali, karena Anda tidak perlu menyimpannya, dan streaming memberi Anda pengoptimalan "loop fusion" yang hebat yang memungkinkan Anda mengalirkan seluruh data secara efisien melalui pipeline.
Jika Anda ingin menggunakan kembali data yang sama, maka menurut definisi Anda harus menghasilkannya dua kali (secara deterministik) atau menyimpannya. Jika kebetulan sudah ada dalam koleksi, bagus; kemudian mengulanginya dua kali lebih murah.
Kami melakukan percobaan dalam desain dengan "aliran bercabang". Apa yang kami temukan adalah bahwa mendukung ini memiliki biaya yang nyata; itu membebani kasus umum (gunakan sekali) dengan mengorbankan kasus yang tidak umum. Masalah besarnya adalah berurusan dengan "apa yang terjadi jika kedua pipeline tidak mengonsumsi data pada kecepatan yang sama". Sekarang Anda kembali ke buffering. Ini adalah fitur yang jelas tidak membawa bobotnya.
Jika Anda ingin mengoperasikan data yang sama berulang kali, simpan, atau susun operasi Anda sebagai Konsumen dan lakukan hal berikut:
Anda juga dapat melihat ke perpustakaan RxJava, karena model pemrosesannya lebih cocok untuk jenis "percabangan aliran" ini.
sumber
toList
) untuk dapat memprosesnya (Either
kasus menjadi contoh)?Anda dapat menggunakan variabel lokal dengan
Supplier
untuk menyiapkan bagian umum dari pipeline streaming.Dari http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :
sumber
Supplier
jikaStream
dibangun dengan cara yang "mahal", Anda membayar biaya tersebut untuk setiap panggilan keSupplier.get()
. yaitu jika kueri database ... kueri itu dilakukan setiap waktuSet<Integer>
penggunaancollect(Collectors.toSet())
... dan melakukan beberapa operasi untuk itu. Saya inginmax()
dan jika nilai tertentu ditetapkan sebagai dua operasi ...filter(d -> d == -1).count() == 1;
Gunakan a
Supplier
untuk menghasilkan aliran untuk setiap operasi terminasi.Kapan pun Anda membutuhkan aliran koleksi itu, gunakan
streamSupplier.get()
untuk mendapatkan aliran baru.Contoh:
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
sumber
Kami telah menerapkan
duplicate()
metode untuk aliran di jOOλ , pustaka Sumber Terbuka yang kami buat untuk meningkatkan pengujian integrasi untuk jOOQ . Intinya, Anda cukup menulis:Secara internal, ada buffer yang menyimpan semua nilai yang telah dikonsumsi dari satu aliran tetapi tidak dari yang lain. Itu mungkin seefisien yang didapat jika dua aliran Anda dikonsumsi dengan kecepatan yang sama, dan jika Anda dapat hidup dengan kurangnya keamanan utas .
Berikut cara kerja algoritme:
Lebih banyak kode sumber di sini
Tuple2
mungkin seperti AndaPair
jenis, sedangkanSeq
adalahStream
dengan beberapa perangkat tambahan.sumber
Tuple2<Seq<A>>, Seq<A>> t = duplicate(stream); long count = t.collect(counting()); List<A> list = t.collect(toList());
, lebih baik melakukannyaTuple2<Long, List<A>> t = stream.collect(Tuple.collectors(counting(), toList()));
. MenggunakanCollectors.mapping/reducing
satu dapat mengekspresikan operasi aliran lain sebagai kolektor dan elemen proses dengan cara yang sangat berbeda menciptakan tupel yang dihasilkan tunggal. Jadi secara umum Anda dapat melakukan banyak hal yang memakan aliran satu kali tanpa duplikasi dan itu akan ramah paralel.offer()
/poll()
API, tetapiArrayDeque
mungkin melakukan hal yang sama.Anda dapat membuat aliran runnable (misalnya):
Di mana
failure
dansuccess
operasi yang akan diterapkan. Namun ini akan membuat beberapa objek sementara dan mungkin tidak lebih efisien daripada memulai dari pengumpulan dan streaming / iterasi dua kali.sumber
Cara lain untuk menangani elemen beberapa kali adalah dengan menggunakan Stream.peek (Konsumen) :
peek(Consumer)
dapat dirantai sebanyak yang dibutuhkan.sumber
cyclops-react , pustaka tempat saya berkontribusi, memiliki metode statis yang memungkinkan Anda menduplikasi Stream (dan mengembalikan jOOλ Tuple of Streams).
Lihat komentar, ada hukuman kinerja yang akan ditimbulkan saat menggunakan duplikat di Aliran yang ada. Alternatif yang lebih berkinerja adalah menggunakan Streamable: -
Ada juga kelas Streamable (malas) yang dapat dibuat dari Stream, Iterable atau Array dan diputar ulang beberapa kali.
AsStreamable.synchronizedFromStream (stream) - dapat digunakan untuk membuat Streamable yang dengan malas akan mengisi koleksi pendukungnya, sedemikian rupa sehingga dapat dibagikan di seluruh utas. Streamable.fromStream (streaming) tidak akan menimbulkan overhead sinkronisasi.
sumber
List<Integer> list = stream.collect(Collectors.toList()); streams = new Tuple2<>(list.stream(), list.stream())
(seperti yang disarankan OP). Juga ungkapkan secara eksplisit dalam jawaban bahwa Anda adalah penulis cyclop-streams. Baca ini .Untuk masalah khusus ini, Anda juga dapat menggunakan partisi. Sesuatu seperti
sumber
Kami dapat menggunakan Stream Builder pada saat membaca atau mengulang aliran. Berikut dokumen Stream Builder .
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html
Kasus penggunaan
Katakanlah kita memiliki aliran karyawan dan kita perlu menggunakan aliran ini untuk menulis data karyawan dalam file excel dan kemudian memperbarui koleksi / tabel karyawan [Ini hanya kasus penggunaan untuk menunjukkan penggunaan Stream Builder]:
sumber
Saya memiliki masalah yang sama, dan dapat memikirkan tiga struktur perantara yang berbeda untuk membuat salinan aliran: a
List
, array, dan aStream.Builder
. Saya menulis program benchmark kecil, yang menyarankan bahwa dari sudut pandang kinerjaList
sekitar 30% lebih lambat daripada dua lainnya yang cukup mirip.Satu-satunya kelemahan dari mengonversi ke array adalah rumit jika tipe elemen Anda adalah tipe generik (yang dalam kasus saya memang demikian); oleh karena itu saya lebih suka menggunakan a
Stream.Builder
.Saya akhirnya menulis sebuah fungsi kecil yang menciptakan
Collector
:Saya kemudian dapat membuat salinan aliran apa pun
str
dengan melakukanstr.collect(copyCollector())
yang dirasa cukup sesuai dengan penggunaan idiomatik aliran.sumber