我正在测试kombu是如何工作的。我计划在几个项目中替换pika。我看到kombu有很多文档,但是根据我在文档中找到的文档,一些消息丢失了。下面是代码:
from kombu import Connection, Producer
conn = Connection('amqp://localhost:5672')
def errback(exc, interval):
logger.error('Error: %r', exc, exc_info=1)
logger.info('Retry in %s seconds.', interval)
producer = Producer(conn)
publish = conn.ensure(producer, producer.publish, errback=errback, max_retries=3)
for i in range(1, 200000):
publish({'hello': 'world'}, routing_key='test_queue')
time.sleep(0.001)当它发布时,我多次关闭连接,它继续发布,但队列中大约有60000条消息,所以有很多丢失的消息。
我尝试过不同的方法,例如:
publish({'hello': 'world'}, retry=True, mandatory=True, routing_key='hipri')谢谢!
发布于 2014-01-09 17:40:55
问题是默认情况下Kombu不使用'confirm',你必须使用:
conn = Connection('amqp://localhost:5672', transport_options={'confirm_publish': True})谢谢
https://stackoverflow.com/questions/20975914
复制相似问题