编辑:
主要的问题是第三方的rabbitmq机器似乎不时地关闭空闲连接。从那时起,我开始得到“断管”的例外情况。唯一能得到通讯的方法。回到正常状态是让我终止进程并重新启动它们。我想还有更好的方法吗?
--
我有点迷路了。我是连接到第三方RabbitMQ服务器推送消息到。每隔一段时间,他们机器上的所有插座都会掉下来,我最终会得到一个“断管”异常。
我被告知要在代码中实现心跳检查,但我不确定具体如何实现。我在这里找到了一些信息:http://kombu.readthedocs.org/en/latest/changelog.html#version-2-3-0,但没有真正的示例代码。
我只需要将"?heartbeat=x“添加到连接字符串中吗?其他的是Kombu吗?我知道我需要在"x/2“处调用"Connection.heartbeat_check()”。我应该创建一个周期性的任务来称之为这一点吗?连接是如何重新建立的?
我在用:
我的代码现在看起来是这样的。调用一个简单的芹菜任务将消息发送到第三方RabbitMQ服务器(删除日志和注释以使其足够简短、足够基本):
class SendMessageTask(Task):
name = "campaign.backends.send"
routing_key = "campaign.backends.send"
ignore_result = True
default_retry_delay = 60 # 1 minute.
max_retries = 5
def run(self, send_to, message, **kwargs):
payload = "Testing message"
try:
conn = BrokerConnection(
hostname=HOSTNAME,
port=PORT,
userid=USER_ID,
password=PASSWORD,
virtual_host=VHOST
)
with producers[conn].acquire(block=True) as producer:
publish = conn.ensure(producer, producer.publish, errback=sending_errback, max_retries=3)
publish(
body=payload,
routing_key=OUT_ROUTING_KEY,
delivery_mode=2,
exchange=EXCHANGE,
serializer=None,
content_type='text/xml',
content_encoding = 'utf-8'
)
except Exception, ex:
print ex谢谢你的帮助。
发布于 2013-01-29 17:16:18
虽然您当然可以向生产者添加心跳支持,但对于消费者流程来说,它更有意义。
启用心跳意味着必须定期发送心跳,例如,如果将心跳设置为1秒,则必须每秒钟或更长时间发送一次心跳,否则遥控器将关闭连接。
这意味着您必须使用单独的线程或使用异步io来及时可靠地发送心跳,而且由于无法在线程之间共享连接,这就给我们留下了异步io。
好消息是,你可能不会得到太多的好处,增加心跳的生产-唯一的连接。
https://stackoverflow.com/questions/14581986
复制相似问题