首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >SQSListener与ThreadpoolExecutor

SQSListener与ThreadpoolExecutor
EN

Stack Overflow用户
提问于 2018-04-27 21:13:16
回答 1查看 10.2K关注 0票数 13

在下面的示例中,我将最大和核心池大小设置为1。但是没有处理任何消息。当我启用调试日志时,我能够看到从SQS中提取的消息,但我猜它没有被处理/删除。但是,当我将核心和最大池大小增加到2时,消息似乎会被处理。

编辑

我相信Spring可能会为接收者分配一个线程,用于从队列中读取数据,因此它无法将线程分配给正在处理消息的侦听器。当我将核心池大小增加到2时,我看到消息正在从队列中读取。当我添加另一个侦听器(对于死信队列)时,我遇到了同样的问题--2个线程不够,因为消息没有被处理。当我将核心池大小增加到3时,它开始处理消息。在本例中,我假设分配了一个线程来读取队列中的消息,每个线程分配了两个侦听器。

代码语言:javascript
复制
@Configuration
public class SqsListenerConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "aws.configuration")
    public ClientConfiguration clientConfiguration() {
        return new ClientConfiguration();
    }


    @Bean
    @Primary
    public AWSCredentialsProvider awsCredentialsProvider() {

        ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
        try {
            credentialsProvider.getCredentials();
            System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
            System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());

        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. " +
                            "Please make sure that your credentials file is at the correct " +
                            "location (~/.aws/credentials), and is in valid format.",
                    e);
        }
        return credentialsProvider;
    }


    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard().
                withCredentials(awsCredentialsProvider()).
                withClientConfiguration(clientConfiguration()).
                build();
    }


    @Bean
    @ConfigurationProperties(prefix = "aws.queue")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
        simpleMessageListenerContainer.setMaxNumberOfMessages(10);
        simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
        return simpleMessageListenerContainer;
    }


    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
        QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        return queueMessageHandler;
    }


    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("oaoQueueExecutor");
        executor.initialize();
        return executor;
    }


    @Bean
    public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }


}

侦听器Config

代码语言:javascript
复制
    @SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {

        System.out.println(" Data = " + serviceData + " MessageId = " + messageId);

        repository.execute(serviceData);
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-04-30 15:04:22

通过相同的设置corePoolSizemaximumPoolSize,您可以创建一个fixed-size thread pool。对规则的一个非常好的解释被记录为这里

设置maxPoolSize隐式允许删除任务。但是,默认的队列容量是Integer.MAX_VALUE,实际上它是无穷大的。

值得注意的是,ThreadPoolTaskExecutor在下面使用了一个ThreadPoolExecutor,它有一种不寻常的排队方法,用医生们来描述。

如果corePoolSize或更多线程正在运行,执行器总是更喜欢排队请求,而不是添加新线程。

这意味着,只有当队列已满时,maxPoolSize才是相关的,否则线程的数量将永远不会超过corePoolSize。例如,如果我们向线程池提交从未完成的任务:

  • 第一个corePoolSize提交将启动一个新线程,每个线程;
  • 在此之后,所有提交的材料都进入队列;
  • 如果队列是有限的,并且其容量被耗尽,则每个提交将启动一个新线程,直到maxPoolSize
  • 当池和队列都已满时,新提交将被拒绝。

队列-读取文档

任何BlockingQueue都可以用于传输和保存提交的任务。使用此队列与池大小进行交互:

  • 如果运行的线程少于corePoolSize线程,则执行器总是更喜欢添加新线程而不是排队。
  • 如果corePoolSize或更多线程正在运行,执行器总是更喜欢排队请求,而不是添加新线程。
  • 如果请求不能排队,将创建一个新线程,除非这将超过maximumPoolSize,在这种情况下,任务将被拒绝。

Unbounded queues。使用无界队列(例如,没有预定义容量的LinkedBlockingQueue )将导致新任务在所有corePoolSize线程都繁忙的情况下排队。因此,将只创建corePoolSize线程。(因此,maximumPoolSize的值没有任何影响。)

  1. 如果线程数小于corePoolSize,则创建一个新线程来运行新任务。
  2. 如果线程数等于(或大于) corePoolSize,则将任务放入队列。
  3. 如果队列已满,且线程数小于maxPoolSize,则创建一个新线程以在其中运行任务。
  4. 如果队列已满,且线程数大于或等于maxPoolSize,则拒绝该任务。
票数 18
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50070384

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档