我有一个BlockingConnection,我遵循例句的皮卡文档。但在所有这些代码中,开始使用消息的代码示例如下:
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()(有或多或少的细节)。
我必须编写许多脚本,并且我想一个接一个地运行(用于测试/研究目的)。但是上面的代码要求我在每个代码中添加^C。
我试图添加一些超时在文件中解释,但我没有运气。例如,如果我为set找到一个参数如果客户端在最后X秒内不消耗任何消息,那么脚本finish。这是放放皮卡的地方吗?还是我必须改变方法?
发布于 2021-08-25 15:51:18
import pika
parameters = pika.ConnectionParameters(host="localhost")
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
def ack_message(channel, method):
"""Note that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(method.delivery_tag)
else:
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
pass
def callback(channel,method, properties, body):
ack_message(channel,method)
print("body",body, flush=True)
channel.basic_consume(
queue="hello", on_message_callback=callback)
channel.start_consuming()
connection.close()原来的密码是卢克·巴肯的答案。
但我已经编辑了一些代码。
:)
发布于 2022-06-13 12:33:33
为时已晚,但也许有人从中受益。可以在blocked_connection_timeout中使用pika.ConnectionParameters()参数,如下所示,
connection = pika.BlockingConnection(
pika.ConnectionParameters(
heartbeat=600,
blocked_connection_timeout=600,
host=self.queue_host,
port=constants.RABBTIMQ_PORT,
virtual_host=self.rabbitmq_virtual_host,
credentials=pika.PlainCredentials(
username=self.rabbitmq_username,
password=self.rabbitmq_password
)
)
)https://stackoverflow.com/questions/56999451
复制相似问题