I am trying to build a prototype device that can receive commands from Hono and reply to them.
I have installed Hono 1.10.0 and run the following Python code:
import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver
from hono import tenantId, deviceId, devicePassword, device_uri, biz_app_uri
correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'
print("Business application subscribing for the command reply--------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, "consumer@HONO", "verysecret"))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
# Give it some time to link
time.sleep(5)
print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', f'{deviceId}@{tenantId}', devicePassword))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
# Give it some time to link
time.sleep(2)
print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
#as in example https://stackoverflow.com/questions/64698271/difficulty-in-sending-amqp-1-0-message
Container(AmqpSender(biz_app_uri, [msg], "consumer@HONO", "verysecret", address=f'command/{tenantId}')).run()
time.sleep(2)
print("Device sending a command response-----------------------------------------------------------------")
resp = Message(
    address=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    properties={
        'status': 200,
        'device_id': deviceId,
        'tenant_id': tenantId
    },
    subject="call",
    body="Hello Alice!"
)
Container(AmqpSender(device_uri, [resp], f'{deviceId}@{tenantId}', devicePassword)).run()
time.sleep(2)
print("Device stops listening for commands---------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
print("Business application stops listening for command responses----------------------------------------")
cr_container.stop()
cr_thread.join(timeout=5)
print("everything stopped")

Based on my understanding of https://www.eclipse.org/hono/docs/api/command-and-control/ and https://www.eclipse.org/hono/docs/user-guide/amqp-adapter/#sending-a-response-to-a-command, I put this implementation together with the help of "Difficulty in Sending AMQP 1.0 Message".
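As it stands, I do not seem to be far off: the device receives the command, and sending the response does not report any error. On the receiving side, however, nothing arrives. To be clear, the AmqpReceiver implementation works when I use it to listen for telemetry data. So, assuming the implementation is the same apart from the address, that should not be the problem.
I strongly suspect that I am doing something wrong with the address/reply_to in the messages, but I cannot confirm it, because the logs in the Hono pods do not tell me anything :(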
br Armin
=update==
The code I am currently running is as follows:
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import AtLeastOnce

class Amqp(MessagingHandler):
    def __init__(self, server, address, user, password, options=None):
        super(Amqp, self).__init__()
        self.server = server
        self.address = address
        self.user = user
        self.password = password
        self.options = options
        self.connection = None

    def create_connection(self, event):
        self.connection = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=self.user,
            password=self.password
        )
        print("Connection established")

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")

    def on_link_opened(self, event):
        if event.link.is_sender:
            print("Opened sender link")
        if event.link.is_receiver:
            print("Opened receiver link for source address '{0}'".format(event.receiver.source.address))

class AmqpReceiver(Amqp):
    def __init__(self, server, address, user, password, options=None):
        super(AmqpReceiver, self).__init__(server, address, user, password, options)
        self.server = server
        self.user = user
        self.password = password

    def on_start(self, event):
        self.create_connection(event)
        event.container.create_receiver(context=self.connection, source=self.address, options=self.options)
        print("Receiver created")

    def on_message(self, event):
        print(f'Receiver [{self.address}] got message:')
        print(f' {event.message.reply_to}')
        print(f' {event.message.correlation_id}')
        print(f' {event.message.properties}')
        print(f' {event.message.subject}')
        print(f' {event.message.body}')
        # just for test purposes - the device immediately sends the reply if a reply_to is given
        if event.message.reply_to is not None:
            reply_to = event.message.reply_to.split('/')
            tenant_id = reply_to[1]
            device_id = reply_to[2]
            resp = Message(
                address=event.message.reply_to,
                correlation_id=event.message.correlation_id,
                content_type="text/plain",
                properties={
                    'status': 200,
                    'tenant_id': tenant_id,
                    'device_id': device_id
                },
                body=f'Reply on {event.message.body}'
            )
            sender = event.container.create_sender(self.connection, None, options=AtLeastOnce())
            sender.send(resp)
            sender.close()
            print("Reply sent")

class AmqpSender(Amqp):
    def __init__(self, server, messages, user, password, address=None, options=None):
        super(AmqpSender, self).__init__(server, address, user, password, options)
        self.messages = messages

    def on_start(self, event):
        self.create_connection(event)
        event.container.create_sender(context=self.connection, target=self.address)
        print("Sender created")

    def on_sendable(self, event):
        print("In Msg send")
        for msg in self.messages:
            event.sender.send(msg)
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")

In the test script, I use it as follows:
from __future__ import print_function, unicode_literals
import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver
biz_app_uri = f'amqp://localhost:15672'
device_uri = f'amqp://localhost:5672'
tenantId = 'ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8'
deviceId = 'b932fb15-fdbd-4c12-9ed7-40aaa8763412'
biz_app_user = 'consumer@HONO'
biz_app_pw = 'verysecret'
device_user = f'{deviceId}@{tenantId}'
device_pw = 'my-secret-password'
correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'
print("Business application subscribing for command replies-------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, biz_app_user, biz_app_pw))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
time.sleep(2)
print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', device_user, device_pw))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
time.sleep(2)
print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
#as in example https://stackoverflow.com/questions/64698271/difficulty-in-sending-amqp-1-0-message
Container(AmqpSender(biz_app_uri, [msg], biz_app_user, biz_app_pw, address=f'command/{tenantId}')).run()
time.sleep(10)
print("Device stops listening for commands---------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
#print("Business application stops listening ---------------------------------------")
#cr_container.stop()
#cr_thread.join(timeout=5)
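#print("everything stopped")

If I run the code sample, I get the following logs (see below), and the code hangs when the command reply receiver is left open.
Log from the Hono dispatch router: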
2021-11-14 19:08:29.420176 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:29.429734 +0000 SERVER (info) [C115] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36742
2021-11-14 19:08:29.447479 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:29.448213 +0000 ROUTER (info) [C115] Connection Opened: dir=in host=10.42.0.70:36742 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=a782f51c-9679-41fb-a682-8ea603ccf1ac props=
2021-11-14 19:08:29.448316 +0000 ROUTER_CORE (info) [C115][L123] Link attached: dir=out source={command_response/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8/myCorrelationId expire:sess} target={<none> expire:sess}
2021-11-14 19:08:33.423325 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:33.430810 +0000 SERVER (info) [C116] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36868
2021-11-14 19:08:33.445574 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:33.446328 +0000 ROUTER (info) [C116] Connection Opened: dir=in host=10.42.0.70:36868 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=92cb7173-2940-4330-a995-f26eccef0905 props=
2021-11-14 19:08:33.446388 +0000 ROUTER_CORE (info) [C116][L124] Link attached: dir=in source={<none> expire:sess} target={command/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8 expire:sess}
2021-11-14 19:08:33.447762 +0000 ROUTER_CORE (info) [C116][L124] Link detached: del=1 presett=0 psdrop=0 acc=0 rej=0 rel=0 mod=0 delay1=0 delay10=0 blocked=no

Log from the AMQP adapter:
2021-11-14 19:08:31,511 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Connected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null
2021-11-14 19:19:29,875 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Disconnected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null

Posted on 2021-11-03 07:50:18
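The command response sent by your device appears to contain the wrong address. As pointed out in the AMQP Adapter User Guide, the response's address property must be set to the value of the command's reply-to property. This value usually differs from the reply-to value that the application set in the command message, because the protocol adapter needs to encode some additional information into the reply-to address so that it can determine the correct device ID when forwarding the command response downstream.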
So, in your code, you need to inspect the command message on the device side and use its reply-to value as the address value of the command response.
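A minimal sketch of that rule, kept free of any AMQP library so that it runs on its own. The helper name build_response_fields is hypothetical, the SimpleNamespace object only stands in for the command message the device receives, and the reply_to string is a made-up placeholder, not a real adapter-encoded address.

```python
from types import SimpleNamespace


def build_response_fields(command, body):
    """Derive the fields of a command response from the received command."""
    if command.reply_to is None:
        # The command carries no reply-to address, so there is nowhere to respond to.
        return None
    return {
        # Use the reply-to value exactly as it arrived at the device,
        # not the value the business application originally set.
        "address": command.reply_to,
        "correlation_id": command.correlation_id,
        "body": body,
    }


# Stand-in for the command as delivered to the device (placeholder values).
received = SimpleNamespace(
    reply_to="command_response/hypothetical-adapter-encoded-value",
    correlation_id="myCorrelationId",
    body="Hello Bob!",
)

fields = build_response_fields(received, "Hello Alice!")
print(fields["address"])
```

Inside an on_message handler, the resulting dictionary could be passed on as Message(**fields), with event.message taking the place of the stand-in object.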
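Apart from that, the AMQP adapter expects the status property in the command response to be of AMQP 1.0 type int (32-bit signed integer). With your code, however, the property value is encoded as an AMQP 1.0 long (64-bit signed integer) by default. To encode it correctly, you need to import the int32 class from proton._data and then set the property value to int32(200). The adapter then accepts the command response and forwards it downstream.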
https://stackoverflow.com/questions/69788945