代码之家  ›  专栏  ›  技术社区  ›  David

问题测试spring cloud SQS Listener

  •  7
  • David  · 技术社区  · 6 年前

    环境

    • 弹簧靴:1.5.13。释放
    • 云:Edgware。SR3
    • 云AWS:1.2.2。释放
    • 爪哇8
    • OSX 10.13.4

    问题

    我正在尝试为SQS编写一个集成测试。

    我有一个本地跑步项目 localstack 运行SQS的docker容器 TCP/4576

    在我的测试代码中,我定义了一个端点设置为本地4576的SQS客户机,它可以成功地连接并创建队列、发送消息和删除队列。我还可以使用SQS客户端接收消息,并提取我发送的消息。

    我的问题是,如果我删除手动接收消息的代码,以便允许另一个组件获取消息,那么似乎什么都没有发生。我有一个弹簧组件,注释如下:

    听众

    @Component
    public class MyListener {
    @SqsListener(value = "my_queue", deletionPolicy = ON_SUCCESS)
        public void receive(final MyMsg msg) {
            System.out.println("GOT THE MESSAGE: "+ msg.toString());
        }
    }
    

    测验

    @RunWith(SpringRunner.class)
    @SpringBootTest(properties = "spring.profiles.active=test")
    public class MyTest {
    
        @Autowired
        private AmazonSQSAsync amazonSQS;
    
        @Autowired
        private SimpleMessageListenerContainer container;
    
        private String queueUrl;
    
        @Before
        public void setUp() {
            queueUrl = amazonSQS.createQueue("my_queue").getQueueUrl();
        }
    
        @After
        public void tearDown() {
            amazonSQS.deleteQueue(queueUrl);
        }
    
        @Test
        public void name() throws InterruptedException {
            amazonSQS.sendMessage(new SendMessageRequest(queueUrl, "hello"));
            System.out.println("isRunning:" + container.isRunning());
            System.out.println("isActive:" + container.isActive());
            System.out.println("isRunningOnQueue:" + container.isRunning("my_queue"));
            Thread.sleep(30_000);
            System.out.println("GOT MESSAGE: " + amazonSQS.receiveMessage(queueUrl).getMessages().size());
        }
    
        @TestConfiguration
        @EnableSqs
        public static class SQSConfiguration {
    
            @Primary
            @Bean(destroyMethod = "shutdown")
            public AmazonSQSAsync amazonSQS() {
                final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:4576", "eu-west-1");
                return new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
                        .standard()
                        .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("key", "secret")))
                        .withEndpointConfiguration(endpoint)
                        .build());
            }
        }
    }
    

    在测试日志中,我看到:

    o、 s.c.a.m.听众。QueueMessageHandler:1在MyListener类上找到的消息处理程序方法:{public void MyListener.receive(MyMsg)=org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a} 2018-05-31 22:50:39.582信息16329---

    o、 s.c.a.m.听众。QueueMessageHandler:映射的“org.springframework.cloud.aws.messaging.listener.QueueMessageHandler”$MappingInformation@1cd4082a“进入公共空间MyListener.receive(MyMsg)

    然后:

    isRunning:没错

    是的

    IsRuningQueuee:错误

    得到消息:1

    这表明,在发送消息与容器没有接收消息之间的30秒暂停时间内,当我手动轮询消息时,它就在队列中,我可以使用它。

    我的问题是,为什么不调用侦听器,为什么 isRunningOnQueue:false 表明该队列未自动启动的行?

    请注意,我也尝试设置自己的 SimpleMessageListenerContainer 将autostart显式设置为true(默认设置)的bean,并没有观察到行为的变化。我以为 org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration#simpleMessageListenerContainer 这是由 @EnableSqs 应该配置自动启动 SimpleMessageListenerContainer 这应该是投票给我的信息。

    我还设置了

    logging.level.org.apache.http=DEBUG
    logging.level.org.springframework.cloud=DEBUG
    

    在我的测试属性中,可以看到HTTP调用创建队列、发送消息和删除等,但没有要接收的HTTP调用(除了测试结束时的手动调用)。

    1 回复  |  直到 2 年前
        1
  •  5
  •   David    6 年前

    经过一番修补后,我发现了这一点。

    即使简单消息容器工厂被设置为非自动启动,它似乎也会进行初始化,这涉及到确定队列是否存在。

    在本例中,队列是在我的测试中用setup方法创建的——但遗憾的是,这是在spring上下文设置之后创建的,这意味着发生了异常。

    我通过简单地将队列创建移动到SQS客户机的上下文创建(在创建消息容器之前发生)来解决这个问题。即。:

    @Bean(destroyMethod = "shutdown")
            public AmazonSQSAsync amazonSQS() {
                final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://localhost:4576", "eu-west-1");
                final AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
                        .standard()
                        .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummyKey", "dummySecret")))
                        .withEndpointConfiguration(endpoint)
                        .build());
                client.createQueue("test-queue");
                return client;
            }