我正在实现一个日常作业,它从MongoDB (大约300 K文档)获取数据,并为每个文档在RabbitMQ队列上发布一条消息。另一方面,我在同一个队列中有一些消费者,理想情况下,这些用户应该并行工作。
一切都很正常,但没有我想的那么多,特别是关于消费者的表现。
我是这样声明队列的:
rabbitMQ.getChannel().queueDeclare(QUEUE_NAME, true, false, false, null);以下是发布的方式:
rabbitMQ.getChannel().basicPublish("", QUEUE_NAME, null, body.getBytes());因此,用于声明队列的通道用于发布所有消息。
这就是在for循环中实例化使用者的方式(总共10,但它可以是任意数字):
Channel channel = rabbitMQ.getConnection().createChannel();
MyConsumer consumer = new MyConsumer(customMapper, channel, subscriptionUpdater);
channel.basicQos(1); // also tried with 0, 10, 100, ...
channel.basicConsume(QUEUE_NAME, false, consumer);因此,对于每个使用者,我创建了一个新的通道,这是由日志确认的:
...
com.rabbitmq.client.impl.recovery.AutorecoveringChannel@bdd2027
com.rabbitmq.client.impl.recovery.AutorecoveringChannel@5d1b9c3d
com.rabbitmq.client.impl.recovery.AutorecoveringChannel@49a26d19
...据我所了解,从我非常短的RabbitMQ经验,这应该可以保证所有的消费者被调用。顺便说一下,消费者需要0.5到1.2秒来完成他们的任务。我刚刚发现了很短的3秒。
我有两个单独的队列,我重复了两次(使用相同的RabbitMQ连接)。
因此,我已经测试了为每个队列发布100条消息。这两家公司都有10名qos=1用户。
我并不期望交付/消费性能达到10/s,而是注意到:


我是否遗漏了RabbitMQ中线程化的主要概念?或者任何可能仍然是默认值的特定配置?我已经做了几天了,所以这可能是可能的。
请注意,我处于一个幸运的位置,可以控制发布和消费部分:)
我在本地使用RabbitMQ 3.7.3,所以不会出现任何网络延迟问题。
谢谢你的帮忙!
发布于 2018-04-19 07:01:44
RabbitMQ通道和使用者的设置最终是正确的:因此每个消费者都有一个通道。
问题在于使用者调用同步方法来查找和更新MongoDB文档。
这拖延了一些消费者的执行时间:甚至最糟糕的是,我添加的消费者越多(考虑加快处理),我得到的消息速率/s就越少。
我已经将MongoDB部分移到了发布端,在这里,我不必关心同步,因为它只由一个发布者按顺序完成。我有一个轻微下降的交货率/s,但现在只有5个消费者,我很容易达到50-60/s。
汲取的经验教训:
作为一般规则:
这将为您提供一个初始的qos值,以便根据您的场景进行测试和调整。最后的目标是为所有可用的消费者提供100%的利用率。
https://stackoverflow.com/questions/49860367
复制相似问题