代码之家  ›  专栏  ›  技术社区  ›  Ben Thurley

如何阻止activemq artemis一次传递所有消息,并触发除第一个消息外的所有消息的重新传递延迟

  •  0
  • Ben Thurley  · 技术社区  · 5 年前

    出身背景

    我在ApacheArtemis 2.7.0上有一个JMS消息队列。redhat-00056。代理配置了一个 redelivery-delay 10分钟。如果我将一条消息发布到队列中,但在消费者身上失败,那么它将作为预定消息返回队列,并在10分钟内送达。随后发布的任何消息都会被直接处理,因此队列不会被调度的消息阻塞。

    如果一系列消息被快速连续发送,那么它们都会失败,并被安排在10分钟内发送。在这种情况下,阿耳特弥斯似乎试图保持消息的顺序。

    文档

    关于重新交付的文件说:

    其他后续消息将定期传递,只有被取消的消息将在延迟后异步发送回队列。

    Redelivery documentation

    问题

    在我看来,这似乎是不一致的,如果你连续发布消息,Artemis似乎会保持顺序,而如果消息之间有轻微延迟,那么队列不会阻塞,只有失败的消息会被延迟调度(根据文档)。

    我试图找到一个解决方案,这样,如果一条消息失败,需要在10分钟内重新发送,它就不会阻止后续消息。

    实例

    它不需要任何特殊的东西来重现。如上所述,您只需要将一些消息快速连续地发送到代理上具有重新交付策略的队列。我一直在用一个基本示例进行测试,如下所示:

    Spring boot应用程序,在启动时生成五条消息。

    @SpringBootApplication
    public class ArtemisTestApplication
    {
    
        private Logger logger = LoggerFactory.getLogger(ArtemisTestApplication.class);
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @PostConstruct
        public void init()
        {
            send("Message1");
            send("Message2");
            send("Message3");
            send("Message4");
            send("Message5");
        }
    
        public void send(String msg)
        {
            logger.debug("Sending message :{}", msg);
            jmsTemplate.convertAndSend("jms.queue.TestQueue", msg);
        }
    
        public static void main(String[] args)
        {
            SpringApplication.run(ArtemisTestApplication.class, args);
        }
    
    }
    

    使用消息并抛出错误以触发重新交付策略。

    @Component
    public class TestConsumer
    {
        private Logger logger = LoggerFactory.getLogger(TestConsumer.class);
    
        @JmsListener(destination = "jms.queue.TestQueue")
        public void receive(TextMessage message) throws JMSException
        {
            logger.debug("Message received: {}", message.getText());
            throw new RuntimeException("Force redelivery policy");
        }
    }
    

    该应用程序是使用 spring boot initializr .除了给它起个名字,唯一值得注意的是,在消息传递下,对阿耳特弥斯的依赖性。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-artemis</artifactId>
        </dependency>
    

    正在申请中。属性我已经配置了本地运行的Artemis实例的连接属性。

    spring.artemis.mode=native
    spring.artemis.host=localhost
    spring.artemis.port=61616
    spring.artemis.user=
    spring.artemis.password=
    

    在代理上,我用重新交付策略配置了队列。注意:我在这里将延迟设置为0,但问题仍然存在,在第一条消息尝试三次并移动到DLQ之前,所有消息都被阻止。如果您将延迟更改为正数,那么您将看到所有五条消息都安排在稍后发送。

    <address-settings>      
        <address-setting match="jms.queue.TestQueue">            
            <dead-letter-address>DLQ</dead-letter-address>                      
            <redelivery-delay>0</redelivery-delay>    
            <max-delivery-attempts>3</max-delivery-attempts>
        </address-setting>    
      </address-settings>
    
    <addresses>     
        <address name="DLQ">            
            <anycast>               
              <queue name="DLQ" />            
            </anycast>         
          </address>       
          <address name="jms.queue.TestQueue">            
            <anycast>               
              <queue name="jms.queue.TestQueue" />            
            </anycast>         
          </address>                      
    </addresses>
    
    0 回复  |  直到 5 年前
        1
  •  1
  •   Ben Thurley    5 年前

    我已经得出结论,这是阿尔忒弥斯的一个错误。我为此开了罚单,并与其他遇到同样问题的人发表了评论。

    https://issues.apache.org/jira/browse/ARTEMIS-2417

    与此同时,我不得不更改我们的客户端应用程序,以处理重新交付策略本身。如果读取消息时出错,则我们在消息上增加一个计数器,并以所需延迟将其作为新消息写入。然后确认正在使用的消息以解除队列阻塞,并允许读取其他消息。我已经在代理上保留了重新交付策略的配置,以防出现超出此逻辑的错误或未捕获的内容。这并不理想,但至少现在满足了要求。