
Writer or classifier method is not called when using CompositeItemWriter

  • Jigar Naik  · 6 years ago

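    I am writing a Spring Batch job with Spring Boot. I need to write to two different tables depending on a condition, so I am trying to use CompositeItemWriter. However, when I run the batch, the writers are not called.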

    Here is my job configuration class.

    @Configuration
    public class JobConfiguration {
    
        ...
        ...
        ...
    
        @Bean
        public JdbcCursorItemReader<Notification> reader() {
            JdbcCursorItemReader<Notification> reader = new JdbcCursorItemReader<Notification>();
            reader.setDataSource(dataSource);
    
        ...
        ...
            reader.setRowMapper(new BeanPropertyRowMapper<>(Notification.class));
            return reader;
        }
    
        @Bean
        public NotificationItemProcessor notificatonProcessor() {
            return new NotificationItemProcessor();
        }
    
        @Bean
        public JdbcBatchItemWriter<Notification> updateWriter() {
            JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
            ...
            writer.setDataSource(dataSource);
            return writer;
        }
    
    
        /**
         * Composite Exchange Writer
         * @return
         * @throws InstantiationException
         * @throws IllegalAccessException
         */
        @SuppressWarnings("unchecked")
        @Bean
        public CompositeItemWriter<Notification> compositeExchangeWriter() throws InstantiationException, IllegalAccessException {
            HashMap<String, Object> map = new HashMap<String, Object>();
            map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_NOTIFICATION.getActionName(), exchangeWorkflowWriter());
            map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_PACK.getActionName(), exchangeWriter());
            map.put(ExchangeRouter.DO_NOTHING.getActionName(), doNothing());
            return new CompositeItemWriterBuilder(map, ExchangeWriterRouterClassifier.class).build();
        }
    
        @Bean
        public JdbcBatchItemWriter<Notification> exchangeWorkflowWriter() {
            JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
    
            writer.setSql(" INSERT INTO SOME TABLE..");
    
            writer.setDataSource(dataSource);
            return writer;
        }
    
        @Bean
        public JdbcBatchItemWriter<Notification> exchangeWriter() {
            JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
    
            writer.setSql("INSERT INTO SOME OTHER TABLE.");
    
            writer.setDataSource(dataSource);
            return writer;
        }
    
        @Bean
        public ItemWriter<Document> doNothing() {
            return new DummyWriter();
        }
    
        @Bean
        public Job generatePdf(JobCompletionNotificationListener listener) throws InstantiationException, IllegalAccessException {
            return jobBuilderFactory.get("generatePdf")
                    .incrementer(new RunIdIncrementer())
                    .flow(treatStock())
                    .end()
                    .build();
        }
    
        @Bean
        public Step treatStock() throws InstantiationException, IllegalAccessException {
            return stepBuilderFactory.get("treatStock")
                    .<Notification, Notification>chunk(1)
                    .reader(reader())
                    .processor(notificatonProcessor())
                    .writer(compositeExchangeWriter())
                    .writer(updateWriter())
                    .build();
        }
    
    }
    

    CompositeItemWriterBuilder.java

    public class CompositeItemWriterBuilder extends CompositeItemBuilder<CompositeItemWriter> {
    
        public CompositeItemWriterBuilder(HashMap<String, Object> matcherMap, Class<?> routerDelegate) throws InstantiationException, IllegalAccessException {
            BackToBackPatternClassifier classif = new BackToBackPatternClassifier();
            classif.setRouterDelegate(routerDelegate.newInstance());
            classif.setMatcherMap(matcherMap);
    
            ClassifierCompositeItemWriter classifier = new ClassifierCompositeItemWriter();
            classifier.setClassifier(classif);
    
            this.delegates.add(classifier);
    
        }
    
        public CompositeItemWriterBuilder(List<Object> delegates) {
            this.delegates = delegates;
        }
    
        @Override
        protected Class<?> getCompositeItem() {
            return CompositeItemWriter.class;
        }
    }
    

    CompositeItemBuilder.java

    public abstract class CompositeItemBuilder<T> {
    
        protected List<Object> delegates = new ArrayList<Object>();
    
        @SuppressWarnings("unchecked")
        public T build() throws InstantiationException, IllegalAccessException {
    
            Object compositeItem = getCompositeItem().newInstance();
            Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
            ReflectionUtils.invokeMethod(setDelegates,compositeItem, delegates);
    
            return (T) compositeItem;
        }
    
        abstract protected Class<?> getCompositeItem();
    }
    

    ExchangeWriterRouterClassifier.java (the classify method is not called)

    public class ExchangeWriterRouterClassifier {
    
        @Classifier
        public String classify(Notification notification) {
            return notification.getExchangesWorkflow().getRouter().getActionName();
        }
    
    }
    

    How does Spring call the classifier? Am I missing something?

    1 answer

  • Mahmoud Ben Hassine  · 6 years ago

    However, when I run the batch, the writers are not called.

    The problem is in the step definition:

    @Bean
    public Step treatStock() throws InstantiationException, IllegalAccessException {
        return stepBuilderFactory.get("treatStock")
                .<Notification, Notification>chunk(1)
                .reader(reader())
                .processor(notificatonProcessor())
                .writer(compositeExchangeWriter())
                .writer(updateWriter())
                .build();
    }
    

    You are calling the writer() method twice, so updateWriter() overrides compositeExchangeWriter(). You need to call the method once, passing a composite writer on which you set the delegate writers.
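
    To make the override concrete, here is a minimal plain-Java model. It is not Spring Batch code: SingleWriterStep, composite and the log strings are hypothetical names invented for this illustration, on the assumption that a step keeps a single writer slot. It shows that a second writer(...) call replaces the first, while one composite writer reaches every delegate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model only; none of these types are Spring Batch classes.
class SingleWriterStep {
    private Consumer<String> writer; // one slot, like a step's single item writer

    // Each call replaces whatever writer was set before.
    SingleWriterStep writer(Consumer<String> writer) {
        this.writer = writer;
        return this;
    }

    void run(List<String> items) {
        items.forEach(writer);
    }
}

class WriterOverrideDemo {
    // Builds one writer that forwards each item to every delegate, in order.
    static Consumer<String> composite(List<Consumer<String>> delegates) {
        return item -> delegates.forEach(d -> d.accept(item));
    }

    // Mirrors the question: two writer(...) calls, only the last one is kept.
    static List<String> runWithTwoWriterCalls() {
        List<String> log = new ArrayList<>();
        new SingleWriterStep()
                .writer(item -> log.add("exchange:" + item))
                .writer(item -> log.add("update:" + item)) // replaces the writer above
                .run(Arrays.asList("a", "b"));
        return log;
    }

    // Mirrors the fix: a single writer(...) call with a composite of both writers.
    static List<String> runWithComposite() {
        List<String> log = new ArrayList<>();
        List<Consumer<String>> delegates = Arrays.asList(
                item -> log.add("exchange:" + item),
                item -> log.add("update:" + item));
        new SingleWriterStep()
                .writer(composite(delegates))
                .run(Arrays.asList("a", "b"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runWithTwoWriterCalls()); // [update:a, update:b]
        System.out.println(runWithComposite());      // [exchange:a, update:a, exchange:b, update:b]
    }
}
```

    In the real configuration the same idea would mean a single .writer(...) call on the step, with the composite writer holding the other writers as delegates.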

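    As a side note, when using a composite writer, you may need to register the delegates as streams on the step, depending on which of the writers implement the ItemStream interface. More details here: https://docs.spring.io/spring-batch/4.0.x/reference/html/readersAndWriters.html#delegatePatternAndRegistering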

    How does Spring call the classifier?

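    When a ClassifierCompositeItemWriter is configured correctly, Spring Batch calls the classifier for each item to determine which writer to use, and then calls the appropriate writer to write the item.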
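
    The routing described above can be modelled in plain Java. This is a sketch of the idea and not the Spring Batch implementation: RoutingWriter, RoutingDemo, the keys "notification", "pack" and "nothing", and the "key:payload" item format are all assumptions made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java model of classifier-based routing; not a Spring Batch class.
class RoutingWriter<T> {
    private final Function<T, String> classifier;
    private final Map<String, Consumer<T>> writersByKey;

    RoutingWriter(Function<T, String> classifier, Map<String, Consumer<T>> writersByKey) {
        this.classifier = classifier;
        this.writersByKey = writersByKey;
    }

    void write(List<T> items) {
        for (T item : items) {
            String key = classifier.apply(item);          // 1. classify the item
            Consumer<T> writer = writersByKey.get(key);   // 2. look up its writer
            if (writer == null) {
                throw new IllegalStateException("No writer registered for key: " + key);
            }
            writer.accept(item);                          // 3. delegate the write
        }
    }
}

class RoutingDemo {
    // Routes "key:payload" strings into in-memory lists standing in for tables.
    static Map<String, List<String>> route(List<String> items) {
        Map<String, List<String>> tables = new HashMap<>();
        tables.put("notification", new ArrayList<>());
        tables.put("pack", new ArrayList<>());

        Map<String, Consumer<String>> writers = new HashMap<>();
        writers.put("notification", tables.get("notification")::add);
        writers.put("pack", tables.get("pack")::add);
        writers.put("nothing", item -> { }); // deliberately drops the item

        // Hypothetical rule: the text before ':' names the target writer.
        Function<String, String> classifier = item -> item.substring(0, item.indexOf(':'));

        new RoutingWriter<>(classifier, writers).write(items);
        return tables;
    }

    public static void main(String[] args) {
        Map<String, List<String>> tables = route(
                Arrays.asList("notification:1", "pack:2", "nothing:3", "notification:4"));
        System.out.println(tables.get("notification")); // [notification:1, notification:4]
        System.out.println(tables.get("pack"));         // [pack:2]
    }
}
```

    If the classifier is never attached to the writer that the step actually uses, step 1 never runs, which matches the symptom in the question.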

    In your configuration, the ClassifierCompositeItemWriter is not configured correctly here:

    @SuppressWarnings("unchecked")
    public T build() throws InstantiationException, IllegalAccessException {
    
        Object compositeItem = getCompositeItem().newInstance();
        Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
        ReflectionUtils.invokeMethod(setDelegates,compositeItem, delegates);
    
        return (T) compositeItem;
    }
    

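    I would not use reflection to set the delegates. The problem is that you expect the method compositeExchangeWriter to register a ClassifierCompositeItemWriter, but its return type is CompositeItemWriter. So the composite writer is not considered a classifier.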

    You can find an example of how to use ClassifierCompositeItemWriter here: https://github.com/spring-projects/spring-batch/blob/master/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/ClassifierCompositeItemWriterTests.java#L44
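
    For reference, a reflection-free configuration could look roughly like the sketch below. It assumes Spring Batch 4.x and Spring Retry on the classpath. ClassifierWriterSketch and routingWriter are hypothetical names; the commented usage refers to the beans and the ExchangeRouter enum from the question and is untested against that code.

```java
import java.util.Map;

import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.classify.BackToBackPatternClassifier;

// Hypothetical helper: builds the classifier writer directly, without reflection.
public final class ClassifierWriterSketch {

    private ClassifierWriterSketch() {
    }

    /**
     * @param router           an object with a method annotated with @Classifier
     *                         that returns a String key for each item
     * @param writersByAction  writers keyed by the patterns the router can return
     */
    public static <T> ClassifierCompositeItemWriter<T> routingWriter(
            Object router, Map<String, ItemWriter<? super T>> writersByAction) {

        // First the router turns an item into a String, then the matcher map
        // turns that String into the writer to use.
        BackToBackPatternClassifier<T, ItemWriter<? super T>> classifier =
                new BackToBackPatternClassifier<>();
        classifier.setRouterDelegate(router);
        classifier.setMatcherMap(writersByAction);

        ClassifierCompositeItemWriter<T> writer = new ClassifierCompositeItemWriter<>();
        writer.setClassifier(classifier);
        return writer;
    }
}

// Possible usage inside the question's JobConfiguration (names from the question):
//
//   @Bean
//   public ClassifierCompositeItemWriter<Notification> exchangeRoutingWriter() {
//       Map<String, ItemWriter<? super Notification>> map = new HashMap<>();
//       map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_NOTIFICATION.getActionName(), exchangeWorkflowWriter());
//       map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_PACK.getActionName(), exchangeWriter());
//       return ClassifierWriterSketch.routingWriter(new ExchangeWriterRouterClassifier(), map);
//   }
```

    The bean's declared return type is ClassifierCompositeItemWriter, so nothing hides the classifier behind a plain CompositeItemWriter. The question's doNothing() bean is declared as ItemWriter<Document>, so it would only fit this map if Document is a supertype of Notification.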