解决方案可能取决于您使用的抽象,但是:
春云溪
我会用一个
Spring Cloud Stream Processor
处理消息并设置
routingKeyExpression
.
绑定:
=> theSourceQueue
=> myProcessor(message) consumes messages and setts routing key as a header
=> DestinationExchange
=> route 1 => theSourceQueue
=> route 2 => ?
弹簧AMQP
import com.rabbitmq.client.Channel;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Receiver {
@RabbitListener(queues = "my-messages")
public void receiveMessage(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
System.out.println("Received message: `" + payload + "`, deliveryTag: " + deliveryTag);
channel.basicNack(deliveryTag, false, true);
}
}
Java客户端
你也可以去下层使用
negative acknowledgement and re-queue
:
此示例通过对代理的一次调用拒绝两条消息(上的第二个参数
basicNack
是多个标志):
GetResponse gr1 = channel.basicGet("some.queue", false);
GetResponse gr2 = channel.basicGet("some.queue", false);
channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true);
当消息被重新排队时,如果可能的话,它将被放在队列中的原始位置。如果不是(由于多个消费者共享一个队列时来自其他消费者的并发传递和确认),则消息将重新排队到靠近队列头的位置。