使用spring云侦听AWS SQS队列,如下所示:
@SqsListener(value = "${queue.name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void queueListener(String message, @Headers Map<String, Object> sqsHeaders) {
// code
}Spring配置:
<aws-messaging:annotation-driven-queue-listener
max-number-of-messages="10" wait-time-out="20" visibility-timeout="3600"
amazon-sqs="awsSqsClient" />AwsSqsClient:
@Bean
public com.amazonaws.services.sqs.AmazonSQSAsyncClient awsSqsClient() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
return new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain(), executorService);
}这个很好用。
如上面的代码所示,配置了10个线程在SQS客户端中处理这些消息。这也很好,在任何时候最多处理10条消息。
问题是,我想不出控制投票间隔的方法。默认情况下,一旦所有线程都是空闲的,spring就会进行轮询。
例如,考虑以下示例
同时,大约有25条消息被传递到队列中。Spring不会轮询队列,直到之前交付的所有3条消息都完成。特别是,就像上面的例子一样,Spring只在20分钟后投票,尽管仍然有7个线程是免费的!!
知道我们怎么能控制这场投票吗?也就是说,如果有任何线程空闲,则应启动Poll,而不应等到所有线程都空闲时才开始。
发布于 2019-04-24 19:44:13
侦听器可以将消息加载到Spring应用程序中,并将它们与Acknowledgement和Visibility对象一起提交到另一个线程池中(如果您想控制这两个线程池的话)。
一旦消息被提交到这个线程池,侦听器就可以加载更多的数据。可以通过调整线程池设置来控制并发性。
您的侦听器的方法签名将类似于下面的一个:
@SqsListener(value = "${queueName}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(YourCustomPOJO pojo,
@Headers Map<String, Object> headers,
Acknowledgment acknowledgment,
Visibility visibility) throws Exception {
...... Send pojo to worker thread and return然后,工作线程将确认成功的处理。
acknowledgment.acknowledge().get();确保将消息可见性设置为大于最高处理时间的值(使用一些超时来限制执行时间)。
https://stackoverflow.com/questions/35860294
复制相似问题