我有一个应用程序,它使用RabbitMQ作为消息队列,在发送方和接收方两个组件之间发送/接收消息。发送者以非常快的方式发送消息。接收方接收消息,然后执行一些非常耗时的任务(主要是针对非常大的数据量写入数据库)。由于接收方需要很长时间来完成任务,然后检索队列中的下一条消息,因此发送方将不断快速地填满队列。所以我的问题是:这会导致消息队列溢出吗?
消息使用者如下所示:
public void onMessage() throws IOException, InterruptedException {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
JSONObject json = new JSONObject(message);
String caseID = json.getString("caseID");
//following takes very long time
dao.saveToDB(caseID);
}
}消费者收到的每条消息都包含一个caseID。对于每个caseID,它会将大量数据保存到数据库中,这需要很长的时间。由于生产者/消费者对caseID的发布/订阅使用相同的队列,因此目前只为RabbitMQ设置了一个消费者。那么,如何加快消费者吞吐量,以便消费者能够赶上生产者,并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消耗速度吗?或者我是否应该使用多个消费者同时使用传入的消息?或者,是否有任何异步方法可以让使用者异步使用消息,而无需等待消息完成?欢迎提出任何建议。
发布于 2014-10-31 02:16:10
“这会导致消息队列溢出吗?”
是。RabbitMQ将进入“流控制”状态,以防止随着队列长度的增加而过度消耗内存。它还将开始将消息持久化到磁盘,而不是将它们保存在内存中。
“如何加快消费者吞吐量,使消费者能够赶上生产者,并避免队列中的消息溢出。”
您有两个选项:
“我应该在消费者部分使用多线程来加快消耗速度吗?”
除非你有一个设计良好的解决方案。向应用程序添加并行性将在消费者端增加大量开销。您最终可能会耗尽ThreadPool或限制内存使用量。
在处理AMQP时,您确实需要考虑每个流程的业务需求,以便设计最佳解决方案。你收到的消息的时间敏感度如何?它们是否需要尽快持久化到DB中,或者数据是否立即可用对您的用户来说很重要?
如果数据不需要立即持久化,您可以修改您的应用程序,以便使用者简单地从队列中删除消息,并将它们保存到缓存集合中,例如在Redis中。引入第二个进程,然后依次读取和处理缓存的消息。这将确保您的队列长度不会增长到足以导致流控制的程度,同时防止您的数据库受到写请求的轰炸,写请求通常比读请求更昂贵。您的使用者现在只需从队列中删除消息,稍后将由另一个进程处理。
发布于 2014-10-29 05:24:06
你有很多方法来提高你的表现。
但我认为主要的问题是数据库,而不是RabbitMQ。通过良好的调优、多线程和工作队列,您可以拥有可伸缩且快速的解决方案。
让我知道
发布于 2014-10-31 04:17:38
虽然添加更多的消费者可能会加快速度,但真正的问题是保存到数据库中。
这里已经有很多关于添加消费者(线程和/或机器)和更改QoS的答案,所以我不打算重复这一点。相反,您应该认真考虑使用Aggregator模式将消息聚合到一组消息中,然后一次性将该组批量插入到数据库中。
您当前为每条消息编写的代码可能会打开一个连接,插入数据,然后关闭该连接(或返回池)。更糟糕的是,它甚至可能使用事务。
通过使用聚合器模式,您基本上可以在刷新之前缓冲数据。
现在,编写一个好的聚合器是很棘手的。你需要决定如何缓冲(即每个工人都有自己的缓冲区或者像Redis这样的中央缓冲区)。我相信Spring集成有一个聚合器。
https://stackoverflow.com/questions/26617649
复制相似问题