我刚接触过狂犬病和兔皮卡,在停止消费方面遇到了困难。
通道和队列设置:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)基本上,消费者和生产者都是这样的:
消费者:
def task(task_id):
def callback(channel, method, properties, body):
if body != "quit":
print(body)
else:
print(body)
channel.stop_consuming(task_id)
channel.basic_consume(callback, queue=task_id, no_ack=True)
channel.start_consuming()
print("finish")
return "finish"制片人:
proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE)
while proc.returncode is None: # running
line = proc.stdout.readline()
if line:
channel.basic_publish(
exchange='',
routing_key=self.request.id,
body=line
)
else:
channel.basic_publish(
exchange='',
routing_key=self.request.id,
body="quit"
)
break消费者task给了我输出:
# ... output from sample.sh, as expected
quit
�}q(UstatusqUSUCCESSqU tracebackqNUresultqNUtask_idqU
1419350416qUchildrenq]u.然而,"finish"没有被打印出来,所以我猜这是因为channel.stop_consuming(task_id)并没有停止消费。如果是的话,正确的做法是什么?谢谢。
发布于 2016-02-14 13:35:09
我也有同样的问题。这似乎是由内部start_consuming调用self.connection.process_data_events(time_limit=None)这一事实造成的。这个time_limit=None使它挂起。
我设法解决了这个问题,将对channel.start_consuming()的调用替换为它的内嵌:
while channel._consumer_infos:
channel.connection.process_data_events(time_limit=1) # 1 second发布于 2022-03-25 10:42:40
我有一个用通道和连接的成员变量定义的类。它们由单独的线程初始化。MyClient类的使用者使用close()方法,连接和使用者被停止!
class MyClient:
def __init__(self, unique_client_code):
self.Channel = None
self.Conn: pika.BlockingConnection = None
self.ClientThread = self.init_client_driver()
def _close_callback(self):
self.Channel.stop_consuming()
self.Channel.close()
self.Conn.close()
def _client_driver_thread(self, tmout=None):
print("Starting Driver Thread...")
self.Conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
self.Channel = self.Conn.channel()
def init_client_driver(self, tmout=None):
kwargs = {'tmout': tmout}
t = threading.Thread(target=self._client_driver_thread, kwargs=kwargs)
t.daemon = True
t.start()
return t
def close(self):
self.Conn.add_callback_threadsafe(self._close_callback)
self.ClientThread.join()https://stackoverflow.com/questions/27624166
复制相似问题