首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >WSO2 - DAS消费MQTT消息

WSO2 - DAS消费MQTT消息
EN

Stack Overflow用户
提问于 2015-12-02 15:51:31
回答 1查看 504关注 0票数 2

我正在使用Eclipse 1.2.2、WSO2 DAS3.0.0和ActiveMQ 5.12.1在IoT世界中做一些实验。到目前为止,我成功地将DAS设置为M2M中间件服务器,Kura在Raspberry PI2上设置为IoT网关,ActiveMQ设置为MQTT服务器。

我还编写了一个非常基本的MQTT消息生成器,定期向MQTT服务器发送一条非常简单的MQTT消息,以模拟实际设备发送MQTT消息。这个想法是用一个定期发送数据的BlueTooth设备来代替这个应用程序。

当我使用MQTTSpy监视传入的消息时,我注意到MQTT消息是格式化为二进制的。库拉在使用MQTT发送数据时使用Google协议缓冲区时,这一点在文档中得到了明确的说明。由于DAS不支持这种类型的MQTT消息,我假设这会导致服务器不响应任何传入消息。

我使用以下定义配置了DAS流:

代码语言:javascript
复制
{
  "streamId": "mqtt_sample_01:1.0.0",
  "name": "mqtt_sample_01",
  "version": "1.0.0",
  "nickName": "mqtt_sample_01",
  "description": "mqtt_sample_01",
  "metaData": [],
  "correlationData": [],
  "payloadData": [
    {
      "name": "temperature",
      "type": "FLOAT"
    }
  ]
}

我还使用以下代码为传入的MQTT消息创建了一个接收器:

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="mqtt_sample_receiver_protobuf" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="mqtt-protobuf">
        <property name="topic">mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata</property>
        <property name="clientId">mqtt-client-01</property>
        <property name="url">tcp://192.168.1.42:1883</property>
        <property name="cleanSession">false</property>
    </from>
    <mapping customMapping="disable" type="map"/>
    <to streamName="mqtt_sample_01" version="1.0.0"/>
</eventReceiver>

注意:我也尝试过JSON和XML作为映射类型。

为了在DAS控制台上显示所有内容,我使用以下方法添加了一个发布者:

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<eventPublisher name="mqtt_sample_logger_01" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
    <from streamName="mqtt_sample_01" version="1.0.0"/>
    <mapping customMapping="disable" type="text"/>
    <to eventAdapterType="logger">
        <property name="uniqueId">mqtt_sample_logger_01</property>
    </to>
</eventPublisher>  

Kura使用Google协议缓冲区格式化MQTT消息,WSO2-DAS不理解它。为了解决这个问题,有几种可能:

  1. 可以在库拉修改MQTT消息格式,使其不使用Google协议缓冲区进行编码。我发现了一个关于SO的文章,它或多或少类似于这种方法,这导致了CloudClient类所提供的所有优势的丧失。
  2. 一种可能是编写您自己的DAS接收器,如这篇文章这篇文章中所描述的。
  3. 第三种选择是浏览Kura代码,并创建自己的CloudService/CloudClient实现实现。

在我个人看来,最好的解决方案是选择第二个选项,编写一个自定义事件接收器来理解和解码由Kura生成的Google协议缓冲区格式。其他甚至更好的解决办法也非常受欢迎。

重要通知: 在GUI (mqtt-sender-topic.mqtt-client-01.MQTT_APP_V1.mydata).中,ActiveMQ使用主题名的点符号。但是主题的实名使用/-表示法(mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata).。

为了构建自定义接收器,我决定从原始的MQTT接收器复制现有代码,并将其修改为处理protobuf格式并将其转换为XML (至少这就是其中的想法)。在为正确设置所有依赖项而苦苦挣扎之后,我成功地构建了一个正常工作的自定义接收器。

不幸的是,我们并没有完全达到我想要的程度。与MQTT代理的连接似乎存在问题。接收方启动,但似乎经常失去连接,在日志中写入以下消息。

代码语言:javascript
复制
DEBUG {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT client subscribed to : mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata
INFO {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT Connection successful
WARN {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT connection not reachable
Connection lost (32109) - java.io.EOFException
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:138)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:56)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:100)
... 1 more

至于它的价值,代理(ActiveMQ)抱怨说:

代码语言:javascript
复制
WARN Stealing link for clientId mqtt-client-01 From Connection Transport Connection to: tcp://192.168.1.42:4594

我的代码一定是做错了什么,导致连接被删除。问题是什么。因此,任何建议、想法、解决方案都是非常欢迎的!

提示: 使用-DosgiConsole选项启动DAS,允许您调查部署的包的状态。成功部署接收器后,命令diag bundle_number应该输出以下内容: osgi> diag 473 reference:file:../dropins/test.wso2.mqtt.receiver.MqttProtobufReceiver->1.0.0.jar 473 没有未解决的限制。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-12-09 09:57:39

能够处理由Eclipse (KuraPayload格式)创建的谷歌协议缓冲区格式消息的KuraPayload产品(例如数据分析服务器)的输入接收器的示例可以是在Google Drive下载

发送消息的库样例应用程序也可以是在Google Drive下载

接收方接收二进制格式化的KuraPayload格式并将其转换为XML。检查示例应用程序的XML格式。

请分享您在接收器上所做的改进/修改,以帮助他人。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34047048

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档