首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用RabbitMQ解决问题

用RabbitMQ解决问题
EN

Stack Overflow用户
提问于 2018-03-12 00:42:53
回答 1查看 2.5K关注 0票数 8

我试图使用websockets将一些数据从php应用程序发送到用户的浏览器。因此,我决定将斯沃尔与RabbitMQ结合使用。

这是我第一次使用websockets,在阅读了一些关于Socket.IO、棘轮等的文章之后,我决定停止使用Swoole,因为它是用C编写的,而且与php一起使用非常方便。

这就是我如何理解使用websockets启用数据传输的思想: 1)在CLI 2) php应用程序中启动RabbitMQ worker和Swoole服务器,将数据发送到RabbitMQ 3) RabbitMQ向worker发送消息,4) Worker +通过数据+建立与Swoole socket服务器的套接字连接。5) Swoole服务器向所有连接广播数据

问题是如何将Swoole服务器与RabbitMQ绑定?或者如何使RabbitMQ与Swoole建立连接并发送数据?

以下是代码:

Swoole服务器(swoole_sever.php)

代码语言:javascript
复制
$server = new \swoole_websocket_server("0.0.0.0", 2345, SWOOLE_BASE);

$server->on('open', function(\Swoole\Websocket\Server $server, $req)
{
    echo "connection open: {$req->fd}\n";
});

$server->on('message', function($server, \Swoole\Websocket\Frame $frame)
{
    echo "received message: {$frame->data}\n";
    $server->push($frame->fd, json_encode(["hello", "world"]));
});

$server->on('close', function($server, $fd)
{
    echo "connection close: {$fd}\n";
});

$server->start();

工作人员接收来自RabbitMQ的消息,然后连接到Swoole,并通过套接字连接(worker.php)广播消息。

代码语言:javascript
复制
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done", "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);


    // Here I'm trying to make connection to Swoole server and sernd data
    $cli = new \swoole_http_client('0.0.0.0', 2345);

    $cli->on('message', function ($_cli, $frame) {
        var_dump($frame);
    });

    $cli->upgrade('/', function($cli)
    {
        $cli->push('This is the message to send to Swoole server');
        $cli->close();
    });
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

将消息发送到RabbitMQ (new_task.php)的新任务:

代码语言:javascript
复制
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

启动swoole服务器和辅助程序之后,我将从命令行触发new_task.php:

代码语言:javascript
复制
php new_task.php

在运行RabbitMQ工作人员(worker.php)的命令行提示符中,我可以看到一条消息被传递给工作人员("x接收到Hello!“)信息正在出现)。

但是,在Swoole服务器运行的命令行提示符中,什么也不会发生。

所以问题是: 1)这种方法的想法正确吗? 2)我做错了什么?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-05-21 19:46:59

在接收到消息时触发的回调(在worker.php中)中,您使用的是仅为异步的swoole_http_client。这似乎会导致代码永远不会完全执行,因为回调函数会在异步代码被触发之前返回。

一个同步的方法来做同样的事情将解决这个问题。下面是一个简单的例子:

代码语言:javascript
复制
$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();

查看WebSocketClient类和github上的示例用法。

您还可以将其包装在协同线中,如下所示:

代码语言:javascript
复制
go(function () {
    $client = new WebSocketClient('0.0.0.0', 2345);
    $client->connect();
    $client->send('This is the message to send to Swoole server');
    $recv = $client->recv();
    print_r($recv);
    $client->close();
});
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49226659

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档