rxjava: Dapatkah saya menggunakan retry () tetapi dengan penundaan?

94

Saya menggunakan rxjava di aplikasi Android saya untuk menangani permintaan jaringan secara asinkron. Sekarang saya ingin mencoba lagi permintaan jaringan yang gagal hanya setelah waktu tertentu berlalu.

Apakah ada cara untuk menggunakan retry () pada Observable tetapi mencoba ulang hanya setelah penundaan tertentu?

Adakah cara untuk memberi tahu Observable yang saat ini sedang dicoba ulang (bukan mencoba untuk pertama kali)?

Saya telah melihat debounce () / throttleWithTimeout () tetapi mereka tampaknya melakukan sesuatu yang berbeda.

Edit:

Saya rasa saya menemukan satu cara untuk melakukannya, tetapi saya akan tertarik pada konfirmasi bahwa ini adalah cara yang benar untuk melakukannya atau cara lain yang lebih baik.

Apa yang saya lakukan adalah ini: Dalam metode call () dari Observable.OnSubscribe saya, sebelum saya memanggil metode Subscribers onError (), saya membiarkan Thread tidur selama waktu yang diinginkan. Jadi, untuk mencoba lagi setiap 1000 milidetik, saya melakukan sesuatu seperti ini:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

Karena metode ini tetap berjalan di thread IO, metode ini tidak memblokir UI. Satu-satunya masalah yang dapat saya lihat adalah bahwa bahkan kesalahan pertama dilaporkan dengan penundaan sehingga penundaan tetap ada meskipun tidak ada retry (). Saya ingin lebih baik jika penundaan tidak diterapkan setelah kesalahan tetapi sebelum percobaan ulang (tapi jelas tidak sebelum percobaan pertama).

david.mihola
sumber

Jawaban:

173

Anda dapat menggunakan retryWhen()operator untuk menambahkan logika coba lagi ke Observable mana pun.

Kelas berikut berisi logika coba lagi:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

Pemakaian:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));
kjones
sumber
2
Error:(73, 20) error: incompatible types: RetryWithDelay cannot be converted to Func1<? super Observable<? extends Throwable>,? extends Observable<?>>
Nima G
3
@nima Saya memiliki masalah yang sama, ubah RetryWithDelayke ini: pastebin.com/6SiZeKnC
user1480019
2
Sepertinya operator RxJava retryWhen telah berubah sejak saya pertama kali menulis ini. Saya akan memperbarui jawabannya.
kjones
3
Anda harus memperbarui jawaban ini untuk mematuhi RxJava 2
Vishnu M.
1
bagaimana versi rxjava 2 mencari kotlin?
Gabriel Sanmartin
19

Terinspirasi oleh jawaban Paul , dan jika Anda tidak peduli dengan retryWhenmasalah yang dikemukakan oleh Abhijit Sarkar , cara termudah untuk menunda langganan ulang dengan rxJava2 unconditionnaly adalah:

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

Anda mungkin ingin melihat lebih banyak contoh dan penjelasan tentang retryWhen dan repeatWhen .

McX
sumber
15

Contoh ini bekerja dengan jxjava 2.2.2:

