
RxJava debounce operator that depends on the message

  •  1
  • Victor Dmitrienko  · Tech Community  · 6 years ago

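    I am learning RxJava and trying to understand how to implement non-standard reactive "debounce" logic. Depending on the message, the new operator must either delay a message of one kind, or skip it if a message of another kind arrives from the observable before the delay elapses.

    Debounce only A-messages, or forget about them if another message arrives

    Please help me work out this logic.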

    2 answers  |  as of 6 years ago
        1
  •  3
  •   akarnokd    6 years ago

    This requires a non-trivial combination of operators:

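    public static <T> ObservableTransformer<T, T> debounceOnly(
            Predicate<? super T> condition, long time,
            TimeUnit unit, Scheduler scheduler) {
        // publish(selector) shares one upstream subscription, so f can be used
        // both as the main sequence and as the "something newer arrived" signal.
        return o -> o.publish(f ->
            // concatMapEager subscribes to each inner source right away
            // but still emits their items in the original order.
            f.concatMapEager(v -> {
                if (condition.test(v)) {
                    // Delay matching items; drop them if f emits again first.
                    return Observable.just(v).delay(time, unit, scheduler).takeUntil(f);
                }
                // Non-matching items pass through without delay.
                return Observable.just(v);
            })
        );
    }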
    
    
    @Test
    public void test() {
        PublishSubject<String> subject = PublishSubject.create();
    
        TestScheduler sch = new TestScheduler();
    
        subject
        .compose(debounceOnly(v -> v.startsWith("A"), 
             100, TimeUnit.MILLISECONDS, sch))
        .subscribe(System.out::println, Throwable::printStackTrace, 
             () -> System.out.println("Done"));
    
        subject.onNext("A1");
    
        sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    
        subject.onNext("B1");
        sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    
        subject.onNext("C1");
        sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    
        subject.onNext("A2");
        sch.advanceTimeBy(50, TimeUnit.MILLISECONDS);
    
        subject.onNext("A3");
        sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    
        subject.onNext("A4");
        sch.advanceTimeBy(50, TimeUnit.MILLISECONDS);
    
        subject.onNext("B2");
        sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    
        subject.onNext("C2");
        sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    
        subject.onComplete();
    }
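
    To make the intended semantics easier to check by hand, here is a minimal plain-Java sketch that replays the same timeline as the test above without using RxJava at all. The class `DebounceOnlyModel` and its `simulate` helper are hypothetical names introduced only for this illustration. It assumes that a timer due at exactly the arrival time of the next value fires first (which is how the test drives the `TestScheduler`), and that the source stays open long enough for the last delayed value to fire.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class DebounceOnlyModel {

    // Hypothetical helper, not part of RxJava: replays timestamped values and
    // returns what a "debounce only the matching values" stage would emit,
    // formatted as value@emissionTime.
    static List<String> simulate(long[] times, String[] values,
                                 Predicate<String> condition, long delay) {
        List<String> out = new ArrayList<>();
        String pending = null; // matching value currently waiting out its delay
        long due = 0;          // virtual time at which pending would be emitted

        for (int i = 0; i < values.length; i++) {
            long now = times[i];
            if (pending != null) {
                // Assumption: a timer due exactly at 'now' fires before the new value.
                if (due <= now) {
                    out.add(pending + "@" + due);
                }
                // Otherwise the newer value arrived first and pending is dropped.
                pending = null;
            }
            if (condition.test(values[i])) {
                pending = values[i];
                due = now + delay;
            } else {
                out.add(values[i] + "@" + now);
            }
        }
        // Assumption: the source stays open until the last timer has fired.
        if (pending != null) {
            out.add(pending + "@" + due);
        }
        return out;
    }

    public static void main(String[] args) {
        // Arrival times (ms) implied by the advanceTimeBy calls in the test.
        long[] times = {0, 100, 101, 102, 152, 252, 302, 402};
        String[] values = {"A1", "B1", "C1", "A2", "A3", "A4", "B2", "C2"};
        // prints [A1@100, B1@100, C1@101, A3@252, B2@302, C2@402]
        System.out.println(simulate(times, values, v -> v.startsWith("A"), 100));
    }
}
```

    Under these assumptions A2 and A4 are dropped because a newer value arrives within 100 ms, so the test should print A1, B1, C1, A3, B2, C2 and then Done.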
    
        2
  •  0
  •   Victor Dmitrienko    6 years ago

    Kotlin code for RxJava 1.2.10:

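    import java.time.LocalDateTime
    import java.time.ZoneOffset
    import java.util.concurrent.TimeUnit
    import rx.Observable

    // (arrival time in seconds, value)
    val sequence = listOf(
        Pair(0, "A"),  // delayed, then emitted
        Pair(3, "B"),  // emitted immediately
        Pair(4, "C"),  // emitted immediately
        Pair(5, "A"),  // skipped: the next A arrives first
        Pair(6, "A"),  // delayed, then emitted
        Pair(9, "A"),  // skipped: the next B arrives first
        Pair(10, "B"), // emitted immediately
        Pair(11, "C")  // emitted immediately
    )

    val startTime = LocalDateTime.now()
    Observable
        .from(sequence)
        // Replay each value at its arrival time, measured from subscription.
        .flatMap { tm ->
            Observable.just(tm.second)
                .delay(tm.first.toLong(), TimeUnit.SECONDS)
        }
        .compose { o ->
            o.publish { f ->
                // Delay each "A" by 2 s and drop it if f emits again first.
                f.concatMap { v ->
                    if (v == "A")
                        Observable.just(v).delay(2, TimeUnit.SECONDS).takeUntil(f)
                    else
                        Observable.just(v)
                }
            }
        }
        .toBlocking()
        .subscribe {
            val currentTime = LocalDateTime.now()
            val sec = currentTime.toEpochSecond(ZoneOffset.UTC) - startTime.toEpochSecond(ZoneOffset.UTC)
            println("$sec - $it")
        }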