I would suggest using compute to send a closure to all the nodes in the cache topology, and then iterating over the local primary set on each node and applying the updates there. Even with this approach, it is still better to accumulate the updates in batches and issue them with a putAll call (or use IgniteDataStreamer).
Note: for the example below it is important that the keys in the "mappings" and "entities" caches are either identical or collocated. For more information on collocation see:
https://apacheignite.readme.io/docs/affinity-collocation
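If the keys are not literally identical, collocation can be arranged by marking the shared affinity field with Ignite's @AffinityKeyMapped annotation. A minimal sketch with placeholder types (the class and field names are illustrative, not from the original answer):

    import org.apache.ignite.cache.affinity.AffinityKeyMapped;

    class EntityKey {
        /** Unique id of the entity itself. */
        private Object entityId;

        /**
         * Entries (across both caches) with the same mappingId are
         * mapped to the same partition, and hence the same primary node.
         */
        @AffinityKeyMapped
        private Object mappingId;
    }

With such keys, the mappings.get(...) call in the closure below is always a local read, since the matching mapping entry lives on the same node.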
The pseudo-code would look like this:
ClusterGroup cacheNodes = ignite.cluster().forDataNodes("mappings");
IgniteCompute compute = ignite.compute(cacheNodes);

compute.broadcast(() -> {
    // The outer Ignite instance is not serializable, so resolve it locally.
    Ignite localIgnite = Ignition.localIgnite();
    IgniteCache<K, V1> mappings = localIgnite.cache("mappings");
    IgniteCache<K, V2> entities = localIgnite.cache("entities");

    // Iterate over local primary entries.
    for (Cache.Entry<K, V2> entry : entities.localEntries(CachePeekMode.PRIMARY)) {
        V1 mappingVal = mappings.get(entry.getKey());
        V2 entityVal = entry.getValue();
        V2 newEntityVal = /* do enrichment */;
        // It would be better to accumulate a batch and then call putAll(...);
        // a plain put is used here for simplicity.
        entities.put(entry.getKey(), newEntityVal);
    }
});
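As mentioned above, the per-entry put can be replaced by batching. A sketch of the body of the broadcast closure, with placeholder types and an assumed enrich(...) helper standing in for the enrichment logic:

    Map<K, V2> batch = new HashMap<>();

    for (Cache.Entry<K, V2> entry : entities.localEntries(CachePeekMode.PRIMARY)) {
        batch.put(entry.getKey(), enrich(entry.getValue(), mappings.get(entry.getKey())));

        // Flush in chunks so the batch does not grow unbounded.
        if (batch.size() >= 500) {
            entities.putAll(batch);
            batch.clear();
        }
    }

    if (!batch.isEmpty())
        entities.putAll(batch);

Alternatively, IgniteDataStreamer can do the batching for you; note that allowOverwrite(true) is required when updating existing entries:

    try (IgniteDataStreamer<K, V2> streamer = localIgnite.dataStreamer("entities")) {
        streamer.allowOverwrite(true);

        for (Cache.Entry<K, V2> entry : entities.localEntries(CachePeekMode.PRIMARY))
            streamer.addData(entry.getKey(), enrich(entry.getValue(), mappings.get(entry.getKey())));
    }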