Coba lagi tanpa penundaan:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retry(5)
   .doOnSuccess(status -> log.info("Yay! {}", status);

Coba lagi dengan penundaan:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
   .doOnSuccess(status -> log.info("Yay! {}", status)
   .doOnError((Throwable error) 
                -> log.error("I tried five times with a 300ms break" 
                             + " delay in between. But it was in vain."));

Single sumber kami gagal jika someConnection.send () gagal. Ketika itu terjadi, kegagalan yang dapat diamati di dalam retryWhen mengeluarkan kesalahan. Kami menunda emisi itu sebanyak 300 md dan mengirimkannya kembali untuk menandakan percobaan ulang. take (5) menjamin bahwa signaling observable kami akan berhenti setelah kami menerima lima kesalahan. retryWhen melihat penghentian dan tidak mencoba lagi setelah kegagalan kelima.

Erunafailaro
sumber
9

Ini adalah solusi yang didasarkan pada potongan Ben Christensen saya melihat, RetryWhen Contoh , dan RetryWhenTestsConditional (saya harus mengubah n.getThrowable()untuk nuntuk itu untuk bekerja). Saya menggunakan evant / gradle-retrolambda untuk membuat notasi lambda berfungsi di Android, tetapi Anda tidak harus menggunakan lambda (meskipun sangat disarankan). Untuk penundaan, saya menerapkan back-off eksponensial, tetapi Anda dapat memasukkan logika mundur apa pun yang Anda inginkan di sana. Untuk kelengkapan saya tambahkan operator subscribeOndan observeOn. Saya menggunakan ReactiveX / RxAndroid untuk AndroidSchedulers.mainThread().

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
david-hoze
sumber
2
ini terlihat elegan tapi saya tidak menggunakan fungsi lamba, bagaimana saya bisa menulis tanpa lambas? @ amitai-hoze
ericn
juga bagaimana cara menulisnya sehingga saya dapat menggunakan kembali fungsi coba lagi ini untuk Observableobjek lain ?
ericn
Tidak masalah, saya telah menggunakan kjonessolusi dan itu bekerja dengan sempurna untuk saya, terima kasih
ericn
8

alih-alih menggunakan MyRequestObservable.retry saya menggunakan fungsi pembungkus retryObservable (MyRequestObservable, retrycount, detik) yang mengembalikan Observable baru yang menangani tipuan untuk penundaan sehingga saya bisa melakukannya

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>(){
        @Override
        public void call(BonusIndividualList arg0) 
        {
            //success!
        }
    }, 
    new Action1<Throwable>(){
        @Override
        public void call(Throwable arg0) { 
           // failed after the 3 retries !
        }}); 


// wrapper code
private static <T> Observable<T> retryObservable(
        final Observable<T> requestObservable, final int nbRetry,
        final long seconds) {

    return Observable.create(new Observable.OnSubscribe<T>() {

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            requestObservable.subscribe(new Action1<T>() {

                @Override
                public void call(T arg0) {
                    subscriber.onNext(arg0);
                    subscriber.onCompleted();
                }
            },

            new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {

                    if (nbRetry > 0) {
                        Observable.just(requestObservable)
                                .delay(seconds, TimeUnit.SECONDS)
                                .observeOn(mainThread())
                                .subscribe(new Action1<Observable<T>>(){
                                    @Override
                                    public void call(Observable<T> observable){
                                        retryObservable(observable,
                                                nbRetry - 1, seconds)
                                                .subscribe(subscriber);
                                    }
                                });
                    } else {
                        // still fail after retries
                        subscriber.onError(error);
                    }

                }
            });

        }

    });

}
Alexis Contour
sumber
Saya sangat menyesal tidak menanggapi sebelumnya - entah bagaimana saya melewatkan pemberitahuan dari SO bahwa ada jawaban untuk pertanyaan saya ... - Saya harus menerima jawaban karena ini lebih merupakan solusi daripada jawaban langsung. Tapi saya rasa, karena Anda memberikan solusi, jawaban untuk pertanyaan awal saya adalah "tidak, Anda tidak bisa" ...
david.mihola
5

retryWhenadalah operator yang rumit, bahkan mungkin buggy. Dokumen resmi dan setidaknya satu jawaban di sini menggunakan rangeoperator, yang akan gagal jika tidak ada percobaan ulang yang harus dilakukan. Lihat diskusi saya dengan anggota ReactiveX David Karnok.

Saya meningkatkan jawaban kjones dengan mengubah flatMapke concatMapdan dengan menambahkan RetryDelayStrategykelas. flatMaptidak menjaga urutan emisi sementara concatMapmelakukannya, yang penting untuk penundaan dengan back-off. Itu RetryDelayStrategy, seperti namanya, memungkinkan pengguna memilih dari berbagai mode yang menghasilkan penundaan percobaan ulang, termasuk mundur. Kode tersedia di GitHub saya lengkap dengan kasus uji berikut:

  1. Berhasil pada percobaan pertama (tidak ada percobaan ulang)
  2. Gagal setelah 1 percobaan ulang
  3. Mencoba mencoba lagi 3 kali tetapi berhasil di urutan ke-2 sehingga tidak mengulangi ke-3 kali
  4. Berhasil pada percobaan ketiga

Lihat setRandomJokesmetode.

Abhijit Sarkar
sumber
5

Berdasarkan jawaban kjones berikut adalah versi Kotlin dari RxJava 2.x coba lagi dengan penundaan sebagai ekstensi. Ganti Observableuntuk membuat ekstensi yang sama untuk Flowable.

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}

Kemudian gunakan saja di observable observable.retryWithDelay(3, 1000)

JuliusScript
sumber
Apakah mungkin untuk menggantinya dengan Singlejuga?
Papps
2
@Papps Ya itu harus bekerja, hanya perhatikan bahwa flatMapharus digunakan Flowable.timerdan Flowable.error meskipun fungsinya Single<T>.retryWithDelay.
JuliusScript
3

Sekarang dengan RxJava versi 1.0+ Anda dapat menggunakan zipWith untuk mencoba ulang dengan penundaan.

Menambahkan modifikasi ke jawaban kjones .

Diubah

