我刚开始使用RabbitMQ和Pika,所以如果答案是显而易见的,请原谅…
我们正在提供一些数据,并将结果传递到rabbitmq消息队列中。队列正在被一个将数据写入elasticsearch的进程消耗。
产生数据的速度比输入弹性搜索的速度快,因此队列会增长,几乎不会缩小。
我们正在使用pika,并收到警告:
UserWarning: Pika: Write buffer exceeded warning threshold at X bytes and an estimated X frames behind.这会持续一段时间,直到Pika崩溃,并显示一条奇怪的错误消息:
NameError: global name 'log' is not defined我们使用的是Pika BlockingConnection对象(http://pika.github.com/connecting.html#blockingconnection)。
我的计划是使用add_backpressure_callback函数来拥有一个函数,每当我们需要施加背压时,该函数将调用time.sleep(0.5)。然而,这似乎是一个过于简单的解决方案,必须有一种更合适的方式来处理这样的事情。
我猜这是一种常见的情况,填充队列的速度快于消耗队列的速度。我正在寻找一个例子,甚至是一些建议,关于什么是减缓队列的最好方法。
谢谢!
发布于 2012-08-17 16:43:34
有趣的问题,正如您正确地指出的那样,这可能是很常见的。我在Stack Overflow上看到了另一个相关的问题,其中有一些指针
Pika: Write buffer exceeded warning
此外,您可能想要考虑扩展您的elasticsearch,这可能是您想要修复的基本瓶颈。在elasticsearch.org网站上快速浏览一下,就会发现
“分布式
弹性搜索的主要特点之一是它的分布式特性。索引被分解为分片,每个分片有0个或更多副本。集群内的每个数据节点托管一个或多个分片,并充当协调器,将操作委托给正确的分片。重新平衡和路由是在幕后自动完成的。“
(...although不确定插入是否也是分布式和可伸缩的)
毕竟,RabbitMQ不应该无限地增加队列。还可能希望考虑扩展RabbitMQ本身,例如通过在RabbitMQ配置中使用每队列进程等。
干杯!
https://stackoverflow.com/questions/11856487
复制相似问题