代码之家  ›  专栏  ›  技术社区  ›  KJQ

Spring集成使用和方法验证

  •  2
  • KJQ  · 技术社区  · 7 年前

    1. 下面的代码是使用DSL构造流的正确方法吗?
    2. 在下面的“C”中,我可以将结果冒泡到“B”流吗?
    3. 使用DSL和XML是更好的方法吗?
    4. 我很困惑如何正确地“终止”流?

    在下面的代码中,我只是将页面发布到目标。整个流程是这样的。

    1. Publisher-flow侦听负载并将其拆分为多个部分。
    2. 内容流过滤出页面并将其拆分为多个部分。
      1. AWS flow订阅并处理部件。
      2. 文件流订阅并处理部件。

    A) 发布流(publisher.jar):

    这是我通过网关启动的“主”流。其目的是作为开始触发所有发布流的入口点。

    1. 接收消息
    2. 预处理消息并保存。
    3. 用其余数据充实每个条目
    4. 将每个条目放在输出通道上。

    代码如下:

    @Bean
    IntegrationFlow flowPublish()
    {
        return f -> f
            .channel(this.publishingInputChannel())
            //Prepare the payload
            .<Package>handle((p, h) -> this.save(p))
            //Split the artifact resolved items
            .split(Package.class, Package::getItems)
            //Find the artifact associated to each item (if available)
            .enrich(
                e -> e.<PackageEntry>requestPayload(
                    m ->
                    {
                        final PackageEntry item = m.getPayload();
                        final Publishable publishable = this.findPublishable(item);
                        item.setPublishable(publishable);
                        return item;
                    }))
            //Send the results to the output channel
            .channel(this.publishingOutputChannel());
    }
    

    该模块的职责是处理传入的“内容”有效负载(即本例中的页面),并将其拆分/路由到适当的订阅者。

    1. 收听发布者输出频道
    2. 将原始有效载荷添加到收割台以备将来使用
    3. 将有效载荷转换为实际类型
    4. 将每个元素路由到适当的PubSub通道。

    代码如下:

    @Bean
    @ContentChannel("asset")
    MessageChannel contentAssetChannel()
    {
        return MessageChannels.publishSubscribe("assetPublisherChannel").get();
    
        //return MessageChannels.queue(10).get();
    }
    
    @Bean
    @ContentChannel("page")
    MessageChannel contentPageChannel()
    {
        return MessageChannels.publishSubscribe("pagePublisherChannel").get();
    
        //return MessageChannels.queue(10).get();
    }
    
    @Bean
    IntegrationFlow flowPublishContent()
    {
        return flow -> flow
            .channel(this.publishingChannel)
            //Filter for root pages (which contain elements)
            .filter(PackageEntry.class, p -> p.getPublishable() instanceof Page)
            //Put the publishable details in the header
            .enrichHeaders(e -> e.headerFunction("item", Message::getPayload))
            //Transform the item to a Page
            .transform(PackageEntry.class, PackageEntry::getPublishable)
            //Split page into components and put the type in the header
            .split(Page.class, this::splitPageElements)
            //Route content based on type to the subscriber
            .<PageContent, String>route(PageContent::getType, mapping -> mapping
                .resolutionRequired(false)
                .subFlowMapping("page", sf -> sf.channel(this.contentPageChannel()))
                .subFlowMapping("image", sf -> sf.channel(this.contentAssetChannel()))
                .defaultOutputToParentFlow())
            .channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
    }
    

    该模块是内容特定流的许多潜在订户之一。它根据发布到上面的路由通道分别处理每个元素。

    1. 订阅适当的频道。

    例如,“contentPageChannel”可以调用下面的flowPageToS3(在aws模块中)和FlowPagetFile(在另一个模块中)。

    代码如下:

    @Bean
    IntegrationFlow flowAssetToS3()
    {
        return flow -> flow
            .channel(this.assetChannel)
            .publishSubscribeChannel(c -> c
                .subscribe(s -> s
                    .<PageContent>handle((p, h) ->
                                         {
                                             return this.publishS3Asset(p);
                                         })));
    }
    
    @Bean
    IntegrationFlow flowPageToS3()
    {
        return flow -> flow
            .channel(this.pageChannel)
            .publishSubscribeChannel(c -> c
                .subscribe(s -> s
                    .<Page>handle((p, h) -> this.publishS3Page(p))
                    .enrichHeaders(e -> e.header("s3Command", Command.UPLOAD.name()))
                    .handle(this.s3MessageHandler())));
    }
    
    1 回复  |  直到 7 年前
        1
  •  2
  •   Artem Bilan    7 年前

    首先,你的问题中有很多内容:在阅读过程中很难保留所有信息。这是你的项目,所以你应该对这个主题非常有信心。但对我们来说,这是一种新的东西,甚至可能会放弃阅读,不说话,试图回答。

    下面的代码是使用DSL构造流的正确方法吗?

    这真的取决于你的逻辑。将其与逻辑组件区分开来是个好主意,但在这个问题上切断单独的jar可能会带来开销。看看你们的代码,在我看来,你们仍然把所有的东西都收集到一个Spring引导应用程序中 @Autowired @Configuration @配置

    在下面的“C”中,我可以将结果冒泡到“B”流吗?

    return Scatter-Gather 模式实现。

    使用DSL和XML是更好的方法吗?

    两者都只是一个高级API。下面是相同的集成组件。看看你的应用程序,你会发现同样的分布式解决方案与XML配置。没有理由退出Java DSL。至少对你来说,它没有那么冗长。

    这绝对不清楚有你的大描述。如果发送到S3或文件,则表示终止。这些组件没有回复,因此无处可去,无所事事。这就是停止。与Java方法相同 void .如果你担心你的入口网关,那就做吧 不要等待任何回复。看见 Messaging Gateway 了解更多信息。