首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Pika/RabbitMQ:正确使用add_backpressure_callback

Pika/RabbitMQ:正确使用add_backpressure_callback
EN

Stack Overflow用户
提问于 2012-08-08 09:34:04
回答 1查看 676关注 0票数 2

我刚开始使用RabbitMQ和Pika,所以如果答案是显而易见的,请原谅…

我们正在提供一些数据,并将结果传递到rabbitmq消息队列中。队列正在被一个将数据写入elasticsearch的进程消耗。

产生数据的速度比输入弹性搜索的速度快,因此队列会增长,几乎不会缩小。

我们正在使用pika,并收到警告:

代码语言:javascript
复制
UserWarning: Pika: Write buffer exceeded warning threshold at X bytes and an estimated X frames behind.

这会持续一段时间,直到Pika崩溃,并显示一条奇怪的错误消息:

代码语言:javascript
复制
NameError: global name 'log' is not defined

我们使用的是Pika BlockingConnection对象(http://pika.github.com/connecting.html#blockingconnection)。

我的计划是使用add_backpressure_callback函数来拥有一个函数,每当我们需要施加背压时,该函数将调用time.sleep(0.5)。然而,这似乎是一个过于简单的解决方案,必须有一种更合适的方式来处理这样的事情。

我猜这是一种常见的情况,填充队列的速度快于消耗队列的速度。我正在寻找一个例子,甚至是一些建议,关于什么是减缓队列的最好方法。

谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2012-08-17 16:43:34

有趣的问题,正如您正确地指出的那样,这可能是很常见的。我在Stack Overflow上看到了另一个相关的问题,其中有一些指针

Pika: Write buffer exceeded warning

此外,您可能想要考虑扩展您的elasticsearch,这可能是您想要修复的基本瓶颈。在elasticsearch.org网站上快速浏览一下,就会发现

“分布式

弹性搜索的主要特点之一是它的分布式特性。索引被分解为分片,每个分片有0个或更多副本。集群内的每个数据节点托管一个或多个分片,并充当协调器,将操作委托给正确的分片。重新平衡和路由是在幕后自动完成的。“

(...although不确定插入是否也是分布式和可伸缩的)

毕竟,RabbitMQ不应该无限地增加队列。还可能希望考虑扩展RabbitMQ本身,例如通过在RabbitMQ配置中使用每队列进程等。

干杯!

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/11856487

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档