代码之家  ›  专栏  ›  技术社区  ›  Ben

Spring Rabbitmq获取对扇出消息的所有答复

  •  0
  • Ben  · 技术社区  · 6 年前

    中的以下类包含在几个使用者应用程序中:

    @Component
    @Configuration
    public class HealthListener {
    
        public static final String HEALTH_CHECK_QUEUE_NAME = "healthCheckQueue";
        public static final String HEALTH_CHECK_FANOUT_EXCHANGE_NAME = "health-check-fanout";
    
    
        @Bean
        public Binding healthListenerBinding(
                @Qualifier("healthCheckQueue") Queue queue,
                @Qualifier("instanceFanoutExchange") FanoutExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange);
        }
    
        @Bean
        public FanoutExchange instanceFanoutExchange() {
            return new FanoutExchange(HEALTH_CHECK_FANOUT_EXCHANGE_NAME, true, false);
        }
    
        @Bean
        public Queue healthCheckQueue() {
            return new Queue(HEALTH_CHECK_QUEUE_NAME);
        }
    
        @RabbitListener(queues = HEALTH_CHECK_QUEUE_NAME)
        public String healthCheck() {
            return "some result";
        }
    
    }
    

    我正在尝试向Fanout Exchange发送消息,并接收所有回复,以了解哪些消费者正在运行。

    我可以发送一条消息并得到这样的第一个回复:

    @Autowired
    RabbitTemplate template;
    
    // ...
    String firstReply = template.convertSendAndReceiveAsType("health-check-fanout", "", "", ParameterizedTypeReference.forType(String.class));
    

    不过,我需要对这条信息做最全面的回复,而不仅仅是第一条。我需要设置一个应答侦听器,但我不确定如何设置。

    1 回复  |  直到 6 年前
        1
  •  1
  •   Gary Russell    6 年前

    这个 (convertS|s)endAndReceive.*() 方法不是用来处理多个答复的;它们严格来说是一个请求/一个答复方法。

    你需要使用 (convertAndS|s)end() 方法发送请求,并实现您自己的答复机制,可能使用侦听器容器进行答复,以及一些组件来聚合答复。

    您可以使用类似Spring集成聚合器这样的工具,但是您需要一些机制( ReleaseStrategy )这将知道何时收到所有预期的答复。

    或者,您可以简单地接收离散的答复并单独处理它们。

    编辑

    @SpringBootApplication
    public class So54207780Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So54207780Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> template.convertAndSend("fanout", "", "foo", m -> {
                m.getMessageProperties().setReplyTo("replies");
                return m;
            });
        }
    
        @RabbitListener(queues = "queue1")
        public String listen1(String in) {
            return in.toUpperCase();
        }
    
        @RabbitListener(queues = "queue2")
        public String listen2(String in) {
            return in + in;
        }
    
        @RabbitListener(queues = "replies")
        public void replyHandler(String reply) {
            System.out.println(reply);
        }
    
        @Bean
        public FanoutExchange fanout() {
            return new FanoutExchange("fanout");
        }
    
        @Bean
        public Queue queue1() {
            return new Queue("queue1");
        }
    
        @Bean
        public Binding binding1() {
            return BindingBuilder.bind(queue1()).to(fanout());
        }
    
        @Bean
        public Queue queue2() {
            return new Queue("queue2");
        }
    
        @Bean
        public Binding binding2() {
            return BindingBuilder.bind(queue2()).to(fanout());
        }
    
        @Bean
        public Queue replies() {
            return new Queue("replies");
        }
    
    }
    

    FOO
    foofoo