代码之家  ›  专栏  ›  技术社区  ›  Sheep

Spring聚合器聚合输入组

  •  1
  • Sheep  · 技术社区  · 6 年前

    我想将数据聚合到组中,如何才能做到这一点?我想使用聚合器(Spring集成)来实现这一点。我的场景是这样的。

    Spring Batch<-->Spring Integration File -->Reader-->Proccessor<-->[Gateway<->Aggregator]
    

    {Male,Joe;Male,Dave;Female,Anne;Female,Jane}-->sequentialyto-->Gateway-->Aggregate-> Gender(Male,{Joe,Dave}...Gender{Female,{Anne,Jane})
    

    发布策略是什么样的?我需要一些小代码片段

    谢谢

    1 回复  |  直到 6 年前
        1
  •  0
  •   Artem Bilan    6 年前

    我不知道是什么 Reader 在您的情况下,但是Spring集成提供了 FileSplitter 工具它的 markers 您可以使用来确定聚合器已准备好发布。

    我想说你需要 releaseStrategy false :

    release-strategy-expression="false"
    

    这意味着你的团队在正常情况下不会被释放。您无法确定要发布的组的大小。

    这个 FileSplitter.FileMarker.Mark.END 作为来自的最后一条消息发出 文件拆分器 读取文件的最后一行以及 MessageGroupStore.expireMessageGroups(0) 调用将使聚合器运行良好。但仍需配置:

     <xsd:attribute name="send-partial-result-on-expiry" type="xsd:string">
                    <xsd:annotation>
                        <xsd:documentation>
                        Specifies whether messages that expired should be aggregated and sent to the 'output-channel' or 'replyChannel'.
                        Messages are expired when their containing MessageGroup expires. One of the ways of expiring MessageGroups is by
                        configuring a MessageGroupStoreReaper. However MessageGroups can alternatively be expired by simply calling
                        MessageGroupStore.expireMessageGroups(timeout). That could be accomplished via a ControlBus operation
                        or by simply invoking that method if you have a reference to the MessageGroupStore instance.
                        </xsd:documentation>
                    </xsd:annotation>
                </xsd:attribute>
    

    在到期后正常释放组。