您可以尝试一个自定义聚合操作,该操作将发出以前和当前看到的值,如下所示:
public static <T> AggregateOperation1<T, ?, Tuple2<T, T>> previousAndCurrent() {
return AggregateOperation
.withCreate(() -> new Object[2])
.<T>andAccumulate((acc, current) -> {
acc[0] = acc[1];
acc[1] = current;
})
.andExportFinish((acc) -> tuple2((T) acc[0], (T) acc[1]));
}
输出应该是以下形式的元组
(previous, current)
. 然后您可以再次将滚动聚合应用于输出。为了将问题简化为输入,我有一对
(id, amount)
对。
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, Long>mapJournal("map", START_FROM_OLDEST)) // (id, amount)
.groupingKey(Entry::getKey)
.rollingAggregate(previousAndCurrent(), (key, val) -> val)
.rollingAggregate(AggregateOperations.summingLong(e -> {
long prevValue = e.f0() == null ? 0 : e.f0().getValue();
long newValue = e.f1().getValue();
return newValue - prevValue;
}))
.drainTo(Sinks.logger());
JetConfig config = new JetConfig();
config.getHazelcastConfig().addEventJournalConfig(new EventJournalConfig().setMapName("map"));
JetInstance jet = Jet.newJetInstance(config);
IMapJet<Object, Object> map = jet.getMap("map");
map.put(0, 1L);
map.put(0, 2L);
map.put(1, 10L);
map.put(1, 40L);
jet.newJob(p).join();
1, 2, 12, 42
.