首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当我试图使用zato传出的AMQP获取消息时,rabbitMQ返回队列为空。

当我试图使用zato传出的AMQP获取消息时,rabbitMQ返回队列为空。
EN

Stack Overflow用户
提问于 2016-05-19 09:15:28
回答 1查看 2.9K关注 0票数 0

我有一个由ESB ( zato )调用的服务,该服务的作用是通过AMQP传出在rabbitMQ中发布消息,但是当我查阅rabbitMQ并获取消息时,答案是empty.this is服务在zato中。

代码语言:javascript
复制
from zato.server.service import Service

class HelloService(Service):
    def handle(self):

        # Request parameters
        msg = 'Hello AMQP broker!'
        out_name = 'My CRM connection'
        exchange = 'My exchange'
        routing_key = ''
        properties = {'app_id': 'ESB'}
        headers = {'X-Foo': 'bar'}

        # Send a message to the broker
        self.outgoing.amqp.send(msg, out_name, exchange, routing_key,
            properties, headers)
EN

回答 1

Stack Overflow用户

发布于 2016-06-02 20:07:36

从zato服务的兔子队列中使用的完整示例如下:

兔体内

  1. 创建一个交换
  2. 创建队列
  3. 将队列绑定到交换
  4. 在zato中创建连接定义
  5. 在zato中创建一个传出的AMQP连接定义
  6. 编写zato服务以发布或使用

可以通过多种方式完成前三个步骤,下面是一个简单的python脚本(只需安装kombu,然后单击):

代码语言:javascript
复制
import click
import os
import sys
import settings
from kombu import Connection, Exchange, Queue


BROKER_URL = 'amqp://{user}:{password}@{server}:{port}/{vhost}'.format(user=settings.RABBIT_USER,
                                                                       password=settings.RABBIT_PASS,
                                                                       server=settings.RABBIT_SERVER,
                                                                       port=settings.RABBIT_PORT,
                                                                       vhost=settings.RABBIT_VHOST)


@click.command()
@click.option('--remove/--no-remove', default=False, help='Remove current Queues/Exchanges.')
@click.option('--create/--no-create', default=False, help='Create needed Queues/Exchanges')
def job(remove, create):
    exchanges = {'dead_letter': Exchange(name=settings.DEAD_LETTER_EXCHANGE,
                                         type=settings.DEAD_LETTER_EXCHANGE_TYPE,
                                         durable=settings.DEAD_LETTER_EXCHANGE_DURABLE),
                 'results': Exchange(name=settings.RESULTS_EXCHANGE_NAME,
                                     type=settings.RESULTS_EXCHANGE_TYPE,
                                     durable=settings.RESULTS_EXCHANGE_DURABLE)}

    queues = {'dead_letter': Queue(name=settings.DEAD_LETTER_QUEUE,
                                   exchange=exchanges['dead_letter'],
                                   routing_key=settings.DEAD_LETTER_ROUTING,
                                   durable=settings.DEAD_LETTER_EXCHANGE_DURABLE),
              'results': Queue(name=settings.RESULTS_QUEUE_NAME,
                               exchange=exchanges['results'],
                               routing_key=settings.RESULTS_QUEUE_ROUTING,
                               durable=settings.RESULTS_EXCHANGE_DURABLE),
              'task': Queue(name=settings.TASK_QUEUE_NAME,
                            exchange=exchanges['results'],
                            routing_key=settings.TASK_ROUTING_KEY,
                            queue_arguments={
                                "x-message-ttl": settings.TASK_QUEUE_TTL,
                                "x-dead-letter-exchange": settings.DEAD_LETTER_EXCHANGE,
                                "x-dead-letter-routing-key": settings.DEAD_LETTER_ROUTING})}

    print 'using broker: {}'.format(BROKER_URL)

    with Connection(BROKER_URL) as conn:
        channel = conn.channel()
        if remove:
            # remove exchanges
            for (key, exchange) in exchanges.items():
                print 'removing exchange: {}'.format(exchange.name)
                bound_exchange = exchange(channel)
                bound_exchange.delete()

            # remove queues
            for (key, queue) in queues.items():
                print 'removing queue {} '.format(queues[key].name)
                bound_queue = queues[key](channel)
                bound_queue.delete()

        if create:
            # create exchanges
            for (key, exchange) in exchanges.items():
                print 'creating exchange: {}'.format(exchange.name)
                bound_exchange = exchange(channel)
                bound_exchange.declare()

            # add queues
            for (key, queue) in queues.items():
                # if key in exchanges:
                print 'binding queue {} to exchange {} with routing key {}'.format(queue.name,
                                                                                   queue.exchange.name,
                                                                                   queue.routing_key)
                bound_queue = queue(channel)
                bound_queue.declare()


