代码之家  ›  专栏  ›  技术社区  ›  levant pied

Mono.在发布服务器完成前触发时

  •  0
  • levant pied  · 技术社区  · 5 年前

    样本代码:

    Flux<Integer> fluxSrc = Flux.<Integer> create(e -> {
       e.next(1);
    
       try {
          Thread.sleep(500);
       } catch (InterruptedException e1) {
          throw new RuntimeException(e1);
       }
    
       e.complete();
    })
       .publishOn(Schedulers.single())
       .publish().autoConnect(2);
    
    Flux<Integer> fluxA = fluxSrc
       .publishOn(Schedulers.single())
       .map(j -> 10 + j);
    
    fluxA.subscribe(System.out::println);
    
    Mono<Integer> monoB = fluxSrc
       .publishOn(Schedulers.single())
       .reduce(20, (j, k) -> {
          try {
             Thread.sleep(1000);
          } catch (InterruptedException e1) {
             throw new RuntimeException(e1);
          }
          return j + k;
       });
    
    monoB.subscribe(System.out::println);
    
    Mono.when(fluxA, monoB)
       .block();
    
    System.out.println("After");
    

    这将产生以下输出:

    11
    After
    21
    

    为什么不等待两个出版商呢?( fluxA monoB )要完成吗?我应该如何构造代码,以确保所有发布者在 After 到达了吗?

    0 回复  |  直到 5 年前
        1
  •  1
  •   szurawar    5 年前

    通过使用 .publish() ,请 fluxSrc 变成热通量。考虑:

    另一方面,热门出版商不依赖于 订户。他们可能会马上开始发布数据 每当新的订阅服务器出现时(在这种情况下 所述用户将只看到在其之后发出的新元素 已订阅)。对于热门出版商来说,以前确实发生过一些事情 你订阅。

    ( https://projectreactor.io/docs/core/release/reference/#reactor.hotCold )

    解决它的一个方法是摆脱 publish 在冷流中操作。另一个是改变 .autoConnect(2); .autoConnect(3); -这是因为您希望在第三次订阅时开始处理数据- Mono.when(fluxA, monoB).block(); 已到达(前一个是 fluxA.subscribe monoB.subscribe )。

    编辑 以下内容: When 确实等待源完成,但它从以前的子脚本收到了一个完整的信号。

    可能发生的是:

    1. 通量A由 fluxA.subscribe(System.out::println); ,发出11并打印。
    2. 通量B由 monoB.subscribe(System.out::println); 开始减产。
    3. Mono.when 被订阅(触发“多播”——通量被第二次订阅)。
    4. 开始减量,结果是21。
    5. 另一个减少开始,并立即完成结果20(减少空流-来自FluxSRC的仅项目已被另一个减少消耗)。
    6. Flux A向两个子用户发送了OnComplete。
    7. 通量B发送到一个完整的还原结果=20。它被传递到订阅 单声道。 这就是为什么它没有被打印出来。
    8. 两个通量都从mono.when subscription发送到complete,因此 After 已打印。
    9. 大约在那时,第一次还原完成,值21传递给 monob.subscribe(系统输出::println);