public class RetryWithDelay implements 
                            Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int MAX_RETRIES;
    private final int DELAY_DURATION;
    private final int START_RETRY;

    /**
     * Provide number of retries and seconds to be delayed between retry.
     *
     * @param maxRetries             Number of retries.
     * @param delayDurationInSeconds Seconds to be delays in each retry.
     */
    public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
        MAX_RETRIES = maxRetries;
        DELAY_DURATION = delayDurationInSeconds;
        START_RETRY = 1;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable
                .delay(DELAY_DURATION, TimeUnit.SECONDS)
                .zipWith(Observable.range(START_RETRY, MAX_RETRIES), 
                         new Func2<Throwable, Integer, Integer>() {
                             @Override
                             public Integer call(Throwable throwable, Integer attempt) {
                                  return attempt;
                             }
                         });
    }
}
Omkar
sumber
3

Jawaban yang sama seperti dari kjones tetapi diperbarui ke versi terbaru Untuk RxJava versi 2.x : ('io.reactivex.rxjava2: rxjava: 2.1.3')

public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {

    private final int maxRetries;
    private final long retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
        return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
            @Override
            public Publisher<?> apply(Throwable throwable) throws Exception {
                if (++retryCount < maxRetries) {
                    // When this Observable calls onNext, the original
                    // Observable will be retried (i.e. re-subscribed).
                    return Flowable.timer(retryDelayMillis,
                            TimeUnit.MILLISECONDS);
                }

                // Max retries hit. Just pass the error along.
                return Flowable.error(throwable);
            }
        });
    }
}

Pemakaian:

// Tambahkan logika coba lagi ke observasi yang ada. // Coba lagi maksimal 3 kali dengan penundaan selama 2 detik.

observable
    .retryWhen(new RetryWithDelay(3, 2000));
Mihuilk
sumber
1

Anda bisa menambahkan penundaan di Observable yang dikembalikan di retryWhen Operator

          /**
 * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

Anda dapat melihat lebih banyak contoh di sini. https://github.com/politrons/reactive

paul
sumber
0

Lakukan saja seperti ini:

                  Observable.just("")
                            .delay(2, TimeUnit.SECONDS) //delay
                            .flatMap(new Func1<String, Observable<File>>() {
                                @Override
                                public Observable<File> call(String s) {
                                    L.from(TAG).d("postAvatar=");

                                    File file = PhotoPickUtil.getTempFile();
                                    if (file.length() <= 0) {
                                        throw new NullPointerException();
                                    }
                                    return Observable.just(file);
                                }
                            })
                            .retry(6)
                            .subscribe(new Action1<File>() {
                                @Override
                                public void call(File file) {
                                    postAvatar(file);
                                }
                            }, new Action1<Throwable>() {
                                @Override
                                public void call(Throwable throwable) {

                                }
                            });
Allen Vork
sumber
0

Untuk versi Kotlin & RxJava1

class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
    : Function1<Observable<out Throwable>, Observable<*>> {

    private val START_RETRY: Int = 1

    override fun invoke(observable: Observable<out Throwable>): Observable<*> {
        return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
            .zipWith(Observable.range(START_RETRY, MAX_RETRIES),
                object : Function2<Throwable, Int, Int> {
                    override fun invoke(throwable: Throwable, attempt: Int): Int {
                        return attempt
                    }
                })
    }
}
Cody
sumber
0

(Kotlin) Saya sedikit meningkatkan kode dengan kemunduran eksponensial dan menerapkan pertahanan yang dipancarkan dari Observable.range ():

    fun testOnRetryWithDelayExponentialBackoff() {
    val interval = 1
    val maxCount = 3
    val ai = AtomicInteger(1);
    val source = Observable.create<Unit> { emitter ->
        val attempt = ai.getAndIncrement()
        println("Subscribe ${attempt}")
        if (attempt >= maxCount) {
            emitter.onNext(Unit)
            emitter.onComplete()
        }
        emitter.onError(RuntimeException("Test $attempt"))
    }

    // Below implementation of "retryWhen" function, remove all "println()" for real code.
    val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
        throwableRx.doOnNext({ println("Error: $it") })
                .zipWith(Observable.range(1, maxCount)
                        .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
                        BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
                )
                .flatMap { pair ->
                    if (pair.second >= maxCount) {
                        Observable.error(pair.first)
                    } else {
                        val delay = interval * 2F.pow(pair.second)
                        println("retry delay: $delay")
                        Observable.timer(delay.toLong(), TimeUnit.SECONDS)
                    }
                }
    }

    //Code to print the result in terminal.
    sourceWithRetry
            .doOnComplete { println("Complete") }
            .doOnError({ println("Final Error: $it") })
            .blockingForEach { println("$it") }
}
ultraon
sumber
0

jika Anda perlu mencetak hitungan percobaan ulang, Anda dapat menggunakan contoh yang disediakan di halaman wiki Rxjava https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

observable.retryWhen(errors ->
    // Count and increment the number of errors.
    errors.map(error -> 1).scan((i, j) -> i + j)  
       .doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount))
       // Limit the maximum number of retries.
       .takeWhile(errorCount -> errorCount < retryCounts)   
       // Signal resubscribe event after some delay.
       .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS));
Angel Koh
sumber