Apakah aliran Java 8 mirip dengan yang dapat diamati RxJava?
Definisi Java 8 stream:
Kelas-kelas dalam
java.util.stream
paket baru menyediakan Stream API untuk mendukung operasi gaya fungsional pada aliran elemen.
java-8
java-stream
rx-java
observable
rahulrv
sumber
sumber
Jawaban:
TL; DR : Semua lib pemrosesan urutan / aliran menawarkan API yang sangat mirip untuk pembangunan saluran pipa. Perbedaannya ada di API untuk menangani multi-threading dan komposisi saluran pipa.
RxJava sangat berbeda dari Stream. Dari semua hal JDK, yang terdekat dengan rx.Observable mungkin
java.util.stream.CollectorStream + CompletableFuture combo (yang datang dengan biaya berurusan dengan lapisan monad tambahan, yaitu harus menangani konversi antaraStream<CompletableFuture<T>>
danCompletableFuture<Stream<T>>
).Ada perbedaan yang signifikan antara Observable dan Stream:
Stream#parallel()
Membagi urutan menjadi partisi,Observable#subscribeOn()
danObservable#observeOn()
tidak; sulit untuk meniruStream#parallel()
perilaku dengan Observable, ia pernah memiliki.parallel()
metode tetapi metode ini menyebabkan banyak kebingungan sehingga.parallel()
dukungan dipindahkan ke repositori terpisah di github, RxJavaParallel. Lebih detail ada di jawaban lain .Stream#parallel()
tidak mengizinkan untuk menentukan kumpulan utas untuk digunakan, tidak seperti kebanyakan metode RxJava menerima Penjadwal opsional. Karena semua instance stream dalam JVM menggunakan fork-join pool yang sama, menambahkan.parallel()
secara tidak sengaja dapat mempengaruhi perilaku di modul lain dari program AndaObservable#interval()
,Observable#window()
dan banyak lainnya; ini sebagian besar karena Streaming berbasis tarik, dan hulu tidak memiliki kontrol kapan harus memancarkan elemen berikutnya hilirtakeWhile()
,takeUntil()
); penggunaan solusiStream#anyMatch()
terbatas: ini adalah operasi terminal, jadi Anda tidak dapat menggunakannya lebih dari sekali per aliranStreaming sulit untuk dibangun sendiri, Dapat diamati dapat dibangun dengan banyak cara.EDIT: Seperti disebutkan dalam komentar, ada cara untuk membangun Stream. Namun, karena tidak ada hubungan arus pendek non-terminal, Anda tidak dapat misalnya dengan mudah menghasilkan Stream of lines dalam file (JDK menyediakan baris Files # dan BufferedReader # di luar kotak sekalipun, dan skenario serupa lainnya dapat dikelola dengan membangun Stream dari Iterator).Observable#using()
); Anda dapat membungkus aliran IO atau mutex dengannya dan memastikan bahwa pengguna tidak akan lupa untuk membebaskan sumber daya - itu akan dibuang secara otomatis pada penghentian berlangganan; Streaming memilikionClose(Runnable)
metode, tetapi Anda harus menyebutnya secara manual atau melalui coba-dengan-sumber daya. E. g. Anda harus ingat bahwa Files # lines () harus dilampirkan dalam blok try-with-resources.Round-up: RxJava berbeda dari Streaming secara signifikan. Alternatif RxJava nyata adalah implementasi lain dari ReactiveStreams , misalnya bagian yang relevan dari Akka.
Perbarui . Ada trik untuk menggunakan pool-fork-join non-default untuk
Stream#parallel
, lihat pool thread kustom di Java 8 stream paralelPerbarui . Semua hal di atas didasarkan pada pengalaman dengan RxJava 1.x. Sekarang RxJava 2.x ada di sini , jawaban ini mungkin kedaluwarsa.
sumber
Stream.generate()
dan meneruskanSupplier<U>
implementasi Anda sendiri , hanya satu metode sederhana dari mana Anda memberikan item berikutnya dalam aliran. Ada banyak metode lain. Untuk dengan mudah membangun urutanStream
yang tergantung pada nilai-nilai sebelumnya Anda dapat menggunakaninterate()
metode ini, setiapCollection
memilikistream()
metode danStream.of()
membangunStream
dari varargs atau array. AkhirnyaStreamSupport
memiliki dukungan untuk pembuatan aliran yang lebih maju menggunakan spliterator atau untuk aliran jenis primitif.takeWhile()
,takeUntil()
);" - JDK9 memiliki ini, saya percaya, di takeWhile () dan dropWhile ()Java 8 Stream dan RxJava terlihat sangat mirip. Mereka memiliki operator yang mirip (filter, peta, flatMap ...) tetapi tidak dibuat untuk penggunaan yang sama.
Anda dapat melakukan tugas asynchonus menggunakan RxJava.
Dengan streaming Java 8, Anda akan melintasi item koleksi Anda.
Anda dapat melakukan hal yang hampir sama di RxJava (melintasi item koleksi) tetapi, karena RxJava difokuskan pada tugas bersamaan, ..., ia menggunakan sinkronisasi, kait, ... Jadi tugas yang sama menggunakan RxJava mungkin lebih lambat daripada dengan Java 8 stream.
RxJava dapat dibandingkan dengan
CompletableFuture
, tetapi itu bisa dapat menghitung lebih dari satu nilai.sumber
parallelStream
mendukung sinkronisasi serupa dari lintasan sederhana / peta / penyaringan dll.Ada beberapa perbedaan teknis dan konseptual, misalnya, aliran Java 8 adalah penggunaan tunggal, tarik berbasis, urutan nilai sinkron sedangkan RxJava Observable dapat diamati kembali, adaptif dorong-tarik berbasis, berpotensi urutan nilai asinkron. RxJava ditujukan untuk Java 6+ dan bekerja di Android juga.
sumber
Java 8 Streaming berbasis tarik. Anda beralih menggunakan streaming Java 8 untuk setiap item. Dan itu bisa menjadi aliran tanpa akhir.
RXJava
Observable
secara default berbasis push. Anda berlangganan Observable dan Anda akan diberitahu ketika item berikutnya tiba (onNext
), atau ketika aliran selesai (onCompleted
), atau ketika kesalahan terjadi (onError
). Karena denganObservable
Anda menerimaonNext
,onCompleted
,onError
peristiwa, Anda dapat melakukan beberapa fungsi yang kuat seperti menggabungkan berbedaObservable
s ke yang baru (zip
,merge
,concat
). Hal lain yang dapat Anda lakukan adalah caching, pembatasan, ... Dan ia menggunakan API yang kurang lebih sama dalam berbagai bahasa (RxJava, RX dalam C #, RxJS, ...)Secara default RxJava adalah utas tunggal. Kecuali Anda mulai menggunakan Penjadwal, semuanya akan terjadi pada utas yang sama.
sumber
Jawaban yang ada komprehensif dan benar, tetapi contoh yang jelas untuk pemula masih kurang. Izinkan saya untuk meletakkan beberapa konkret di belakang istilah seperti "push / pull-based" dan "re-observable". Catatan : Saya benci istilah
Observable
(itu aliran demi Tuhan), jadi cukup merujuk ke aliran J8 vs RX.Pertimbangkan daftar bilangan bulat,
J8 Stream adalah utilitas untuk memodifikasi koleksi. Misalnya digit bahkan dapat diekstraksi sebagai,
Ini pada dasarnya adalah peta Python , filter, kurangi , tambahan yang sangat bagus (dan lama tertunda) ke Java. Tetapi bagaimana jika digit tidak dikumpulkan sebelumnya - bagaimana jika digitnya mengalir saat aplikasi sedang berjalan - dapatkah kita menyaring bahkan dalam waktu nyata.
Bayangkan proses utas terpisah menghasilkan bilangan bulat secara acak saat aplikasi sedang berjalan (
---
menunjukkan waktu)Di RX,
even
dapat bereaksi terhadap setiap digit baru dan menerapkan filter secara real-timeTidak perlu menyimpan daftar input dan output. Jika Anda menginginkan daftar keluaran, tidak ada masalah yang dapat dialirkan juga. Faktanya, semuanya adalah aliran.
Inilah sebabnya mengapa istilah seperti "stateless" dan "fungsional" lebih terkait dengan RX
sumber
RxJava juga terkait erat dengan inisiatif stream reaktif dan menganggapnya sebagai implementasi sederhana dari API stream reaktif (misalnya dibandingkan dengan implementasi stream Akka ). Perbedaan utama adalah, bahwa aliran reaktif dirancang untuk dapat menangani tekanan balik, tetapi jika Anda melihat halaman aliran reaktif, Anda akan mendapatkan ide. Mereka menggambarkan tujuan mereka dengan cukup baik dan alirannya juga terkait erat dengan manifesto reaktif .
Java 8 stream cukup banyak penerapan koleksi tak terbatas, sangat mirip dengan Scala Stream atau Clojure lazy seq .
sumber
Java 8 Streaming memungkinkan pemrosesan koleksi yang sangat besar secara efisien, sambil meningkatkan arsitektur multicore. Sebaliknya, RxJava adalah single-threaded secara default (tanpa Penjadwal). Jadi RxJava tidak akan mengambil keuntungan dari mesin multi-core kecuali jika Anda membuat kode logika sendiri.
sumber