我已经为应用程序中注册的每个队列创建了一个随需应变的ChannelAdapter、AsyncTaskExecutor和一个通道。我注意到当AsyncTaskExecutor的maxPoolSize数等于1时,消息不会被处理。这就是AsyncTaskExecutor bean的创建方式。
static void registerAsyncTaskExecutor(final Consumer consumer, final GenericApplicationContext registry) {
final TaskExecutor executor = consumer.getExecutor();
final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ThreadPoolTaskExecutor.class);
builder.addPropertyValue("corePoolSize", executor.getCorePoolSize());
builder.addPropertyValue("maxPoolSize", executor.getMaxPoolSize());
builder.addPropertyValue("threadNamePrefix", consumer.getName() + "-");
final String beanName = executor.getName();
final BeanDefinition beanDefinition = builder.getBeanDefinition();
registry.registerBeanDefinition(beanName, beanDefinition);
}我注意到的另一件事是,当这个方法被称为java.util.concurrent.ThreadPoolExecutor#execute时,这个条件workerCountOf(c) < corePoolSize总是为false。完整的项目链接在这里https://github.com/LeoFuso/spring-integration-aws-demo
发布于 2020-09-11 22:45:39
只为某个可管理组件提供一个线程的线程池总是不好的做法。您可能不知道该组件将对您的线程池做什么,事实上,您的单个线程在内部被某个长期任务占用,并且所有新任务都将在队列中停滞,等待该单个线程释放,这可能不会发生。
事实上,这就是我们从Spring Cloud AWS获得的AsynchronousMessageListener,它由前面提到的SqsMessageDrivenChannelAdapter使用
public void run() {
while (isQueueRunning()) {所以,或者依赖于默认的执行器,或者提供足够的线程到你自己的。
看起来那里的逻辑是关于线程数量的:
int spinningThreads = this.getRegisteredQueues().size();
if (spinningThreads > 0) {
threadPoolTaskExecutor
.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);因此,我们有确切的线程数量,因为我们提供SQS队列,外加2乘数的工作者。看起来我们需要为每个队列分配一个线程来轮询,并需要额外的线程来处理来自它们的消息。
(不是Spring Integration问题-更像是Spring Cloud AWS)。
https://stackoverflow.com/questions/63839869
复制相似问题