代码之家  ›  专栏  ›  技术社区  ›  ericn

RXJava 2-如何在出错时取消无限流并进行处理?

  •  0
  • ericn  · 技术社区  · 6 年前

    我有下面的无限流,它每秒钟都在做一些事情。
    我想要的是在出错时停止流并处理它。
    我怎样才能做到?

    void doSomething() {
            Disposable disposable = execute(doSomethingInner(), 0L, TimeUnit.SECONDS, schedulerProvider.io(), someClass -> 1L).doOnError
                    (throwable -> {
                Timber.e(throwable, "error happened");// Never triggered
            })
                    .doOnNext(someClass -> Timber.i("doing the infinite stuff"))
                    .subscribe(Functions.emptyConsumer(), throwable -> {
                        Timber.e(throwable, "stop doing the infinite stuff");// Never triggered
                    });
        }
    
        Observable<SomeClass> doSomethingInner() {
            return Observable.error(new Exception("something went wrong"));
        }
    
        Observable<SomeClass> execute(Observable<SomeClass> source,
                                      long delayInterval,
                                      TimeUnit timeUnit,
                                      Scheduler scheduler,
                                      Function<SomeClass, Long> interval) {
            return Observable.defer(new Callable<ObservableSource<SomeClass>>() {
                long currentInterval = delayInterval;
    
                @Override
                public ObservableSource<SomeClass> call() {
                    return Single.timer(currentInterval, timeUnit, scheduler)
                            .flatMapObservable(o -> source)
                            .doOnNext(t -> currentInterval = interval.apply(t));
                }
            })
                    .repeat()
                    .retry();
        }
    
    1 回复  |  直到 6 年前
        1
  •  2
  •   ericn    6 年前

    retry()

    • retry(Predicate<Throwable>)

    onError() subscribe()