TL; DR
Bagaimana cara mengubahnya Task.whenAll(List<Task>)
menjadi RxJava
?
Kode saya yang ada menggunakan Bolts untuk membuat daftar tugas asinkron dan menunggu hingga semua tugas tersebut selesai sebelum melakukan langkah lain. Pada dasarnya, ini membangun List<Task>
dan mengembalikan satu Task
yang ditandai sebagai selesai ketika semua tugas dalam daftar selesai, seperti contoh di situs Bolts .
Saya ingin mengganti Bolts
dengan RxJava
dan saya mengasumsikan metode ini untuk membangun daftar tugas asinkron (ukuran tidak diketahui sebelumnya) dan membungkus semuanya menjadi satu Observable
adalah mungkin, tetapi saya tidak tahu caranya.
Saya sudah mencoba melihat merge
, zip
, concat
dll ... tapi tidak bisa bekerja pada List<Observable>
bahwa aku akan membangun karena mereka semua tampak diarahkan untuk bekerja pada hanya dua Observables
pada suatu waktu jika saya memahami dokumentasi dengan benar.
Saya mencoba untuk belajar RxJava
dan saya masih sangat baru untuk itu jadi maafkan saya jika ini adalah pertanyaan yang jelas atau dijelaskan dalam dokumen di suatu tempat; Saya telah mencoba mencari. Bantuan apa pun akan sangat dihargai.
sumber
onErrorResumeNext
, contoh:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())
Anda dapat menggunakan
flatMap
jika Anda memiliki komposisi tugas dinamis. Sesuatu seperti ini:public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) { return Observable.from(tasks) //execute in parallel .flatMap(task -> task.observeOn(Schedulers.computation())) //wait, until all task are executed //be aware, all your observable should emit onComplemete event //otherwise you will wait forever .toList() //could implement more intelligent logic. eg. check that everything is successful .map(results -> true); }
Contoh bagus lainnya dari eksekusi paralel
Catatan: Saya tidak begitu tahu kebutuhan Anda untuk penanganan kesalahan. Misalnya apa yang harus dilakukan jika hanya satu tugas yang gagal. Saya pikir Anda harus memverifikasi skenario ini.
sumber
zip
memberi tahu tentang penyelesaian segera setelah salah satu tugas selesai dan karenanya tidak berlaku.new Func1<Observable<Boolean>, Observable<Boolean>>()...
dan yang kedua dengannew Func1<List<Boolean>, Boolean>()
Dari saran yang diajukan, zip () sebenarnya menggabungkan hasil yang dapat diamati satu sama lain, yang mungkin diinginkan atau tidak, tetapi tidak ditanyakan dalam pertanyaan. Dalam pertanyaan tersebut, yang diinginkan hanyalah eksekusi dari setiap operasi, baik satu per satu atau secara paralel (yang tidak ditentukan, tetapi contoh Bolts terkait adalah tentang eksekusi paralel). Selain itu, zip () akan segera selesai ketika salah satu observable selesai, jadi itu melanggar persyaratan.
Untuk eksekusi paralel Observables, flatMap () yang disajikan di jawaban lain baik-baik saja, tetapi merge () akan lebih lurus ke depan. Perhatikan bahwa penggabungan akan keluar pada kesalahan dari salah satu Observable, jika Anda lebih suka menunda keluar sampai semua observable selesai, Anda harus melihat mergeDelayError () .
Untuk satu-per-satu, saya pikir metode statis Observable.concat () harus digunakan. Javadoc-nya menyatakan seperti ini:
yang terdengar seperti apa yang Anda cari jika Anda tidak ingin eksekusi paralel.
Juga, jika Anda hanya tertarik pada penyelesaian tugas Anda, bukan mengembalikan nilai, Anda mungkin harus melihat ke Completable daripada Observable .
TLDR: untuk pelaksanaan tugas satu-per-satu dan acara oncompletion ketika mereka selesai, saya pikir Completable.concat () paling cocok. Untuk eksekusi paralel, Completable.merge () atau Completable.mergeDelayError () terdengar seperti solusinya. Yang pertama akan segera berhenti pada setiap kesalahan pada penyelesaian apa pun, yang terakhir akan mengeksekusi semuanya bahkan jika salah satu dari mereka memiliki kesalahan, dan hanya kemudian melaporkan kesalahan.
sumber
Anda mungkin melihat
zip
operator yang bekerja dengan 2 Observable.Ada juga metode statis
Observable.zip
. Ini memiliki satu bentuk yang seharusnya berguna untuk Anda:Anda dapat melihat javadoc untuk lebih lanjut.
sumber
Dengan Kotlin
Penting untuk menyetel tipe untuk argumen fungsi atau Anda akan mengalami kesalahan kompilasi
Jenis argumen terakhir berubah dengan jumlah argumen: BiFungsi untuk 2 Fungsi3 untuk 3 Fungsi4 untuk 4 ...
sumber
Saya sedang menulis beberapa kode heave komputasi di Kotlin dengan JavaRx Observables dan RxKotlin. Saya ingin mengamati daftar observasi yang harus diselesaikan dan sementara itu memberi saya pembaruan dengan kemajuan dan hasil terbaru. Pada akhirnya ini mengembalikan hasil perhitungan terbaik. Persyaratan tambahan adalah menjalankan Observables secara paralel untuk menggunakan semua core cpu saya. Saya berakhir dengan solusi ini:
@Volatile var results: MutableList<CalculationResult> = mutableListOf() fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> { return Observable.create { subscriber -> Observable.concatEager(listOfCalculations.map { calculation: Calculation -> doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result }).subscribeBy( onNext = { results.add(it) subscriber.onNext(Pair("A calculation is ready", it)) }, onComplete = { subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) subscriber.onComplete() }, onError = { subscriber.onError(it) } ) } }
sumber
@Volatile
, tetapi bagaimana ini akan berfungsi jika ini dipanggil oleh beberapa utas pada saat yang bersamaan? Apa yang akan terjadi dengan hasil?Saya memiliki masalah yang sama, saya perlu mengambil item pencarian dari panggilan istirahat sementara juga mengintegrasikan saran yang disimpan dari RecentSearchProvider.AUTHORITY dan menggabungkannya menjadi satu daftar terpadu. Saya mencoba menggunakan solusi @MyDogTom, sayangnya tidak ada Observable.from di RxJava. Setelah beberapa penelitian, saya mendapat solusi yang berhasil untuk saya.
fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>> { val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0) fetchedItems.add(fetchSearchSuggestions(context,query).toObservable()) fetchedItems.add(getSearchResults(query).toObservable()) return Observable.fromArray(fetchedItems) .flatMapIterable { data->data } .flatMap {task -> task.observeOn(Schedulers.io())} .toList() .map { ArrayList(it) } }
Saya membuat sebuah observable dari array observable yang berisi daftar saran dan hasil dari internet tergantung pada kueri. Setelah itu Anda hanya membahas tugas-tugas itu dengan flatMapIterable dan menjalankannya menggunakan flatmap, letakkan hasilnya dalam larik, yang nanti bisa diambil ke dalam tampilan daur ulang.
sumber
Jika Anda menggunakan Project Reactor, Anda dapat menggunakan
Mono.when
.Mono.when(publisher1, publisher2) .map(i-> { System.out.println("everything is done!"); return i; }).block()
sumber