有没有可能用同样的方法并行处理多个amqp消息呢?这个方法是用@Incoming("queue")注释的,带有quarkus和smallrye-reactive-messaging。
更准确地说,我有以下类:
@ApplicationScoped
public class Receiver {
@Incoming("test-queue")
public void process(String input) {
System.out.println("start processing:" + input);
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end processing:" + input);
}
}使用application.properties中的配置:
amqp-host: localhost
amqp-port: 5672
amqp-username: quarkus
amqp-password: quarkus
mp.messaging.incoming.test-queue.connector: smallrye-amqp
mp.messaging.incoming.test-queue.address: test-queue现在,我想通过配置来定义有多少消息可以并行处理。例如,在4核cpu上,它应该并行运行4个。
目前,我只能添加4个具有不同名称的方法副本来实现这种并行性,但这是不可配置的。
发布于 2020-12-09 23:39:25
我不确定,但我不认为响应式消息支持你所要求的。
然而,你可以用另一种方式来做你想做的事情。我认为这也是使用消息传递的一种更好的整体模式。
http://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.5/amqp/amqp.html#amqp-inbound
找到带有CompletionStage和显式ack()的示例。该变体是异步的,因此如果您将其与Java现有的并发工具结合使用,您将获得高效的并行处理。
我会将传入的工作发送给executor,然后在任务完成时执行任务ack()。
https://stackoverflow.com/questions/65092607
复制相似问题