出身背景
我在ApacheArtemis 2.7.0上有一个JMS消息队列。redhat-00056。代理配置了一个
redelivery-delay
10分钟。如果我将一条消息发布到队列中,但在消费者身上失败,那么它将作为预定消息返回队列,并在10分钟内送达。随后发布的任何消息都会被直接处理,因此队列不会被调度的消息阻塞。
如果一系列消息被快速连续发送,那么它们都会失败,并被安排在10分钟内发送。在这种情况下,阿耳特弥斯似乎试图保持消息的顺序。
文档
关于重新交付的文件说:
其他后续消息将定期传递,只有被取消的消息将在延迟后异步发送回队列。
Redelivery documentation
问题
在我看来,这似乎是不一致的,如果你连续发布消息,Artemis似乎会保持顺序,而如果消息之间有轻微延迟,那么队列不会阻塞,只有失败的消息会被延迟调度(根据文档)。
我试图找到一个解决方案,这样,如果一条消息失败,需要在10分钟内重新发送,它就不会阻止后续消息。
实例
它不需要任何特殊的东西来重现。如上所述,您只需要将一些消息快速连续地发送到代理上具有重新交付策略的队列。我一直在用一个基本示例进行测试,如下所示:
Spring boot应用程序,在启动时生成五条消息。
@SpringBootApplication
public class ArtemisTestApplication
{
private Logger logger = LoggerFactory.getLogger(ArtemisTestApplication.class);
@Autowired
private JmsTemplate jmsTemplate;
@PostConstruct
public void init()
{
send("Message1");
send("Message2");
send("Message3");
send("Message4");
send("Message5");
}
public void send(String msg)
{
logger.debug("Sending message :{}", msg);
jmsTemplate.convertAndSend("jms.queue.TestQueue", msg);
}
public static void main(String[] args)
{
SpringApplication.run(ArtemisTestApplication.class, args);
}
}
使用消息并抛出错误以触发重新交付策略。
@Component
public class TestConsumer
{
private Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@JmsListener(destination = "jms.queue.TestQueue")
public void receive(TextMessage message) throws JMSException
{
logger.debug("Message received: {}", message.getText());
throw new RuntimeException("Force redelivery policy");
}
}
该应用程序是使用
spring boot initializr
.除了给它起个名字,唯一值得注意的是,在消息传递下,对阿耳特弥斯的依赖性。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
正在申请中。属性我已经配置了本地运行的Artemis实例的连接属性。
spring.artemis.mode=native
spring.artemis.host=localhost
spring.artemis.port=61616
spring.artemis.user=
spring.artemis.password=
在代理上,我用重新交付策略配置了队列。注意:我在这里将延迟设置为0,但问题仍然存在,在第一条消息尝试三次并移动到DLQ之前,所有消息都被阻止。如果您将延迟更改为正数,那么您将看到所有五条消息都安排在稍后发送。
<address-settings>
<address-setting match="jms.queue.TestQueue">
<dead-letter-address>DLQ</dead-letter-address>
<redelivery-delay>0</redelivery-delay>
<max-delivery-attempts>3</max-delivery-attempts>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="jms.queue.TestQueue">
<anycast>
<queue name="jms.queue.TestQueue" />
</anycast>
</address>
</addresses>