If you need to perform different operations depending on the value, consider flatMap:
Single.fromCallable(() -> readingDao.getNextUploadBatch())
    .subscribeOn(rxSchedulers.network)
    .flatMapCompletable(readings -> {
        if (readings.isEmpty()) {
            // Nothing to upload, so finish the job right away.
            jobFinished(job, false);
            return Completable.complete();
        }
        return api.uploadSensorReadings(readings)
            // The second argument is true while readings remain in the DAO.
            .doFinally(() -> jobFinished(job, !readingDao.isEmpty()))
            .observeOn(rxSchedulers.database)
            .doOnComplete(() -> readingDao.delete(readings.toTypedArray()));
    })
    .subscribe(() -> { /* ignored */ }, error -> markCurrentReadingsAsNotUploading());
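
If you want to try the branching shape without pulling in RxJava, here is a rough sketch using the JDK's CompletableFuture.thenCompose, which plays a role similar to flatMap. This is a swapped-in technique, not the RxJava code above: UploadSketch, log, upload and process are all made-up names for illustration, and the "upload" just records a log entry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the upload flow; none of these names come from the original code.
class UploadSketch {
    // Records each step so the branch that was taken can be inspected.
    static final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Assumed stand-in for the remote call: completes once the batch is "uploaded".
    static CompletableFuture<Void> upload(List<Integer> batch) {
        return CompletableFuture.runAsync(() -> log.add("uploaded " + batch.size()));
    }

    // Branches on the value inside thenCompose, the same way the lambda
    // passed to flatMapCompletable branches on readings.isEmpty().
    static CompletableFuture<Void> process(List<Integer> batch) {
        return CompletableFuture.supplyAsync(() -> batch)
            .thenCompose(readings -> {
                if (readings.isEmpty()) {
                    // Empty batch: return an already-completed future.
                    log.add("nothing to upload");
                    return CompletableFuture.<Void>completedFuture(null);
                }
                // Non-empty batch: upload first, then run the follow-up step.
                return upload(readings)
                    .thenRun(() -> log.add("deleted " + readings.size()));
            });
    }
}
```

Calling UploadSketch.process(...).join() with an empty list takes the early-return branch, while a non-empty list runs the upload followed by the cleanup step. Both branches return the same future type, which is what lets the outer chain stay flat.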