代码之家  ›  专栏  ›  技术社区  ›  JvmSd121

默认为content\u类型的应用程序/json,覆盖了DefaultExceptionStrategy中的isFatal

  •  0
  • JvmSd121  · 技术社区  · 7 年前

    我不想要求我的客户提供content\u类型的应用程序/json,而只是默认设置。我让它工作了。

    我还尝试结合另一个示例来实现ConditionalRejectingErrorHandler中的自定义isFatal(Throwable t)。我可以让我的自定义错误处理程序启动,但它似乎再次需要content\u type属性。我想不出如何让他们俩同时工作。

    有什么想法吗?

    以下成功运行,不需要content\u类型 编辑:下面的代码没有像我想的那样工作。队列中具有属性content\u type application/json的旧消息必须已被拉入

     @EnableRabbit
     @Configuration
     public class ExampleRabbitConfigurer implements 
     RabbitListenerConfigurer {
    
    @Value("${spring.rabbitmq.host:'localhost'}")
    private String host;
    
    @Value("${spring.rabbitmq.port:5672}")
    private int port;
    
    @Value("${spring.rabbitmq.username}")
    private String username;
    
    @Value("${spring.rabbitmq.password}")
    private String password;
    
    @Autowired
    private MappingJackson2MessageConverter mappingJackson2MessageConverter;
    
    @Autowired
    private DefaultMessageHandlerMethodFactory messageHandlerMethodFactory;
    
    @Bean
    public MappingJackson2MessageConverter mappingJackson2MessageConverter() {
       return new MappingJackson2MessageConverter();
    }
    
    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
       DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
       factory.setMessageConverter(mappingJackson2MessageConverter);
       return factory;
    }
    
    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
       registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
    }
    

    下面的代码用于覆盖ConditionalRejectingErrorHandler中的isFatal()。SimpleRabbitListenerContainerFactory。setMessageConverter()似乎应该与DefaultMessageHandlerMethodFactory具有相同的用途。setMessageConverter()。显然情况并非如此。

     @EnableRabbit
     @Configuration
     public class ExampleRabbitConfigurer {
    
    @Value("${spring.rabbitmq.host:'localhost'}")
    private String host;
    
    @Value("${spring.rabbitmq.port:5672}")
    private int port;
    
    @Value("${spring.rabbitmq.username}")
    private String username;
    
    @Value("${spring.rabbitmq.password}")
    private String password;
    
    @Autowired
    ConnectionFactory connectionFactory;
    
    @Autowired
    Jackson2JsonMessageConverter jackson2JsonConverter;
    
    @Autowired
    ErrorHandler amqpErrorHandlingExceptionStrategy;
    
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public ErrorHandler amqpErrorHandlingExceptionStrategy() {
        return new ConditionalRejectingErrorHandler(new AmqpErrorHandlingExceptionStrategy());
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jackson2JsonConverter);
        factory.setErrorHandler(amqpErrorHandlingExceptionStrategy);
        return factory;
    }
    
    
    public static class AmqpErrorHandlingExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
    
        private final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(getClass());
    
        @Override
        public boolean isFatal(Throwable t) {
    
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                LOGGER.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }
    
    }
    
    1 回复  |  直到 7 年前
        1
  •  2
  •   Gary Russell    7 年前

    使用“接收后” MessagePostProcessor 添加 contentType 入站邮件的标头。

    从2.0版开始,您可以将MPP添加到容器工厂。

    对于早期版本,您可以重新配置。。。

    @SpringBootApplication
    public class So47424449Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So47424449Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(RabbitListenerEndpointRegistry registry, RabbitTemplate template) {
            return args -> {
                SimpleMessageListenerContainer container =
                        (SimpleMessageListenerContainer) registry.getListenerContainer("myListener");
                container.setAfterReceivePostProcessors(m -> {
                    m.getMessageProperties().setContentType("application/json");
                    return m;
                });
                container.start();
    
                // send a message with no content type
                template.setMessageConverter(new SimpleMessageConverter());
                template.convertAndSend("foo", "{\"bar\":\"baz\"}", m -> {
                    m.getMessageProperties().setContentType(null);
                    return m;
                });
                template.convertAndSend("foo", "{\"bar\":\"qux\"}", m -> {
                    m.getMessageProperties().setContentType(null);
                    return m;
                });
            };
        }
    
        @Bean
        public Jackson2JsonMessageConverter converter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @RabbitListener(id = "myListener", queues = "foo", autoStartup = "false")
        public void listen(Foo foo) {
            System.out.println(foo);
            if (foo.bar.equals("qux")) {
                throw new MessageConversionException("test");
            }
        }
    
        public static class Foo {
    
            public String bar;
    
            public String getBar() {
                return this.bar;
            }
    
            public void setBar(String bar) {
                this.bar = bar;
            }
    
            @Override
            public String toString() {
                return "Foo [bar=" + this.bar + "]";
            }
    
        }
    
    }
    

    如您所见,由于它修改了源消息,因此修改后的标头在错误处理程序中可用。。。

    2017-11-22 09:39:26.615 WARN 97368---[cTaskExecutor-1]IngerroHandler$DefaultExceptionStrategy:致命消息转换错误;消息被拒绝;如果这样配置,它将被丢弃或路由到死信交换:(正文:{“bar”:“qux”}消息属性[头={}, contentType=应用程序/json ,contentEncoding=UTF-8,contentLength=0,receivedDeliveryMode=PERSISTENT,priority=0,redelived=false,receivedExchange=,receivedrootingkey=foo,deliveryTag=2,consumerTag=amq。ctag-re1kcxKV14L\u nl186stM0w,consumerQueue=foo]),contentType=application/json,contentEncoding=UTF-8,contentLength=0,receivedDeliveryMode=PERSISTENT,priority=0,redelived=false,receivedExchange=,receivedrootingkey=foo,deliveryTag=2,consumerTag=amq。ctag-re1kcxKV14L\U nl186stM0w,consumerQueue=foo])