if __name__ == '__main__':
    job()

以及设置文件:

代码语言:javascript
复制
# rabbit stuff
RABBIT_SERVER = 'localhost'
RABBIT_USER = 'guest'
RABBIT_PASS = 'guest'
RABBIT_PORT = 5672
RABBIT_VHOST = '/'

# default task queue
TASK_EXCHANGE_NAME = 'test.service.request'
TASK_EXCHANGE_TYPE = 'direct'
TASK_EXCHANGE_DURABLE = True
TASK_QUEUE_NAME = 'test.service.request'
TASK_ROUTING_KEY = 'request'
TASK_QUEUE_TTL = 604800000

# dead letter settings
DEAD_LETTER_EXCHANGE = 'test.service.deadletter'
DEAD_LETTER_EXCHANGE_TYPE = 'direct'
DEAD_LETTER_EXCHANGE_DURABLE = True
DEAD_LETTER_QUEUE = 'test.service.deadletter'
DEAD_LETTER_ROUTING = 'deadletter'

# results settings
RESULTS_EXCHANGE_NAME = 'test.service.results'
RESULTS_EXCHANGE_TYPE = 'direct'
RESULTS_EXCHANGE_DURABLE = True
RESULTS_QUEUE_NAME = 'test.service.results'
RESULTS_QUEUE_ROUTING = 'results'

现在,让我们使用python 2.7在一个新的虚拟服务器上创建运行上述脚本的队列:

代码语言:javascript
复制
$ virtualenv rabbit_test
New python executable in /home/ivan/rabbit_test/bin/python
Installing setuptools, pip, wheel...done.

$ source /home/ivan/rabbit_test/bin/activate

$ pip install kombu
Collecting kombu
...
$ pip install click
Collecting click
...

复制上面的脚本

代码语言:javascript
复制
$ mkdir ~/rabbit_test/app
$ vi ~/rabbit_test/app/create_queues.py
$ vi ~/rabbit_test/app/settings.py

运行create_queues.py。

代码语言:javascript
复制
$ cd ~/rabbit_test/app
$ python create_queues.py --create
using broker: amqp://guest:guest@localhost:5672//
creating exchange: test.service.results
creating exchange: test.service.deadletter
binding queue test.service.request to exchange test.service.results with routing key request
binding queue test.service.results to exchange test.service.results with routing key results
binding queue test.service.deadletter to exchange test.service.deadletter with routing key deadletter

您可以使用cli工具或管理插件验证交换和队列在兔子上。

代码语言:javascript
复制
$ rabbitmqadmin list exchanges
+-------------------------+---------+
|          name           |  type   |
+-------------------------+---------+
| test.service.deadletter | direct  |
| test.service.results    | direct  |
+-------------------------+---------+

$ rabbitmqadmin list queues
+-------------------------+----------+
|          name           | messages |
+-------------------------+----------+
| test.service.deadletter | 0        |
| test.service.request    | 0        |
| test.service.results    | 0        |
+-------------------------+----------+

