代码之家  ›  专栏  ›  技术社区  ›  Tobi

如何按组聚合通量上的元素/如何按组减少?

  •  1
  • Tobi  · 技术社区  · 5 年前

    假设有一系列具有以下结构的对象:

    class Element {
      String key;
      int count;
    }
    

    现在想象一下,这些元素以预定义的排序顺序流动,总是以一组键的形式流动,如

    { key = "firstKey",  count=123}
    { key = "firstKey",  count=1  }
    { key = "secondKey", count=4  }
    { key = "thirdKey",  count=98 }
    { key = "thirdKey",  count=5  }
     .....
    

    我想做的是创建一个通量,它为每个不同的元素返回一个元素 key count 对于每个关键组。 reduce 运算符不起作用,因为它只返回一个元素,我想得到一个通量,每个不同的键有一个元素。

    使用 bufferUntil 可能有效,但有一个缺点,我必须保持一个状态来检查 与前一个相比发生了变化。

    使用 groupBy 这是一种过度的杀伤力,因为我知道一旦找到一个新密钥,每个组都会结束,所以我不想在该事件之后保留任何缓存。

    这样的聚合是否可以使用 Flux ,而不将状态保持在流之外?

    1 回复  |  直到 5 年前
        1
  •  2
  •   Simon Baslé    5 年前

    目前(从3.2.5开始)如果不跟踪自己的状态,这是不可能的。 distinctUntilChanged

    解决这个问题最简单的方法是使用 windowUntil compose +一个 AtomicReference 对于每个订户的状态:

    Flux<Tuple2<T, Integer>> sourceFlux = ...; //assuming key/count represented as `Tuple2`
    Flux<Tuple2<T, Integer>> aggregated = sourceFlux.compose(source -> {
        //having this state inside a compose means it will not be shared by multiple subscribers
        AtomicReference<T> last = new AtomicReference<>(null);
    
        return source
          //use "last seen" state so split into windows, much like a `groupBy` but with earlier closing
          .windowUntil(i -> !i.getT1().equals(last.getAndSet(i.getT1())), true)
          //reduce each window
          .flatMap(window -> window.reduce((i1, i2) -> Tuples.of(i1.getT1(), i1.getT2() + i2.getT2()))
    });