代码之家  ›  专栏  ›  技术社区  ›  Punter Vicky

带有ThreadpoolExecutor的SQSLister

  •  12
  • Punter Vicky  · 技术社区  · 6 年前

    在下面的示例中,我将最大和核心池大小设置为1。但是,未处理任何消息。当我启用调试日志时,我能够看到从SQS中提取的消息,但我猜它没有被处理/删除。然而,当我将core和max pool size增加到2时,消息似乎得到了处理。

    编辑

    我相信Spring可能会为从队列中读取数据的接收方分配一个线程,因此它无法将线程分配给正在处理消息的侦听器。当我将corepoolsize增加到2时,我看到消息正在从队列中读取。当我添加另一个监听器(用于死信队列)时,我遇到了相同的问题-2个线程不够,因为消息没有得到处理。当我将corepoolsize增加到3时,它开始处理消息。我假设在本例中,分配了1个线程来读取队列中的消息,并为每个侦听器分配了2个线程。

    @Configuration
    public class SqsListenerConfiguration {
    
        @Bean
        @ConfigurationProperties(prefix = "aws.configuration")
        public ClientConfiguration clientConfiguration() {
            return new ClientConfiguration();
        }
    
    
        @Bean
        @Primary
        public AWSCredentialsProvider awsCredentialsProvider() {
    
            ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
            try {
                credentialsProvider.getCredentials();
                System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
                System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());
    
            } catch (Exception e) {
                throw new AmazonClientException(
                        "Cannot load the credentials from the credential profiles file. " +
                                "Please make sure that your credentials file is at the correct " +
                                "location (~/.aws/credentials), and is in valid format.",
                        e);
            }
            return credentialsProvider;
        }
    
    
        @Bean
        @Primary
        public AmazonSQSAsync amazonSQSAsync() {
            return AmazonSQSAsyncClientBuilder.standard().
                    withCredentials(awsCredentialsProvider()).
                    withClientConfiguration(clientConfiguration()).
                    build();
        }
    
    
        @Bean
        @ConfigurationProperties(prefix = "aws.queue")
        public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
            simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
            simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
            simpleMessageListenerContainer.setMaxNumberOfMessages(10);
            simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
            return simpleMessageListenerContainer;
        }
    
    
        @Bean
        public QueueMessageHandler queueMessageHandler() {
            QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
            queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
            QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
            return queueMessageHandler;
        }
    
    
        @Bean
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(1);
            executor.setMaxPoolSize(1);
            executor.setThreadNamePrefix("oaoQueueExecutor");
            executor.initialize();
            return executor;
        }
    
    
        @Bean
        public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
            return new QueueMessagingTemplate(amazonSQSAsync);
        }
    
    
    }
    

    侦听器配置

        @SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
        public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
    
            System.out.println(" Data = " + serviceData + " MessageId = " + messageId);
    
            repository.execute(serviceData);
    }
    
    1 回复  |  直到 6 年前
        1
  •  16
  •   Vikram Palakurthi    6 年前

    按设置 corePoolSize maximumPoolSize 同样,您可以创建 fixed-size thread pool . 记录了对规则的非常好的解释 here

    背景 maxPoolSize 隐式允许删除任务。 但是,默认队列容量为 Integer.MAX_VALUE ,实际上,它是无限的。

    需要注意的是 ThreadPoolTaskExecutor 使用 ThreadPoolExecutor 下面介绍了一种有点不寻常的排队方法 the docs :

    如果 核心池大小 如果有一个或多个线程正在运行,执行器总是倾向于将请求排队,而不是添加新线程。

    这意味着 最大池大小 仅当队列已满时才相关,否则线程数将永远不会超过 核心池大小 . 例如,如果我们提交任务 永远不会完成的 到线程池:

    • 第一个 核心池大小 提交内容将分别启动一个新线程;
    • 之后,所有提交都进入队列;
    • 如果队列有限且其容量已耗尽,则每次提交都会启动一个新线程,最多 最大池大小 ;
    • 当池和队列都已满时,新提交将被拒绝。

    排队 -阅读 docs

    任何 BlockingQueue 可用于传输和保留提交的任务。此队列的使用与池大小交互:

    • 如果运行的线程少于corePoolSize,则执行器始终 更喜欢添加新线程,而不是排队。
    • 如果corePoolSize或更多线程正在运行,则执行器始终 宁愿对请求排队,也不愿添加新线程。
    • 如果请求无法排队,则会创建一个新线程,除非 这将超过maximumPoolSize,在这种情况下,任务将 被拒绝。

    Unbounded queues . 使用无界队列(例如 LinkedBlockingQueue 没有预定义的容量)将导致 在所有corePoolSize线程都繁忙的情况下要排队的任务。 因此,不超过 核心池大小 将永远创建线程。(以及 的值 最大工具大小 因此没有任何影响。)

    1. 如果线程数小于 核心池大小 ,新建 线程以运行新任务。
    2. 如果线程数等于(或大于) 核心池大小 ,将任务放入队列。
    3. 如果队列已满,且线程数小于 最大池大小 ,创建新线程以在其中运行任务。
    4. 如果队列已满,且线程数大于或 等于 最大池大小 ,拒绝任务。