代码之家  ›  专栏  ›  技术社区  ›  dplesa

安静地将作业返回队列

  •  0
  • dplesa  · 技术社区  · 6 年前

    我有一个定义了作业队列的RabBMQ,我用Java通过Spring框架来消耗它。我知道,如果在处理从队列接收到的作业时在代码中的某个地方抛出异常,那么该作业将返回到队列。但是,是否有其他方法可以将作业返回到队列,而不引发异常,或者“手动”将作业返回到队列?

    1 回复  |  直到 6 年前
        1
  •  1
  •   kinjelom    6 年前

    解决方案可能取决于您使用的抽象,但是:


    春云溪

    我会用一个 Spring Cloud Stream Processor 处理消息并设置 routingKeyExpression .

    绑定:

    => theSourceQueue 
       => myProcessor(message) consumes messages and setts routing key as a header
          => DestinationExchange 
             => route 1 => theSourceQueue
             => route 2 => ?
    

    弹簧AMQP

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    public class Receiver {
        @RabbitListener(queues = "my-messages")
        public void receiveMessage(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
            System.out.println("Received message: `" + payload + "`, deliveryTag: " + deliveryTag);
            channel.basicNack(deliveryTag, false, true);
        }
    }
    

    Java客户端

    你也可以去下层使用 negative acknowledgement and re-queue :

    此示例通过对代理的一次调用拒绝两条消息(上的第二个参数 basicNack 是多个标志):

    GetResponse gr1 = channel.basicGet("some.queue", false);
    GetResponse gr2 = channel.basicGet("some.queue", false);
    channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true);
    

    当消息被重新排队时,如果可能的话,它将被放在队列中的原始位置。如果不是(由于多个消费者共享一个队列时来自其他消费者的并发传递和确认),则消息将重新排队到靠近队列头的位置。