代码之家  ›  专栏  ›  技术社区  ›  Daksh

条件链单链和完全链

  •  3
  • Daksh  · 技术社区  · 6 年前

    我的Rx呼叫总体工作流程应如下所示(无论当前的Rx代码如何):

    • 从中获取运动传感器读数列表 Room Dao (目的是将它们上载到REST API)。我用的是 Single<List<Reading>> 为了这个
    • 如果那样的话 readings 列表为空,然后执行 jobFinished() 回调和 在此之后不执行任何操作
    • 如果 读数 不为空,则将网络调用链接到此 Single . 网络调用返回 Completable
    • 这个 仅有一个的 从不在逻辑上抛出错误,因为它要么获取空的,要么获取非空的 读数 列表
    • 当整个Rx呼叫链终止时,执行 作业已完成() 回调函数
    • 在整个接收呼叫链成功后,删除这些 reading s来自 Dao公司
    • 关于 仅有一个的 ,但 可完成的 ,更新 Dao公司

    我当前的代码如下:

      Single.create<List<Reading>> {
            readings = readingDao.getNextUploadBatch()
    
            if (readings.isEmpty()) {
                jobFinished(job, false)
                return@create
            }
    
            it.onSuccess(readings)
        }
                .flatMapCompletable { api.uploadSensorReadings(it) }
                .doOnTerminate {
                    jobFinished(job, !readingDao.isEmpty())
                }
                .subscribeOn(rxSchedulers.network)
                .observeOn(rxSchedulers.database)
                .subscribe(
                        {
                            readingDao.delete(*readings.toTypedArray())
                        },
                        {
                            markCurrentReadingsAsNotUploading()
                        }
                )
    



    上述代码的逻辑问题是(尚未在运行时对其进行测试,但已编译):

    • 我想从 flatMapCompletable 如果 读数 列表为空
    • 我不想 doOnTerminate 若要执行 读数 为空
    • 我不想要 onComplete 零件(第一个 {} 块)的 subscribe 执行,除非 读数 为非空,并且 可完成的 也获得了成功
    • 我不想要 onError 第二部分 {} 块)的 订阅 执行,除非 读数 为非空,并且 可完成的 失败

    我不知道如何将我的工作流程作为一个高效整洁的Rx呼叫链来实施。欢迎提出任何建议!

    1 回复  |  直到 6 年前
        1
  •  5
  •   akarnokd    6 年前

    如果要根据值执行不同的操作,请考虑 flatMap :

    Single.fromCallable(() -> readingDao.getNextUploadBatch())
    .subscribeOn(rxSchedulers.network)
    .flatMapCompletable(readings -> {
        if (readings.isEmpty()) {
            jobFinished(job, false);
            return Completable.complete();
        }
        return api.uploadSensorReadings(readings)
               .doFinally(() -> jobFinished(job, !readingDao.isEmpty()))
               .observeOn(rxSchedulers.database)
               .doOnComplete(() -> readingDao.delete(readings.toTypedArray()))
    })
    .subscribe(() -> /* ignored */, error -> markCurrentReadingsAsNotUploading());