Saya menggunakan rxjava di aplikasi Android saya untuk menangani permintaan jaringan secara asinkron. Sekarang saya ingin mencoba lagi permintaan jaringan yang gagal hanya setelah waktu tertentu berlalu.
Apakah ada cara untuk menggunakan retry () pada Observable tetapi mencoba ulang hanya setelah penundaan tertentu?
Adakah cara untuk memberi tahu Observable yang saat ini sedang dicoba ulang (bukan mencoba untuk pertama kali)?
Saya telah melihat debounce () / throttleWithTimeout () tetapi mereka tampaknya melakukan sesuatu yang berbeda.
Edit:
Saya rasa saya menemukan satu cara untuk melakukannya, tetapi saya akan tertarik pada konfirmasi bahwa ini adalah cara yang benar untuk melakukannya atau cara lain yang lebih baik.
Apa yang saya lakukan adalah ini: Dalam metode call () dari Observable.OnSubscribe saya, sebelum saya memanggil metode Subscribers onError (), saya membiarkan Thread tidur selama waktu yang diinginkan. Jadi, untuk mencoba lagi setiap 1000 milidetik, saya melakukan sesuatu seperti ini:
@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
try {
Log.d(TAG, "trying to load all products with pid: " + pid);
subscriber.onNext(productClient.getProductNodesForParentId(pid));
subscriber.onCompleted();
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e.printStackTrace();
}
subscriber.onError(e);
}
}
Karena metode ini tetap berjalan di thread IO, metode ini tidak memblokir UI. Satu-satunya masalah yang dapat saya lihat adalah bahwa bahkan kesalahan pertama dilaporkan dengan penundaan sehingga penundaan tetap ada meskipun tidak ada retry (). Saya ingin lebih baik jika penundaan tidak diterapkan setelah kesalahan tetapi sebelum percobaan ulang (tapi jelas tidak sebelum percobaan pertama).
Error:(73, 20) error: incompatible types: RetryWithDelay cannot be converted to Func1<? super Observable<? extends Throwable>,? extends Observable<?>>
RetryWithDelay
ke ini: pastebin.com/6SiZeKnCTerinspirasi oleh jawaban Paul , dan jika Anda tidak peduli dengan
retryWhen
masalah yang dikemukakan oleh Abhijit Sarkar , cara termudah untuk menunda langganan ulang dengan rxJava2 unconditionnaly adalah:source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))
Anda mungkin ingin melihat lebih banyak contoh dan penjelasan tentang retryWhen dan repeatWhen .
sumber
Contoh ini bekerja dengan jxjava 2.2.2:
Coba lagi tanpa penundaan:
Single.just(somePaylodData) .map(data -> someConnection.send(data)) .retry(5) .doOnSuccess(status -> log.info("Yay! {}", status);
Coba lagi dengan penundaan:
Single.just(somePaylodData) .map(data -> someConnection.send(data)) .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS)) .doOnSuccess(status -> log.info("Yay! {}", status) .doOnError((Throwable error) -> log.error("I tried five times with a 300ms break" + " delay in between. But it was in vain."));
Single sumber kami gagal jika someConnection.send () gagal. Ketika itu terjadi, kegagalan yang dapat diamati di dalam retryWhen mengeluarkan kesalahan. Kami menunda emisi itu sebanyak 300 md dan mengirimkannya kembali untuk menandakan percobaan ulang. take (5) menjamin bahwa signaling observable kami akan berhenti setelah kami menerima lima kesalahan. retryWhen melihat penghentian dan tidak mencoba lagi setelah kegagalan kelima.
sumber
Ini adalah solusi yang didasarkan pada potongan Ben Christensen saya melihat, RetryWhen Contoh , dan RetryWhenTestsConditional (saya harus mengubah
n.getThrowable()
untukn
untuk itu untuk bekerja). Saya menggunakan evant / gradle-retrolambda untuk membuat notasi lambda berfungsi di Android, tetapi Anda tidak harus menggunakan lambda (meskipun sangat disarankan). Untuk penundaan, saya menerapkan back-off eksponensial, tetapi Anda dapat memasukkan logika mundur apa pun yang Anda inginkan di sana. Untuk kelengkapan saya tambahkan operatorsubscribeOn
danobserveOn
. Saya menggunakan ReactiveX / RxAndroid untukAndroidSchedulers.mainThread()
.int ATTEMPT_COUNT = 10; public class Tuple<X, Y> { public final X x; public final Y y; public Tuple(X x, Y y) { this.x = x; this.y = y; } } observable .subscribeOn(Schedulers.io()) .retryWhen( attempts -> { return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i)) .flatMap( ni -> { if (ni.y > ATTEMPT_COUNT) return Observable.error(ni.x); return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS); }); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
sumber
Observable
objek lain ?kjones
solusi dan itu bekerja dengan sempurna untuk saya, terima kasihalih-alih menggunakan MyRequestObservable.retry saya menggunakan fungsi pembungkus retryObservable (MyRequestObservable, retrycount, detik) yang mengembalikan Observable baru yang menangani tipuan untuk penundaan sehingga saya bisa melakukannya
retryObservable(restApi.getObservableStuff(), 3, 30) .subscribe(new Action1<BonusIndividualList>(){ @Override public void call(BonusIndividualList arg0) { //success! } }, new Action1<Throwable>(){ @Override public void call(Throwable arg0) { // failed after the 3 retries ! }}); // wrapper code private static <T> Observable<T> retryObservable( final Observable<T> requestObservable, final int nbRetry, final long seconds) { return Observable.create(new Observable.OnSubscribe<T>() { @Override public void call(final Subscriber<? super T> subscriber) { requestObservable.subscribe(new Action1<T>() { @Override public void call(T arg0) { subscriber.onNext(arg0); subscriber.onCompleted(); } }, new Action1<Throwable>() { @Override public void call(Throwable error) { if (nbRetry > 0) { Observable.just(requestObservable) .delay(seconds, TimeUnit.SECONDS) .observeOn(mainThread()) .subscribe(new Action1<Observable<T>>(){ @Override public void call(Observable<T> observable){ retryObservable(observable, nbRetry - 1, seconds) .subscribe(subscriber); } }); } else { // still fail after retries subscriber.onError(error); } } }); } }); }
sumber
retryWhen
adalah operator yang rumit, bahkan mungkin buggy. Dokumen resmi dan setidaknya satu jawaban di sini menggunakanrange
operator, yang akan gagal jika tidak ada percobaan ulang yang harus dilakukan. Lihat diskusi saya dengan anggota ReactiveX David Karnok.Saya meningkatkan jawaban kjones dengan mengubah
flatMap
keconcatMap
dan dengan menambahkanRetryDelayStrategy
kelas.flatMap
tidak menjaga urutan emisi sementaraconcatMap
melakukannya, yang penting untuk penundaan dengan back-off. ItuRetryDelayStrategy
, seperti namanya, memungkinkan pengguna memilih dari berbagai mode yang menghasilkan penundaan percobaan ulang, termasuk mundur. Kode tersedia di GitHub saya lengkap dengan kasus uji berikut:Lihat
setRandomJokes
metode.sumber
Berdasarkan jawaban kjones berikut adalah versi Kotlin dari RxJava 2.x coba lagi dengan penundaan sebagai ekstensi. Ganti
Observable
untuk membuat ekstensi yang sama untukFlowable
.fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> { var retryCount = 0 return retryWhen { thObservable -> thObservable.flatMap { throwable -> if (++retryCount < maxRetries) { Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS) } else { Observable.error(throwable) } } } }
Kemudian gunakan saja di observable
observable.retryWithDelay(3, 1000)
sumber
Single
juga?flatMap
harus digunakanFlowable.timer
danFlowable.error
meskipun fungsinyaSingle<T>.retryWithDelay
.Sekarang dengan RxJava versi 1.0+ Anda dapat menggunakan zipWith untuk mencoba ulang dengan penundaan.
Menambahkan modifikasi ke jawaban kjones .
Diubah
public class RetryWithDelay implements Func1<Observable<? extends Throwable>, Observable<?>> { private final int MAX_RETRIES; private final int DELAY_DURATION; private final int START_RETRY; /** * Provide number of retries and seconds to be delayed between retry. * * @param maxRetries Number of retries. * @param delayDurationInSeconds Seconds to be delays in each retry. */ public RetryWithDelay(int maxRetries, int delayDurationInSeconds) { MAX_RETRIES = maxRetries; DELAY_DURATION = delayDurationInSeconds; START_RETRY = 1; } @Override public Observable<?> call(Observable<? extends Throwable> observable) { return observable .delay(DELAY_DURATION, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), new Func2<Throwable, Integer, Integer>() { @Override public Integer call(Throwable throwable, Integer attempt) { return attempt; } }); } }
sumber
Jawaban yang sama seperti dari kjones tetapi diperbarui ke versi terbaru Untuk RxJava versi 2.x : ('io.reactivex.rxjava2: rxjava: 2.1.3')
public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> { private final int maxRetries; private final long retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception { return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() { @Override public Publisher<?> apply(Throwable throwable) throws Exception { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (i.e. re-subscribed). return Flowable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Flowable.error(throwable); } }); } }
Pemakaian:
// Tambahkan logika coba lagi ke observasi yang ada. // Coba lagi maksimal 3 kali dengan penundaan selama 2 detik.
observable .retryWhen(new RetryWithDelay(3, 2000));
sumber
Anda bisa menambahkan penundaan di Observable yang dikembalikan di retryWhen Operator
/** * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated */ @Test public void observableOnErrorResumeNext() { Subscription subscription = Observable.just(null) .map(Object::toString) .doOnError(failure -> System.out.println("Error:" + failure.getCause())) .retryWhen(errors -> errors.doOnNext(o -> count++) .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)), Schedulers.newThread()) .onErrorResumeNext(t -> { System.out.println("Error after all retries:" + t.getCause()); return Observable.just("I save the world for extinction!"); }) .subscribe(s -> System.out.println(s)); new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); }
Anda dapat melihat lebih banyak contoh di sini. https://github.com/politrons/reactive
sumber
Lakukan saja seperti ini:
Observable.just("") .delay(2, TimeUnit.SECONDS) //delay .flatMap(new Func1<String, Observable<File>>() { @Override public Observable<File> call(String s) { L.from(TAG).d("postAvatar="); File file = PhotoPickUtil.getTempFile(); if (file.length() <= 0) { throw new NullPointerException(); } return Observable.just(file); } }) .retry(6) .subscribe(new Action1<File>() { @Override public void call(File file) { postAvatar(file); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { } });
sumber
Untuk versi Kotlin & RxJava1
class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long) : Function1<Observable<out Throwable>, Observable<*>> { private val START_RETRY: Int = 1 override fun invoke(observable: Observable<out Throwable>): Observable<*> { return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), object : Function2<Throwable, Int, Int> { override fun invoke(throwable: Throwable, attempt: Int): Int { return attempt } }) } }
sumber
(Kotlin) Saya sedikit meningkatkan kode dengan kemunduran eksponensial dan menerapkan pertahanan yang dipancarkan dari Observable.range ():
fun testOnRetryWithDelayExponentialBackoff() { val interval = 1 val maxCount = 3 val ai = AtomicInteger(1); val source = Observable.create<Unit> { emitter -> val attempt = ai.getAndIncrement() println("Subscribe ${attempt}") if (attempt >= maxCount) { emitter.onNext(Unit) emitter.onComplete() } emitter.onError(RuntimeException("Test $attempt")) } // Below implementation of "retryWhen" function, remove all "println()" for real code. val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx -> throwableRx.doOnNext({ println("Error: $it") }) .zipWith(Observable.range(1, maxCount) .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) }, BiFunction { t1: Throwable, t2: Int -> t1 to t2 } ) .flatMap { pair -> if (pair.second >= maxCount) { Observable.error(pair.first) } else { val delay = interval * 2F.pow(pair.second) println("retry delay: $delay") Observable.timer(delay.toLong(), TimeUnit.SECONDS) } } } //Code to print the result in terminal. sourceWithRetry .doOnComplete { println("Complete") } .doOnError({ println("Final Error: $it") }) .blockingForEach { println("$it") } }
sumber
jika Anda perlu mencetak hitungan percobaan ulang, Anda dapat menggunakan contoh yang disediakan di halaman wiki Rxjava https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators
observable.retryWhen(errors -> // Count and increment the number of errors. errors.map(error -> 1).scan((i, j) -> i + j) .doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount)) // Limit the maximum number of retries. .takeWhile(errorCount -> errorCount < retryCounts) // Signal resubscribe event after some delay. .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS));
sumber