代码之家  ›  专栏  ›  技术社区  ›  Ofek Regev

rxjava在retrywhen on completeable之后完成

  •  0
  • Ofek Regev  · 技术社区  · 6 年前

    我在completeable上使用retrywhen运算符,有没有方法告诉它从retry flowable完成? 像这样的东西-

    PublishSubject<?> retrySubject = PublishSubject.create();
    public void someFunction() {
        someCompletable.retryWhen(new Function<Flowable<Throwable>, Publisher<?>>() {
            @Override
            public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
                return throwableFlowable.flatMap(throwable -> retrySubject.toFlowable(BackpressureStrategy.MISSING));
            }
        }).subscribe();
    }
    
    public void ignoreError(){
        retrySubject.onComplete();
    }
    
    0 回复  |  直到 6 年前
        1
  •  1
  •   akarnokd    6 年前

    你不能阻止 flatMap 给它一个空的来源。同时,每一个错误都会导致越来越多的观察者订阅导致内存泄漏的主题。

    使用 takeUntil 要通过另一个源的帮助停止序列,请执行以下操作:

    PublishProcessor<Throwable> stopProcessor = PublishProcessor.create();
    
    source.retryWhen(errors -> 
        errors.takeUntil(
            stopProcessor
        )
        .flatMap(error -> Flowable.timer(1, TimeUnit.SECONDS))
    )
    
    stopProcessor.onComplete();
    

    编辑 如果要重用同一主题,可以在停止路径上禁止项目:

    PublishProcessor<Integer> stopProcessor = PublishProcessor.create();
    
    source.retryWhen(errors -> 
        errors.takeUntil(
            stopProcessor.ignoreElements().toFlowable()
        )
        .flatMap(error -> stopProcessor)
    )
    
    // retry
    stopProcessor.onNext(1);
    
    // stop
    stopProcessor.onComplete();