我很想看看是否有不那么冗长的解决方案,但以下是我目前正在使用的解决方案:
我定义了一个“State”类,它保存偏移量,并委托给真正的实现。这充分利用了以下事实:
generate
是同步的。
private static class State {
private final int offset = 0;
private final AnimalFetcher delegate;
State(AnimalFetcher delegate) { this.delegate = delegate; }
void getNextValue(Emitter<String> emitter) {
delegate.getValue(offset)
.doOnSuccess(animal -> offset += animal.size())
.subscribe(emitter::onNext, emitter::onError);
}
}
Observable<String> getAllTheAnimals() {
return Observable.generate(
() -> new State(this),
State::getNextValue
);
}