因此,我试图创建一个简单的应用程序,可以做4件事。
1)获得一份消费者名单(如果我能得到那些注册..。也许我可以在他们加入的时候给他们起个名字,所以这是动态的)。
2)向一个随机用户发送“消息”并显示结果。
3)向一个特定的消费者发送一条“消息”(从上面的列表或可能是预定义的列表中获得)并显示结果。
4)向所有消费者发送“信息”,并显示来自每个消费者的结果。
该应用程序是在php中使用php-amqplib (https://github.com/videlalvaro/php-amqplib)完成的。rabbitmq已经启动并运行,并且似乎有效(尝试了教程)。
对于我来说,amqp的文档有点奇怪,所以我非常希望看到一些示例代码和对使用的params的描述。
发布于 2016-01-23 22:20:36
1)可以通过使用rabbitmq的管理/api/队列来解决问题,方法是为每个使用者命名一个持久的队列名。
2,3,4,被这样解决了:
<?php
//Producer Config
$host = "remote_host";
$port = 5672;
$user = "user";
$pass = "pass";
$array = array("consumer1", "consumer2");..。
<?php
//Producer
if(!isset($argv[1]) || !isset($argv[2])){
die("Specify a target and a message\n");
}
require_once __DIR__.'/config.php';
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Message\AMQPMessage;
$corr_id = uniqid();
$connection = new AMQPSSLConnection($host, $port, $user, $pass, "/", array("verify_peer" => false));
$channel = $connection->channel();
$response = null;
$onResponse = function ($rep) {
global $response;
$response = $rep->body;
echo " [>] Received: '".$response."'\n";
};
list($callback_queue, ,) = $channel->queue_declare("", false, false, true, false);
$channel->basic_consume($callback_queue, '', false, false, false, false, $onResponse);
$msg = new AMQPMessage($argv[2], array('correlation_id' => uniqid(), 'reply_to' => $callback_queue));
switch($argv[1]){
case "random":
$dest = $array[array_rand($array)];
$type = "direct";
break;
case "all":
$dest = "to_all";
$type = "fanout";
break;
case $argv[1]:
$dest = $argv[1];
$type = "direct";
break;
}
$channel->exchange_declare($dest, $type, false, false, false);
$channel->basic_publish($msg, $dest);
echo " [<] Sent '".$argv[2]."' to '".$dest."'\n";
try {
if($dest == "to_all"){
$replies = 0;
while(!$response || $replies < count($consumers_array)){
$channel->wait(null, false, $timeout);
$replies++;
}
}else{
while(!$response){
$channel->wait(null, false, $timeout);
}
}
}catch(PhpAmqpLib\Exception\AMQPTimeoutException $e){
echo " [x] AMQPTimeoutException thrown\n";
}
$channel->close();
$connection->close();..。
<?php
//Consumer config
$host = "remote_host";
$port = 5672;
$user = "user";
$pass = "pass";
$consumer_name = "consumerX";..。
<?php
//Consumer
require_once __DIR__ . '/config.php';
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPSSLConnection($host, $port, $user, $pass, "/", array('verify_peer' => false));
$channel = $connection->channel();
$channel->exchange_declare('to_all', 'fanout', false, false, false);
$channel->exchange_declare($consumer_name, 'direct', false, false, false);
$channel->queue_declare($consumer_name, false, false, true, false);
$channel->queue_bind($consumer_name, 'to_all');
$channel->queue_bind($consumer_name, $consumer_name);
echo '[*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [>] Received: '".$msg->body."'\n";
$rand = rand(1,3);
sleep($rand);
$reply = uniqid()." - slept ".$rand;
echo " [<] Replied: '".$reply."'\n";
$raspuns = new AMQPMessage($reply);
$msg->delivery_info['channel']->basic_publish($raspuns,'',$msg->get('reply_to'));
};
$channel->basic_consume($consumer_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();不确定这是最好的方式,但它让我的“你好世界”。
https://stackoverflow.com/questions/34964784
复制相似问题