代码之家  ›  专栏  ›  技术社区  ›  Debopam

Spring集成在聚合后调用另一个处理程序方法

  •  2
  • Debopam  · 技术社区  · 6 年前

    我正在开发一个系统,将读取和处理文件从一个目录。处理完所有文件后,它将调用一个方法,该方法将生成一个文件。另外,它应该根据文件名路由/处理文件,我也使用了spring集成路由器。下面是集成的代码片段。我的问题是,如果我删除任何一行,这是行不通的 .channel(aggregatorOutputChannel()) .channel(confirmChannel()) ,而且我必须保持同一频道 聚合器前后。为什么我需要所有的三频道声明?如果这是错误的,如何纠正它。

    我使用的是JDK8,Spring5,SpringBoot2.0.4。

    @Configuration
    @EnableIntegration
    public class IntegrationConfig {
    
        @Value("${agent.demographic.input.directory}")
        private String inputDir;
    
        @Value("${agent.demographic.output.directory}")
        private String outputDir;
    
        @Value("${confirmationfile.directory}")
        private String confirmDir;
    
        @Value("${input.scan.frequency: 2}")
        private long scanFrequency;
    
        @Value("${processing.waittime: 6000}")
        private long messageGroupWaiting;
    
        @Value("${thread.corepoolsize: 10}")
        private int corepoolsize;
    
        @Value("${thread.maxpoolsize: 20}")
        private int maxpoolsize;
    
        @Value("${thread.queuecapacity: 1000}")
        private int queuedepth;
    
        @Bean
        public MessageSource<File> inputFileSource() {
            FileReadingMessageSource src = new FileReadingMessageSource();
    
            src.setDirectory(new File(inputDir));
            src.setAutoCreateDirectory(true);
    
            ChainFileListFilter<File> chainFileListFilter = new ChainFileListFilter<>();
            chainFileListFilter.addFilter(new AcceptOnceFileListFilter<>() );
            chainFileListFilter.addFilter(new RegexPatternFileListFilter("(?i)^.+\\.xml$"));
            src.setFilter(chainFileListFilter);
            return src;
        }
    
        @Bean
        public UnZipTransformer unZipTransformer() {
            UnZipTransformer unZipTransformer = new UnZipTransformer();
            unZipTransformer.setExpectSingleResult(false);
            unZipTransformer.setZipResultType(ZipResultType.FILE);
            unZipTransformer.setDeleteFiles(true);
    
            return unZipTransformer;
        }
    
        @Bean("agentdemographicsplitter")
        public UnZipResultSplitter splitter() {
            UnZipResultSplitter splitter = new UnZipResultSplitter();
            return splitter;
        }
    
        @Bean
        public DirectChannel outputChannel() {
            return new DirectChannel();
        }
    
        @Bean
        public DirectChannel aggregatorOutputChannel() {
            return new DirectChannel();
        }
    
        @Bean("confirmChannel")
        public DirectChannel confirmChannel() {
            return new DirectChannel();
        }
    
        @Bean
        public MessageHandler fileOutboundChannelAdapter() {
            FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(outputDir));
            adapter.setDeleteSourceFiles(true);
            adapter.setAutoCreateDirectory(true);
            adapter.setExpectReply(true);
            adapter.setLoggingEnabled(true);
            return adapter;
        }
    
    
        @Bean
        public MessageHandler confirmationfileOutboundChannelAdapter() {
            FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(confirmDir));
            adapter.setDeleteSourceFiles(true);
            adapter.setAutoCreateDirectory(true);
            adapter.setExpectReply(false);
            adapter.setFileNameGenerator(defaultFileNameGenerator() );
            return adapter;
        }
    
        @Bean
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corepoolsize);
            executor.setMaxPoolSize(maxpoolsize);
            executor.setQueueCapacity(queuedepth);
            return executor;
        }
    
        @Bean
        public DefaultFileNameGenerator defaultFileNameGenerator() {
            DefaultFileNameGenerator defaultFileNameGenerator = new DefaultFileNameGenerator();
            defaultFileNameGenerator.setExpression("payload.name");
            return defaultFileNameGenerator;
        }
    
        @Bean
        public IntegrationFlow confirmGeneration() {
            return IntegrationFlows.
                    from("confirmChannel")
                    .handle(confirmationfileOutboundChannelAdapter())
                    .get();
        }
    
        @Bean
        public IntegrationFlow individualProcessor() {
            return flow -> flow.handle("thirdpartyIndividualAgentProcessor","processfile").channel(outputChannel()).handle(fileOutboundChannelAdapter());
        }
    
        @Bean
        public IntegrationFlow firmProcessor() {
            return flow -> flow.handle("thirdpartyFirmAgentProcessor","processfile").channel(outputChannel()).handle(fileOutboundChannelAdapter());
        }
    
        @Bean
        public IntegrationFlow thirdpartyAgentDemographicFlow() {
            return IntegrationFlows
                    .from(inputFileSource(), spec -> spec.poller(Pollers.fixedDelay(scanFrequency,TimeUnit.SECONDS)))
                    .channel(MessageChannels.executor(taskExecutor()))
                    .<File, Boolean>route(f -> f.getName().contains("individual"), m -> m
                            .subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
                            .subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
                            )
                    .channel(aggregatorOutputChannel())
                    .aggregate(aggregator -> aggregator.groupTimeout(messageGroupWaiting).correlationStrategy(new CorrelationStrategy() {
    
                        @Override
                        public Object getCorrelationKey(Message<?> message) {
                            return "xyz";
                        }
                    }))
                    .channel(aggregatorOutputChannel())
                    .handle("agentDemograpicOutput","generateAgentDemographicFile")
                    .channel(confirmChannel())
                    .get();
        }
    }
    

    下面是日志

    2018-09-07 17:29:20.003 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : preSend on channel 'outputChannel', message: GenericMessage [payload=C:\thirdpartyintg\input\18237232_firm.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, file_name=18237232_firm.xml, file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=dd70999a-8b8d-93d2-1a43-a961ac2c339f, file_relativePath=18237232_firm.xml, timestamp=1536366560003}]
    2018-09-07 17:29:20.003 DEBUG 10060 --- [ taskExecutor-2] o.s.i.file.FileWritingMessageHandler     : fileOutboundChannelAdapter received message: GenericMessage [payload=C:\thirdpartyintg\input\18237232_firm.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, file_name=18237232_firm.xml, file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=dd70999a-8b8d-93d2-1a43-a961ac2c339f, file_relativePath=18237232_firm.xml, timestamp=1536366560003}]
    2018-09-07 17:29:20.006 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'outputChannel', message: GenericMessage [payload=C:\thirdpartyintg\input\18237232_firm.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, file_name=18237232_firm.xml, file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=dd70999a-8b8d-93d2-1a43-a961ac2c339f, file_relativePath=18237232_firm.xml, timestamp=1536366560003}]
    2018-09-07 17:29:20.006 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'firmProcessor.input', message: GenericMessage [payload=C:\thirdpartyintg\input\18237232_firm.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, file_name=18237232_firm.xml, file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=0e6dcb75-db99-1740-7b58-e9b42bfbf603, file_relativePath=18237232_firm.xml, timestamp=1536366559761}]
    2018-09-07 17:29:20.007 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : preSend on channel 'thirdpartyintgAgentDemographicFlow.channel#2', message: GenericMessage [payload=C:\thirdpartyintg\output\18237232_firm.xml, headers={file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=e6e2a30a-60b9-7cdd-84cc-4977d4c21c97, file_name=18237232_firm.xml, file_relativePath=18237232_firm.xml, timestamp=1536366560007}]
    2018-09-07 17:29:20.008 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'thirdpartyintgAgentDemographicFlow.channel#2', message: GenericMessage [payload=C:\thirdpartyintg\output\18237232_firm.xml, headers={file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=e6e2a30a-60b9-7cdd-84cc-4977d4c21c97, file_name=18237232_firm.xml, file_relativePath=18237232_firm.xml, timestamp=1536366560007}]
    2018-09-07 17:29:20.009 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'thirdpartyintgAgentDemographicFlow.subFlow#1.channel#0', message: GenericMessage [payload=C:\thirdpartyintg\input\18237232_firm.xml, headers={file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=13713de8-91ce-b1fa-f52d-450d3038cf9c, file_name=18237232_firm.xml, file_relativePath=18237232_firm.xml, timestamp=1536366559757}]
    2018-09-07 17:29:26.009  INFO 10060 --- [ask-scheduler-9] o.s.i.a.AggregatingMessageHandler        : Expiring MessageGroup with correlationKey[processdate]
    2018-09-07 17:29:26.011 DEBUG 10060 --- [ask-scheduler-9] o.s.integration.channel.NullChannel      : message sent to null channel: GenericMessage [payload=C:\thirdpartyintg\output\17019222_individual.xml, headers={file_originalFile=C:\thirdpartyintg\input\17019222_individual.xml, id=c654076b-696f-25d4-bded-0a43d1a8ca97, file_name=17019222_individual.xml, file_relativePath=17019222_individual.xml, timestamp=1536366559927}]
    2018-09-07 17:29:26.011 DEBUG 10060 --- [ask-scheduler-9] o.s.integration.channel.NullChannel      : message sent to null channel: GenericMessage [payload=C:\thirdpartyintg\output\18237232_firm.xml, headers={file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=e6e2a30a-60b9-7cdd-84cc-4977d4c21c97, file_name=18237232_firm.xml, file_relativePath=18237232_firm.xml, timestamp=1536366560007}]
    
    1 回复  |  直到 6 年前
        1
  •  2
  •   Artem Bilan    6 年前

    首先 RegexPatternFileListFilter 应该是第一名 ChainFileListFilter AcceptOnceFileListFilter 对于您不感兴趣的文件。

    你需要 .channel(confirmChannel()) 最后 thirdpartyAgentDemographicFlow 因为这个是你的 confirmGeneration 流动。

    我不认为你 .channel(aggregatorOutputChannel()) 它必须是含蓄的。 你也不需要这个 .channel(outputChannel()) 在子流中。

    请详细说明:你得到了什么错误,然后它是如何工作的等等。。。 org.springframework.integration 来确定你的信息是如何传播的。

    如果您在GitHub上共享一些简单的Spring Boot项目,让我们按照您提供的说明进行游戏和复制,也会有很大帮助。

    我也注意到你的聚合器是基于 groupTimeout() . 要将聚合消息发送到下游,您还需要在那里配置:

    /**
     * @param sendPartialResultOnExpiry the sendPartialResultOnExpiry.
     * @return the handler spec.
     * @see AbstractCorrelatingMessageHandler#setSendPartialResultOnExpiry(boolean)
     */
    public S sendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
    

    它是 false 默认情况下,您的消息确实会发送到 NullChannel . 在文档中查看更多信息: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-routing-chapter.html#agg-and-group-to