代码之家  ›  专栏  ›  技术社区  ›  Abhishek

HazelcastJet滚动聚合,删除以前的数据并添加新数据

  •  1
  • Abhishek  · 技术社区  · 5 年前

    我们有一个用例,在这个用例中,我们接收到来自kafka的需要聚合的消息。这必须以这样一种方式进行聚合:如果一个更新来自同一个id,那么需要减去现有值(如果有的话)并添加新值。

    从各种论坛我了解到,jet不存储原始值,而是存储聚合结果和一些内部数据。

    在这种情况下,我怎样才能做到这一点?

    例子

    Balance 1 {id:1, amount:100} // aggregated result 100
    Balance 2 {id:2, amount:200} // 300
    Balance 3 {id:1, amount:400} // 600 after removing 100 and adding 400
    

    我可以实现一个简单的使用,每次添加。但我没能实现现有价值需要减去而新价值必须增加的聚合。

    rollingAggregation(AggregatorOperations.summingDouble(<login to add remove>))
        .drainTo(Sinks.logger()).
    
    1. 余额1、2、3是一系列消息
    2. 注释显示jet执行的每条消息的聚合值。
    1 回复  |  直到 5 年前
        1
  •  2
  •   Can Gencer    5 年前

    您可以尝试一个自定义聚合操作,该操作将发出以前和当前看到的值,如下所示:

    public static <T> AggregateOperation1<T, ?, Tuple2<T, T>> previousAndCurrent() {
        return AggregateOperation
                .withCreate(() -> new Object[2])
                .<T>andAccumulate((acc, current) -> {
                    acc[0] = acc[1];
                    acc[1] = current;
                })
                .andExportFinish((acc) -> tuple2((T) acc[0], (T) acc[1]));
    }
    

    输出应该是以下形式的元组 (previous, current) . 然后您可以再次将滚动聚合应用于输出。为了将问题简化为输入,我有一对 (id, amount) 对。

    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<Integer, Long>mapJournal("map", START_FROM_OLDEST)) // (id, amount)
            .groupingKey(Entry::getKey)
            .rollingAggregate(previousAndCurrent(), (key, val) -> val)
            .rollingAggregate(AggregateOperations.summingLong(e -> {
                long prevValue = e.f0() == null ? 0 : e.f0().getValue();
                long newValue = e.f1().getValue();
                return newValue - prevValue;
            }))
            .drainTo(Sinks.logger());
    
    JetConfig config = new JetConfig();
    config.getHazelcastConfig().addEventJournalConfig(new EventJournalConfig().setMapName("map"));
    JetInstance jet = Jet.newJetInstance(config);
    
    IMapJet<Object, Object> map = jet.getMap("map");
    
    map.put(0, 1L);
    map.put(0, 2L);
    map.put(1, 10L);
    map.put(1, 40L);
    
    jet.newJob(p).join();
    

    1, 2, 12, 42 .

    推荐文章