首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >stop_consuming不起作用

stop_consuming不起作用
EN

Stack Overflow用户
提问于 2014-12-23 16:21:42
回答 2查看 8.9K关注 0票数 10

我刚接触过狂犬病和兔皮卡,在停止消费方面遇到了困难。

通道和队列设置:

代码语言:javascript
复制
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)

基本上,消费者和生产者都是这样的:

消费者:

代码语言:javascript
复制
def task(task_id):
    def callback(channel, method, properties, body):
        if body != "quit":
            print(body)
        else:
            print(body)
            channel.stop_consuming(task_id)

    channel.basic_consume(callback, queue=task_id, no_ack=True)
    channel.start_consuming()
    print("finish")
    return "finish"

制片人:

代码语言:javascript
复制
proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE)
while proc.returncode is None:  # running
    line = proc.stdout.readline()
    if line:
        channel.basic_publish(
            exchange='',
            routing_key=self.request.id,
            body=line
        )
    else:
        channel.basic_publish(
            exchange='',
            routing_key=self.request.id,
            body="quit"
        )
        break

消费者task给了我输出:

代码语言:javascript
复制
# ... output from sample.sh, as expected

quit
�}q(UstatusqUSUCCESSqU  tracebackqNUresultqNUtask_idqU
1419350416qUchildrenq]u.

然而,"finish"没有被打印出来,所以我猜这是因为channel.stop_consuming(task_id)并没有停止消费。如果是的话,正确的做法是什么?谢谢。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-02-14 13:35:09

我也有同样的问题。这似乎是由内部start_consuming调用self.connection.process_data_events(time_limit=None)这一事实造成的。这个time_limit=None使它挂起。

我设法解决了这个问题,将对channel.start_consuming()的调用替换为它的内嵌:

代码语言:javascript
复制
while channel._consumer_infos:
    channel.connection.process_data_events(time_limit=1) # 1 second
票数 8
EN

Stack Overflow用户

发布于 2022-03-25 10:42:40

我有一个用通道和连接的成员变量定义的类。它们由单独的线程初始化。MyClient类的使用者使用close()方法,连接和使用者被停止!

代码语言:javascript
复制
class MyClient:

    def __init__(self, unique_client_code):
        self.Channel = None
        self.Conn: pika.BlockingConnection = None  
        self.ClientThread = self.init_client_driver()

    def _close_callback(self):
        self.Channel.stop_consuming()
        self.Channel.close()
        self.Conn.close()
    def _client_driver_thread(self, tmout=None):
        print("Starting Driver Thread...")
        self.Conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
        self.Channel = self.Conn.channel()

    def init_client_driver(self, tmout=None):
        kwargs = {'tmout': tmout}
        t = threading.Thread(target=self._client_driver_thread, kwargs=kwargs)
        t.daemon = True
        t.start()
        return t

    def close(self):
        self.Conn.add_callback_threadsafe(self._close_callback)
        self.ClientThread.join()
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/27624166

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档