首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用videlalvaro/php-amqplib完成阻塞的basic_get调用

如何使用videlalvaro/php-amqplib完成阻塞的basic_get调用
EN

Stack Overflow用户
提问于 2015-11-29 12:22:42
回答 1查看 625关注 0票数 2

我正在使用https://github.com/videlalvaro/php-amqplib做一些rabbitmq工作:

我正在尝试创建一个阻塞版本的basic_get (或者是一个可以重复调用的basic_consume版本,每次只能得到一个msg ),它将阻塞,直到消息就绪,如果队列中没有消息,则返回它,而不是返回null。

当我试图用basic_consume抓取一条消息时,事情变得一团糟,我最终得到了一堆“未准备好”但未确认的消息。(如果我只用这种方式抓取一条消息,它每次都可以工作,如果我试图获取两条消息,它有时会挂起,而其他消息则可以工作)

代码语言:javascript
复制
class Foo {
    ...
    private function blockingGet() {
            /*
                queue: Queue from where to get the messages
                consumer_tag: Consumer identifier
                no_local: Don't receive messages published by this consumer.
                no_ack: Tells the server if the consumer will acknowledge the messages.
                exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
                nowait:
                callback: A PHP Callback
             */
            $this->ch->basic_consume($this->queueName, "consumer_".$this->consumerNum++, false, false, false, false, function($msg) {
                    $this->msgCache = json_decode($msg->body);
                    $this->ch->basic_ack($msg->delivery_info['delivery_tag']);
                    $this->ch->basic_cancel($msg->delivery_info['consumer_tag']);
            });
            while (count($this->ch->callbacks)) {
                    $this->ch->wait();
            }
            return $this->msgCache;
    }
}

$q = new Foo();
for ($i = 0; $i < 5; $i++) {
    print $q->blockingGet();
}
EN

回答 1

Stack Overflow用户

发布于 2017-04-11 16:32:48

我实现了与这里类似的功能,方法是将接收到的消息保存在传递给$channel->basic_consume()回调参数的闭包中,然后在$channel->wait()调用之后处理它,因为如果接收到消息(或者设置了timeout参数并达到超时),wait()将返回控制权。尝试如下所示:

代码语言:javascript
复制
class Foo {
    // ...
    public function __construct() {
        $this->ch->basic_consume($this->queueName, "", false, false, false, false, function($msg) {
                $this->msgCache = json_decode($msg->body);
                $this->ch->basic_ack($msg->delivery_info['delivery_tag']);
        });
    }
    // ...
    private function blockingGet() {
        $this->ch->wait();
        if ($this->msgCache) {
            $msgCache = $this->msgCache;
            $this->msgCache = null;
            return $msgCache;
        }
        return null;
    }
}

$q = new Foo();
for ($i = 0; $i < 5; $i++) {
    print $q->blockingGet();
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33979388

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档