代码之家  ›  专栏  ›  技术社区  ›  Heath Borders

创建一个observate,它在subscribe上生成一个单独的,但是它停止并重用一个特定的值

  •  0
  • Heath Borders  · 技术社区  · 6 年前

    我在用 Retrofit 进行api调用,它返回 Single ,我使用 onErrorReturn 将任何异常转换为默认对象。我想让消费者看到当前值,但是如果当前值是默认对象,我想尝试重新查询api并将结果发送出去。更复杂的是,我可能有多个订户。

    所以,我知道 改造 单身 必须转换为 Observable 不仅仅是一个 onNext / onComplete 像个普通人 Single.toObservable 是的,但我不知道如何重新查询api,并使用 单身 改造 是的。

    现在,我知道:

    fun request(): Observable<Foo> {
      if (behaviorSubject.value == defaultObject) {
        API
          .request()
          .onErrorReturn(defaultObject)
          .subscribe(behaviorSubject)
      }
      return behaviorSubject
    }
    

    但我知道 subscribe 违反了RX链接,所以我正在想办法摆脱它。

    1 回复  |  直到 6 年前
        1
  •  0
  •   laenger    5 年前

    谢谢你的有趣情节。这里有一个解决方案,我相信可以满足您的需求。它比您的解决方案长,但在涉及多个订阅时应该是安全的。

    // subscribe to this observable with one or more subscribers
    val requestObservable = replayAndRetry(API.request(), defaultObject)
    
    private fun <T> replayAndRetry(request: Single<T>, defaultValue: T): Observable<T> {
        val responses = BehaviorSubject.create<T>()
    
        val initialRequest = request
                .onErrorReturnItem(defaultValue)
                .doOnSuccess(responses::onNext)
                .ignoreElement()
                .cache() // run the initial request at most once
    
        val retryWhenNecessary = Maybe
                .fromCallable { if (responses.value == defaultValue) true else null }
                .flatMapCompletable { request
                        .doOnSuccess(responses::onNext)
                        .ignoreElement()
                        .onErrorComplete() // subject already has the default value
                }
                .toObservable<T>().share() // avoid multiple simultaneous retries
    
        return responses // source for all responses
                .mergeWith(initialRequest) // will run once and then complete
                .mergeWith(retryWhenNecessary) // will check for default item on every subscription
                                               // will not run simultaneous retries
    }