$ rabbitmqadmin list bindings
+-------------------------+-------------------------+-------------------------+
|         source          |       destination       |       routing_key       |
+-------------------------+-------------------------+-------------------------+
|                         | test.service.deadletter | test.service.deadletter |
|                         | test.service.request    | test.service.request    |
|                         | test.service.results    | test.service.results    |
| test.service.deadletter | test.service.deadletter | deadletter              |
| test.service.results    | test.service.request    | request                 |
| test.service.results    | test.service.results    | results                 |
+-------------------------+-------------------------+-------------------------+

现在可以使用公共api或webadmin来完成zato部分(步骤4、5和6),我将向您展示如何使用公共api来完成,但是通过UI更容易完成,因为这只完成了很少的几次。

创建AMQP连接定义 文档

代码语言:javascript
复制
$ curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
    "cluster_id": 1,
    "name": "SO_Test",
    "host": "127.0.0.1",
    "port": "5672",
    "vhost": "/",
    "username": "guest",
    "frame_max": 131072,
    "heartbeat": 10
}' "http://localhost:11223/zato/json/zato.definition.amqp.create"

{
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K04DWBPMYF8A7768C7N482E75YM3"
  },
  "zato_definition_amqp_create_response": {
    "id": 2,
    "name": "SO_Test"
  }
}

为AMQP连接 文档设置密码

代码语言:javascript
复制
$ curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw=="  -d '{
    "id": 2,
    "password1": "guest",
    "password2": "guest"
}' "http://localhost:11223/zato/json/zato.definition.amqp.change-password"

{
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K07K9YY21XZAX4QKWJB3ZFXN2ZFT"
  }
}

创建一个向外的 文档连接定义

代码语言:javascript
复制
curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
    "cluster_id": 1,
    "name": "SO Test",
    "is_active": true,
    "def_id": 2,
    "delivery_mode": 1,
    "priority": 6,
    "content_type": "application/json",
    "content_encoding": "utf-8",
    "expiration": 30000
}' "http://localhost:11223/zato/json/zato.outgoing.amqp.create"

{
  "zato_outgoing_amqp_create_response": {
    "id": 1,
    "name": "SO Test"
  },
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K05F2CR954BFNBP14KGTM26V47PC"
  }
}

最终将发送消息的服务

代码语言:javascript
复制
from zato.server.service import Service

class HelloService(Service):
    def handle(self):
        # Request parameters 
        msg = 'Hello AMQP broker!'
        out_name = 'SO Test'
        exchange = 'test.service.results'
        routing_key = 'request'
        properties = {'app_id': 'ESB', 'user_id': 'guest'}
        headers = {'X-Foo': 'bar'}

        # Send a message to the broker
        info = self.outgoing.amqp.send(msg, out_name, exchange, routing_key,
            properties, headers)
        self.logger.info(info)

如果要使用属性user_id,则必须与连接user_id匹配,否则请求将失败。

另外,请注意,这里我创建了一个死信交换,如果消息仍在test.service.request队列中,30秒后消息将在这里发送

最后一步是测试

为了验证消息是否已传递到我们的队列,我们可以创建一个http/soap通道或直接调用服务,我正在使用公共api执行后者。

代码语言:javascript
复制
curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
   "name": "test.hello-service",
   "data_format": "json"
}' "http://localhost:11223/zato/json/zato.service.invoke"

{
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K050J64QQ8FXASXHKVCAQNC4JC4N"
  },
  "zato_service_invoke_response": {
    "response": ""
  }
}

在此之后,我们检查队列中刚刚发送的消息:

代码语言:javascript
复制
$ rabbitmqadmin get queue=test.service.request requeue=true
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+
| routing_key |       exchange       | message_count |      payload       | payload_bytes | payload_encoding | redelivered |
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+
| request     | test.service.results | 0             | Hello AMQP broker! | 18            | string           | False       |
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+

记住检查兔子和zato服务器日志,以防你还有任何问题。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37318844

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档