也许通过实现自定义过滤器?
/**
 * Stateful predicate that watches a stream of integers for a contiguous
 * occurrence of a given sequence.
 *
 * <p>{@link #test(Integer)} returns {@code false} until the element that completes
 * the first occurrence of the searched sequence has been seen; for that element and
 * for every element afterwards it returns {@code true}. An empty searched sequence
 * is considered found immediately, so every element passes.
 *
 * <p>Matching is done incrementally with the Knuth-Morris-Pratt prefix function.
 * The earlier implementation simply reset its state on a mismatch without
 * re-examining the current element, so it missed occurrences that begin inside a
 * failed partial match (e.g. searching {@code 1, 2, 3} in {@code 1, 1, 2, 3}).
 *
 * <p>Instances keep mutable matching state between calls: an instance is meant for
 * a single pass over a single stream and is not safe for concurrent use.
 */
private static class SequencePredicate implements Predicate<Integer> {

    /** Snapshot of the searched sequence; later changes to the caller's list are not seen. */
    private final Integer[] pattern;

    /**
     * KMP fallback table: {@code fallback[i]} is the length of the longest proper
     * prefix of {@code pattern[0..i]} that is also a suffix of it.
     */
    private final int[] fallback;

    /** Number of leading pattern elements matched by the most recent stream elements. */
    private int matched;

    /**
     * @param searchedSequence the contiguous sequence to look for; must not be
     *                         {@code null} and must not contain {@code null} elements
     */
    private SequencePredicate(final List<Integer> searchedSequence) {
        this.pattern = searchedSequence.toArray(new Integer[0]);
        this.fallback = buildFallbackTable(this.pattern);
        this.matched = 0;
    }

    /**
     * Feeds the next stream element into the matcher.
     *
     * <p>Declares no checked exceptions; this is a permitted narrowing of the
     * interface method and the body cannot throw one.
     *
     * @param integer the next element of the stream
     * @return {@code true} once the searched sequence has been fully seen
     *         (including on the element that completes it), {@code false} before that
     */
    @Override
    public boolean test(final Integer integer) {
        if (matched == pattern.length) {
            // Already found (or nothing to find): latch open for the rest of the stream.
            return true;
        }
        // On a mismatch fall back to the longest shorter prefix that is still
        // consistent with the elements seen so far, instead of discarding everything.
        while (matched > 0 && !pattern[matched].equals(integer)) {
            matched = fallback[matched - 1];
        }
        if (pattern[matched].equals(integer)) {
            matched++;
        }
        return matched == pattern.length;
    }

    /** Computes the KMP prefix-function table for {@code pattern}. */
    private static int[] buildFallbackTable(final Integer[] pattern) {
        final int[] table = new int[pattern.length];
        int prefix = 0;
        for (int i = 1; i < pattern.length; i++) {
            while (prefix > 0 && !pattern[i].equals(pattern[prefix])) {
                prefix = table[prefix - 1];
            }
            if (pattern[i].equals(pattern[prefix])) {
                prefix++;
            }
            table[i] = prefix;
        }
        return table;
    }
}
/**
 * Demo entry point: delays a one-element "task" stream until a stream of numbers,
 * filtered through a {@code SequencePredicate} looking for 1, 2, 3, signals.
 *
 * <p>Elements are logged with a "before: " prefix ahead of the filter and an
 * "after: " prefix behind it; the task's element, error and completion are logged
 * by its subscriber.
 *
 * <p>NOTE(review): RxJava documents {@code delaySubscription(Publisher)} as also
 * subscribing once the other publisher completes without emitting - confirm
 * whether the task should run when the sequence never appears.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final Flowable<Integer> numbers = Flowable.just(3, 5, 1, 2, 6, 1, 2, 3, 8, 9);
    // Alternative input that does not contain 1, 2, 3:
    // final Flowable<Integer> numbers = Flowable.just(3, 5, 1, 2, 6, 1, 2, 8, 9, 10);
    final List<Integer> pattern = Arrays.asList(1, 2, 3);

    // Trigger stream: logs every element before the filter and whatever passes it.
    final Flowable<Integer> trigger = numbers
            .doOnNext(value -> System.out.println("before: " + value))
            .filter(new SequencePredicate(pattern))
            .doOnNext(value -> System.out.println("after: " + value));

    // The task is subscribed to only after the trigger stream signals.
    Flowable.just(true)
            .delaySubscription(trigger)
            .subscribe(
                    value -> System.out.println("task: " + value),
                    error -> error.printStackTrace(),
                    () -> System.out.println("task: that's the end!"));

    // Only here to keep the main thread blocked for ten seconds.
    Flowable.timer(10, SECONDS).blockingSubscribe();
}