Gabungkan daftar Observable dan tunggu sampai semua selesai

91

TL; DR Bagaimana cara mengubahnya Task.whenAll(List<Task>)menjadi RxJava?

Kode saya yang ada menggunakan Bolts untuk membuat daftar tugas asinkron dan menunggu hingga semua tugas tersebut selesai sebelum melakukan langkah lain. Pada dasarnya, ini membangun List<Task>dan mengembalikan satu Taskyang ditandai sebagai selesai ketika semua tugas dalam daftar selesai, seperti contoh di situs Bolts .

Saya ingin mengganti Boltsdengan RxJavadan saya mengasumsikan metode ini untuk membangun daftar tugas asinkron (ukuran tidak diketahui sebelumnya) dan membungkus semuanya menjadi satu Observableadalah mungkin, tetapi saya tidak tahu caranya.

Saya sudah mencoba melihat merge, zip, concatdll ... tapi tidak bisa bekerja pada List<Observable>bahwa aku akan membangun karena mereka semua tampak diarahkan untuk bekerja pada hanya dua Observablespada suatu waktu jika saya memahami dokumentasi dengan benar.

Saya mencoba untuk belajar RxJavadan saya masih sangat baru untuk itu jadi maafkan saya jika ini adalah pertanyaan yang jelas atau dijelaskan dalam dokumen di suatu tempat; Saya telah mencoba mencari. Bantuan apa pun akan sangat dihargai.

Craig Russell
sumber

Jawaban:

73

Sepertinya Anda sedang mencari operator Zip .

Ada beberapa cara berbeda untuk menggunakannya, jadi mari kita lihat contohnya. Katakanlah kita memiliki beberapa observasi sederhana dari berbagai jenis:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

Cara termudah untuk menunggu semuanya adalah seperti ini:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

Perhatikan bahwa dalam fungsi zip, parameter memiliki tipe konkret yang sesuai dengan jenis observable yang sedang di-zip.

Membuat zip daftar observasi juga dimungkinkan, baik secara langsung:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

... atau dengan membungkus daftar menjadi Observable<Observable<?>>:

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

Namun, dalam kedua kasus ini, fungsi zip hanya dapat menerima satu Object[]parameter karena jenis observable dalam daftar tidak diketahui sebelumnya serta jumlahnya. Ini berarti bahwa fungsi zip harus memeriksa jumlah parameter dan mentransmisikannya.

Terlepas dari itu, semua contoh di atas pada akhirnya akan dicetak 1 Blah true

EDIT: Saat menggunakan Zip, pastikan bahwa semua yang Observablessedang di-zip memancarkan jumlah item yang sama. Dalam contoh di atas, ketiga observasi memancarkan satu item. Jika kita mengubahnya menjadi seperti ini:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

Kemudian 1, Blah, Truedan 2, Hello, Trueakan menjadi satu-satunya item yang diteruskan ke fungsi zip. Item 3tersebut tidak akan pernah di-zip karena observasi lainnya telah selesai.

Malt
sumber
9
Ini tidak akan berfungsi jika salah satu panggilan gagal. Dalam hal ini semua panggilan akan hilang.
StarWind0
1
@ StarWind0 Anda dapat melewati kesalahan dengan menggunakan onErrorResumeNext, contoh:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())
vuhung3990
Bagaimana jika saya memiliki 100 observable?
Krzysztof Kubicki
80

Anda dapat menggunakan flatMapjika Anda memiliki komposisi tugas dinamis. Sesuatu seperti ini:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplemete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}

Contoh bagus lainnya dari eksekusi paralel

Catatan: Saya tidak begitu tahu kebutuhan Anda untuk penanganan kesalahan. Misalnya apa yang harus dilakukan jika hanya satu tugas yang gagal. Saya pikir Anda harus memverifikasi skenario ini.

MyDogTom
sumber
17
Ini harus menjadi jawaban yang diterima mengingat pertanyaan itu menyatakan "ketika semua tugas dalam daftar selesai". zipmemberi tahu tentang penyelesaian segera setelah salah satu tugas selesai dan karenanya tidak berlaku.
pengguna3707125
1
@MyDogTom: Dapatkah Anda Memperbarui Jawaban dengan versi Sintaks Java7 (Bukan lambda)?
sanedroid
3
@PoojaGaikwad Dengan lambda itu lebih mudah dibaca. Cukup ganti lambda pertama dengan new Func1<Observable<Boolean>, Observable<Boolean>>()...dan yang kedua dengannew Func1<List<Boolean>, Boolean>()
MyDogTom
@soshial RxJava 2 adalah hal terburuk yang pernah terjadi dengan RxJava, ya
egorikem
15

Dari saran yang diajukan, zip () sebenarnya menggabungkan hasil yang dapat diamati satu sama lain, yang mungkin diinginkan atau tidak, tetapi tidak ditanyakan dalam pertanyaan. Dalam pertanyaan tersebut, yang diinginkan hanyalah eksekusi dari setiap operasi, baik satu per satu atau secara paralel (yang tidak ditentukan, tetapi contoh Bolts terkait adalah tentang eksekusi paralel). Selain itu, zip () akan segera selesai ketika salah satu observable selesai, jadi itu melanggar persyaratan.

