首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RabbitMQ:快速生产者和慢速消费者

RabbitMQ:快速生产者和慢速消费者
EN

Stack Overflow用户
提问于 2014-10-29 04:14:56
回答 5查看 22K关注 0票数 21

我有一个应用程序,它使用RabbitMQ作为消息队列,在发送方和接收方两个组件之间发送/接收消息。发送者以非常快的方式发送消息。接收方接收消息,然后执行一些非常耗时的任务(主要是针对非常大的数据量写入数据库)。由于接收方需要很长时间来完成任务,然后检索队列中的下一条消息,因此发送方将不断快速地填满队列。所以我的问题是:这会导致消息队列溢出吗?

消息使用者如下所示:

代码语言:javascript
复制
public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}

消费者收到的每条消息都包含一个caseID。对于每个caseID,它会将大量数据保存到数据库中,这需要很长的时间。由于生产者/消费者对caseID的发布/订阅使用相同的队列,因此目前只为RabbitMQ设置了一个消费者。那么,如何加快消费者吞吐量,以便消费者能够赶上生产者,并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消耗速度吗?或者我是否应该使用多个消费者同时使用传入的消息?或者,是否有任何异步方法可以让使用者异步使用消息,而无需等待消息完成?欢迎提出任何建议。

EN

回答 5

Stack Overflow用户

发布于 2014-10-31 02:16:10

“这会导致消息队列溢出吗?”

是。RabbitMQ将进入“流控制”状态,以防止随着队列长度的增加而过度消耗内存。它还将开始将消息持久化到磁盘,而不是将它们保存在内存中。

“如何加快消费者吞吐量,使消费者能够赶上生产者,并避免队列中的消息溢出。”

您有两个选项:

  1. 增加了更多的消费者。请记住,如果您选择此选项,您的数据库现在将由多个并发进程操作。确保DB能够承受消费通道的额外pressure.
  2. Increase和QOS值。这将从队列中拉出更多消息,并将它们缓冲到消费者上。这将增加整体处理时间;如果缓冲了5条消息,则第5条消息将花费消息1的处理时间...5来完成。

“我应该在消费者部分使用多线程来加快消耗速度吗?”

除非你有一个设计良好的解决方案。向应用程序添加并行性将在消费者端增加大量开销。您最终可能会耗尽ThreadPool或限制内存使用量。

在处理AMQP时,您确实需要考虑每个流程的业务需求,以便设计最佳解决方案。你收到的消息的时间敏感度如何?它们是否需要尽快持久化到DB中,或者数据是否立即可用对您的用户来说很重要?

如果数据不需要立即持久化,您可以修改您的应用程序,以便使用者简单地从队列中删除消息,并将它们保存到缓存集合中,例如在Redis中。引入第二个进程,然后依次读取和处理缓存的消息。这将确保您的队列长度不会增长到足以导致流控制的程度,同时防止您的数据库受到写请求的轰炸,写请求通常比读请求更昂贵。您的使用者现在只需从队列中删除消息,稍后将由另一个进程处理。

票数 17
EN

Stack Overflow用户

发布于 2014-10-29 05:24:06

你有很多方法来提高你的表现。

  1. 您可以创建一个具有更多生产者的工作队列,通过这种方式,您可以创建一个简单的负载平衡系统。不要使用exchange-> queue,只使用queue。阅读此帖子RabbitMQ Non-Round Robin Dispatching
  2. 当您收到一条消息时,您可以创建一个池线程来在数据库中插入数据,但在这种情况下,您必须处理失败。

但我认为主要的问题是数据库,而不是RabbitMQ。通过良好的调优、多线程和工作队列,您可以拥有可伸缩且快速的解决方案。

让我知道

票数 3
EN

Stack Overflow用户

发布于 2014-10-31 04:17:38

虽然添加更多的消费者可能会加快速度,但真正的问题是保存到数据库中。

这里已经有很多关于添加消费者(线程和/或机器)和更改QoS的答案,所以我不打算重复这一点。相反,您应该认真考虑使用Aggregator模式将消息聚合到一组消息中,然后一次性将该组批量插入到数据库中。

您当前为每条消息编写的代码可能会打开一个连接,插入数据,然后关闭该连接(或返回池)。更糟糕的是,它甚至可能使用事务。

通过使用聚合器模式,您基本上可以在刷新之前缓冲数据。

现在,编写一个好的聚合器是很棘手的。你需要决定如何缓冲(即每个工人都有自己的缓冲区或者像Redis这样的中央缓冲区)。我相信Spring集成有一个聚合器。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/26617649

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档