
How do I delay a subscription until another publisher emits a specific sequence?

  •  2
  • manabreak  · 6 years ago

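    Suppose I have a Flowable<Integer> sequence that emits integer event codes. Another Flowable<Boolean> task should be subscribed to only after sequence has emitted a specific sequence of values (say, 1, 2, 3).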

    Currently, I delay task until sequence emits one specific value:

    task = task.delaySubscription(sequence.filter(i -> i == 42));
    

    I tried using buffer(3), but it does not work when the total number of emitted values is not divisible by 3:

    task = task.delaySubscription(sequence.buffer(3).filter(vals -> vals.get(0) == 1 && vals.get(1) == 2 && vals.get(2) == 3));
    
    // This fails
    sequence.onNext(42);
    sequence.onNext(1);
    sequence.onNext(2);
    sequence.onNext(3);
    task.test().assertValueCount(1).assertValue(true);
    
    // While this works
    sequence.onNext(1);
    sequence.onNext(2);
    sequence.onNext(3);
    task.test().assertValueCount(1).assertValue(true);
    

    I need both cases to work; the only thing that matters is the three most recently emitted items.

    1 answer

  •  0
  •   ctranxuan    5 years ago

    Perhaps by implementing a custom filter?

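    // RxJava 2 imports (assumed from the `throws Exception` signature of Predicate.test)
    import io.reactivex.Flowable;
    import io.reactivex.functions.Predicate;

    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;

    import static java.util.concurrent.TimeUnit.SECONDS;

    // Stateful predicate: returns true once the searched sequence has been seen
    // contiguously, and keeps returning true for every item after that.
    private static class SequencePredicate implements Predicate<Integer> {
        private final List<Integer> searchedSequence;
        private LinkedList<Integer> remaining;

        private SequencePredicate(final List<Integer> searchedSequence) {
            this.searchedSequence = searchedSequence;
            this.remaining = new LinkedList<>(searchedSequence);
        }

        @Override
        public boolean test(final Integer integer) throws Exception {
            if (remaining.isEmpty()) {
                return true; // sequence already matched
            }
            if (!remaining.getFirst().equals(integer) && remaining.size() < searchedSequence.size()) {
                // Mismatch in the middle of a partial match: start over.
                remaining = new LinkedList<>(searchedSequence);
            }
            // Re-check after a reset so that input such as 1, 1, 2, 3 still matches.
            // Note: this is not a full pattern matcher; a searched sequence with a
            // repeated prefix (e.g. 1, 1, 2) can still be missed.
            if (remaining.getFirst().equals(integer)) {
                remaining.removeFirst();
            }
            return remaining.isEmpty();
        }
    }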
    
    public static void main(String[] args) {
        Flowable<Integer> sequence = Flowable.just(3, 5, 1, 2, 6, 1, 2, 3, 8, 9);
        // Flowable<Integer> sequence = Flowable.just(3, 5, 1, 2, 6, 1, 2, 8, 9, 10);
        List<Integer> searchedSequence = Arrays.asList(1, 2, 3);
    
        Flowable<Integer> delayFlowable = sequence.doOnNext(n -> System.out.println("before: " + n))
                                                  .filter(new SequencePredicate(searchedSequence))
                                                  .doOnNext(n -> System.out.println("after: " + n));
    
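        Flowable<Boolean> task = Flowable.just(true);
        // Note: delaySubscription subscribes to task when delayFlowable emits its first
        // item OR completes normally, so a finite sequence without a match still triggers task.
        task.delaySubscription(delayFlowable)
            .subscribe(n -> System.out.println("task: " + n),
                       Throwable::printStackTrace,
                       () -> System.out.println("task: that's the end!"));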
    
        Flowable.timer(10, SECONDS) // just for blocking the main thread
                .blockingSubscribe();
    }
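
    Since the question says only the three most recently emitted items matter, a sliding window over the last N items may be a simpler fit than tracking the remaining items. The following is a minimal sketch in plain Java, not part of the original answer; the class name SlidingWindowMatcher and its offer method are hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical helper: reports whether the most recent items equal a target sequence. */
final class SlidingWindowMatcher {
    private final List<Integer> target;
    private final Deque<Integer> window = new ArrayDeque<>();
    private boolean matched; // latched: stays true after the first match

    SlidingWindowMatcher(List<Integer> target) {
        this.target = target;
    }

    /** Feeds one item; returns true once the last target.size() items have equaled the target. */
    boolean offer(Integer item) {
        if (matched) {
            return true;
        }
        window.addLast(item);
        if (window.size() > target.size()) {
            window.removeFirst(); // keep only the most recent target.size() items
        }
        matched = window.size() == target.size() && new ArrayList<>(window).equals(target);
        return matched;
    }

    public static void main(String[] args) {
        SlidingWindowMatcher matcher = new SlidingWindowMatcher(Arrays.asList(1, 2, 3));
        for (int item : new int[] {42, 1, 2, 3}) {
            System.out.println(item + " -> " + matcher.offer(item));
        }
        // The last line printed is "3 -> true".
    }
}
```

    Because offer takes an Integer and returns a boolean, it should be usable as the filter predicate through a method reference, for example task.delaySubscription(sequence.filter(matcher::offer)). RxJava's overlapping buffer, sequence.buffer(3, 1), combined with a filter that compares each list to Arrays.asList(1, 2, 3), expresses the same sliding window without a custom class.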