Dapat diamati vs mengalir rxJava2

128

Saya telah melihat rx java 2 baru dan saya tidak begitu yakin saya mengerti idenya backpressurelagi ...

Saya sadar bahwa kami memiliki Observableyang tidak memiliki backpressuredukungan dan Flowableyang memilikinya.

Jadi berdasarkan contoh, katakanlah saya punya flowabledengan interval:

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

Ini akan crash setelah sekitar 128 nilai, dan itu cukup jelas saya memakan lebih lambat daripada mendapatkan item.

Tapi kemudian kita memiliki hal yang sama Observable

     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

Ini tidak akan macet sama sekali, bahkan ketika saya menunda untuk memakannya masih berfungsi. Untuk membuat Flowablepekerjaan katakanlah saya menempatkan onBackpressureDropoperator, crash hilang tetapi tidak semua nilai juga dipancarkan.

Jadi pertanyaan dasar saya tidak dapat menemukan jawaban saat ini di kepala saya adalah mengapa saya harus peduli backpressureketika saya dapat menggunakan polos Observablemasih menerima semua nilai tanpa mengelola buffer? Atau mungkin dari sisi lain, keuntungan apa yang backpressuresaya dapatkan dari pengelolaan dan penanganan konsumsi?

pengguna2141889
sumber

Jawaban:

123

Apa yang dimanifestasikan tekanan balik dalam praktiknya adalah buffer terbatas, Flowable.observeOnmemiliki buffer 128 elemen yang dikosongkan secepat yang dapat dilakukan dowstream. Anda dapat meningkatkan ukuran buffer ini secara individual untuk menangani sumber bursty dan semua praktik manajemen tekanan balik masih berlaku dari 1.x. Observable.observeOnmemiliki buffer tak terbatas yang terus mengumpulkan elemen dan aplikasi Anda mungkin kehabisan memori.

Anda dapat menggunakan Observablemisalnya:

  • menangani acara GUI
  • bekerja dengan urutan pendek (total kurang dari 1000 elemen)

Anda dapat menggunakan Flowablemisalnya:

  • sumber dingin dan tidak berjangka waktu
  • generator seperti sumber
  • pengakses jaringan dan database
akarnokd
sumber
Karena ini memiliki datang di pertanyaan lain - apakah benar bahwa jenis lebih terbatas seperti Maybe, Singledan Completabledapat selalu digunakan sebagai pengganti Flowableketika mereka semantik yang sesuai?
david.mihola
1
Ya, Maybe, Single, dan Completableyang jauh terlalu kecil untuk memiliki kebutuhan konsep tekanan balik. Tidak ada kemungkinan produsen dapat mengeluarkan barang lebih cepat daripada yang dapat mereka konsumsi, karena 0–1 barang akan diproduksi atau dikonsumsi.
AndrewF
Mungkin saya tidak benar, tapi bagi saya Contoh Flowable dan Observable harus ditukar.
Yura Galavay
Saya pikir dalam pertanyaan dia kehilangan strategi backpressure yang perlu kita berikan ke Flowable, yang menjelaskan mengapa pengecualian backpressure yang hilang dilemparkan, juga menjelaskan mengapa pengecualian ini menghilang setelah dia menerapkan .onBackpressureDrop (). Dan untuk Observable, karena tidak memiliki strategi ini dan tidak dapat disediakan, itu hanya akan gagal nanti karena OOM
Haomin
111

Backpressure adalah ketika observable Anda (penerbit) membuat lebih banyak acara daripada yang bisa ditangani pelanggan Anda. Jadi Anda bisa mendapatkan pelanggan yang melewatkan acara, atau Anda bisa mendapatkan antrian acara yang sangat besar yang pada akhirnya hanya akan kehabisan memori. Flowablemempertimbangkan tekanan balik. Observabletidak. Itu dia.

itu mengingatkan saya pada corong yang bila terlalu banyak cairan meluap. Flowable dapat membantu untuk tidak mewujudkannya:

dengan tekanan balik yang luar biasa:

masukkan deskripsi gambar di sini

tetapi dengan menggunakan flowable, tekanan balik jauh lebih sedikit:

masukkan deskripsi gambar di sini

Rxjava2 memiliki beberapa strategi tekanan balik yang dapat Anda gunakan tergantung pada kasus penggunaan Anda. dengan strategi maksud saya Rxjava2 menyediakan cara untuk menangani objek yang tidak dapat diproses karena overflow (backpressure).

Berikut adalah strateginya. Saya tidak akan membahas semuanya, tetapi misalnya jika Anda ingin tidak khawatir tentang item yang meluap Anda dapat menggunakan strategi drop seperti ini:

observable.toFlowable (BackpressureStrategy.DROP)

Sejauh yang saya tahu harus ada batasan 128 item pada antrian, setelah itu bisa terjadi overflow (backpressure). Bahkan jika bukan 128 itu mendekati angka itu. Semoga ini bisa membantu seseorang.

jika Anda perlu mengubah ukuran buffer dari 128 sepertinya itu bisa dilakukan seperti ini (tapi perhatikan batasan memori:

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.  

Dalam pengembangan perangkat lunak biasanya strategi tekanan balik berarti Anda memberi tahu emitor untuk melambat sedikit karena konsumen tidak dapat menangani kecepatan peristiwa yang Anda pancarkan.

j2emanue
sumber
Saya selalu berpikir tekanan balik adalah nama untuk sekumpulan mekanisme yang memungkinkan konsumen memberi tahu produsen untuk memperlambat ...
kboom
Bisa jadi kasusnya. Ya
j2emanue
Apakah ada kerugian menggunakan Flowable?
IgorGanapolsky
Gambar-gambar ini berbohong kepada saya. Acara yang dibatalkan tidak akan berakhir dengan "lebih banyak uang" di bagian bawah.
EpicPandaForce
1
@ j2emanue, Anda bingung dengan ukuran buffer untuk operator dan operator Flowable.buffer (int). Silakan baca javadoc dengan hati-hati dan perbaiki jawaban Anda sesuai: reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html
tomek
15

Fakta bahwa Anda Flowablejatuh setelah memancarkan 128 nilai tanpa penanganan tekanan balik tidak berarti itu akan selalu macet setelah tepat 128 nilai: kadang-kadang akan macet setelah 10, dan kadang-kadang tidak akan macet sama sekali. Saya percaya inilah yang terjadi ketika Anda mencoba contoh dengan Observable- kebetulan tidak ada tekanan balik, jadi kode Anda berfungsi normal, lain kali mungkin tidak. Perbedaan di RxJava 2 adalah tidak ada lagi konsep backpressure dalam Observables, dan tidak ada cara untuk menanganinya. Jika Anda mendesain urutan reaktif yang mungkin memerlukan penanganan tekanan balik eksplisit - maka itu Flowableadalah pilihan terbaik Anda.

Egor
sumber
Ya, saya telah mengamati bahwa kadang-kadang rusak karena nilai-nilai yang lebih rendah, kadang-kadang tidak. Tetapi sekali lagi jika misalnya saya hanya menangani intervaltanpa backpressureapakah saya akan mengharapkan perilaku atau masalah yang aneh?
user2141889
Jika Anda yakin tidak mungkin masalah tekanan balik dapat terjadi dalam urutan tertentu yang Dapat Diamati - maka saya rasa tidak apa-apa untuk mengabaikan tekanan balik.
Egor