Untuk eksekusi paralel Observables, flatMap () yang disajikan di jawaban lain baik-baik saja, tetapi merge () akan lebih lurus ke depan. Perhatikan bahwa penggabungan akan keluar pada kesalahan dari salah satu Observable, jika Anda lebih suka menunda keluar sampai semua observable selesai, Anda harus melihat mergeDelayError () .

Untuk satu-per-satu, saya pikir metode statis Observable.concat () harus digunakan. Javadoc-nya menyatakan seperti ini:

concat (java.lang.Iterable> sequences) Meratakan Iterable Observable menjadi satu Observable, satu demi satu, tanpa menyela

yang terdengar seperti apa yang Anda cari jika Anda tidak ingin eksekusi paralel.

Juga, jika Anda hanya tertarik pada penyelesaian tugas Anda, bukan mengembalikan nilai, Anda mungkin harus melihat ke Completable daripada Observable .

TLDR: untuk pelaksanaan tugas satu-per-satu dan acara oncompletion ketika mereka selesai, saya pikir Completable.concat () paling cocok. Untuk eksekusi paralel, Completable.merge () atau Completable.mergeDelayError () terdengar seperti solusinya. Yang pertama akan segera berhenti pada setiap kesalahan pada penyelesaian apa pun, yang terakhir akan mengeksekusi semuanya bahkan jika salah satu dari mereka memiliki kesalahan, dan hanya kemudian melaporkan kesalahan.

eis
sumber
2

Anda mungkin melihat zipoperator yang bekerja dengan 2 Observable.

Ada juga metode statis Observable.zip. Ini memiliki satu bentuk yang seharusnya berguna untuk Anda:

zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

Anda dapat melihat javadoc untuk lebih lanjut.

LordRaydenMK
sumber
2

Dengan Kotlin

Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->

})

Penting untuk menyetel tipe untuk argumen fungsi atau Anda akan mengalami kesalahan kompilasi

Jenis argumen terakhir berubah dengan jumlah argumen: BiFungsi untuk 2 Fungsi3 untuk 3 Fungsi4 untuk 4 ...

Kevin ABRIOUX
sumber
1

Saya sedang menulis beberapa kode heave komputasi di Kotlin dengan JavaRx Observables dan RxKotlin. Saya ingin mengamati daftar observasi yang harus diselesaikan dan sementara itu memberi saya pembaruan dengan kemajuan dan hasil terbaru. Pada akhirnya ini mengembalikan hasil perhitungan terbaik. Persyaratan tambahan adalah menjalankan Observables secara paralel untuk menggunakan semua core cpu saya. Saya berakhir dengan solusi ini:

@Volatile var results: MutableList<CalculationResult> = mutableListOf()

fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {

    return Observable.create { subscriber ->
        Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
            doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
        }).subscribeBy(
            onNext = {
                results.add(it)
                subscriber.onNext(Pair("A calculation is ready", it))

            },
            onComplete = {
                subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) 
                subscriber.onComplete()
            },
            onError = {
                subscriber.onError(it)
            }
        )
    }
}
Sjors
sumber
tidak terbiasa dengan RxKotlin atau @Volatile, tetapi bagaimana ini akan berfungsi jika ini dipanggil oleh beberapa utas pada saat yang bersamaan? Apa yang akan terjadi dengan hasil?
eis
0

Saya memiliki masalah yang sama, saya perlu mengambil item pencarian dari panggilan istirahat sementara juga mengintegrasikan saran yang disimpan dari RecentSearchProvider.AUTHORITY dan menggabungkannya menjadi satu daftar terpadu. Saya mencoba menggunakan solusi @MyDogTom, sayangnya tidak ada Observable.from di RxJava. Setelah beberapa penelitian, saya mendapat solusi yang berhasil untuk saya.

 fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>>
{
    val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
    fetchedItems.add(fetchSearchSuggestions(context,query).toObservable())
    fetchedItems.add(getSearchResults(query).toObservable())

    return Observable.fromArray(fetchedItems)
        .flatMapIterable { data->data }
        .flatMap {task -> task.observeOn(Schedulers.io())}
        .toList()
        .map { ArrayList(it) }
}

Saya membuat sebuah observable dari array observable yang berisi daftar saran dan hasil dari internet tergantung pada kueri. Setelah itu Anda hanya membahas tugas-tugas itu dengan flatMapIterable dan menjalankannya menggunakan flatmap, letakkan hasilnya dalam larik, yang nanti bisa diambil ke dalam tampilan daur ulang.

Anton Makov
sumber
0

Jika Anda menggunakan Project Reactor, Anda dapat menggunakan Mono.when.

Mono.when(publisher1, publisher2)
.map(i-> {
    System.out.println("everything is done!");
    return i;
}).block()
Kijang
sumber