首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Emqx :错过了mqtt协议所推动的大量信息

Emqx :错过了mqtt协议所推动的大量信息
EN

Stack Overflow用户
提问于 2019-09-25 05:52:50
回答 1查看 1.5K关注 0票数 0

我使用mqtt协议来推送一些消息,我的代理是emqx .I,通过nodejs编写这个脚本来推动200,000条关于特定主题的消息:

代码语言:javascript
复制
const mqtt = require('mqtt');

const options = {
    clientId: "tazikpush",
    clean: true
}
const client = mqtt.connect("mqtt://xxxxxxxxxx", options);
var topic = "/ApplyData/";
var pushOptions = {
    retain: false,
    qos: 2
}; 
const snooze = ms => new Promise(resolve => setTimeout(resolve, ms));

const example = async () => {
    console.log('Waiting 5 sec and then start');
    await snooze(5000);
    for (var i = 0; i < 200000; i++) {
        // await snooze(250);
        client.publish(topic, message, pushOptions);
        console.log(`done! ${i}`);
    }
};
example();

通过nodejs,我编写了一个订阅者来收听这个主题,然后将数据存储到redis数据库中。但我有个问题:

为什么侦听器应该停止直到推送客户端推送所有200000条消息?

为什么订户只收到100条消息?还有其他信息被丢弃了。

在我的订阅服务器上,我创建了一个js file.In文件,我创建了一个客户端并订阅了qos 2 mqttClient.js的主题。

代码语言:javascript
复制
const mqtt = require('mqtt');
const log4js = require('log4js');
const config = require('config');

const topic_sub = "/ApplyData/";
log4js.configure(JSON.parse(JSON.stringify(config.get('Logger'))));
var logger = log4js.getLogger('app');
logger.level = 'debug';


const options = {
    clientId: "mqttjs01",
    clean: true
}

const client = mqtt.connect("mqtt://xxxxxxx", options);

client.on("connect", () => {
    console.log("connected  " + client.connected);
    client.subscribe(topic_sub, { qos: 2 });
});

client.on("error", (error) => {
    console.log("Can't connect" + error);
    logger.debug(`Client Error : `, error);
});


module.exports = client;

我在控制器上使用客户端事件。我的订阅者实际上是一个工作人员,意味着它的任务就是订阅消息并将这些消息存储到数据库中。

在App.js中,我要求:

代码语言:javascript
复制
const client = require('./mqttClient');
const controller = require('./controller/mainController');

在主控制器中,我使用调用client.on订阅消息

代码语言:javascript
复制
client.on('message', async (topic, message, packet) => {
        console.log(topic);
        if (topic === '/ApplyData/') {
            var jobject = JSON.parse(message);
            jobject.nid = uuid()
            try {
                let res = await redis_cache.cache(jobject);
            } catch (err) {
                console.log(err);
            }

我通过从控制台运行调试代理,在我收到200,000条消息之后

代码语言:javascript
复制
2019-09-25 11:41:50.885 [warning] tazikpush [Session] Dropped qos2 packet 36998 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 36999 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 37000 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 37001 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 37002 for too many awaiting_rel.... 
2019-09-25 11:49:57.544 [warning] tazikpush [Session] Dropped qos2 packet 40292 for too many awaiting_rel
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36898 is not found
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36899 is not found
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36900 is not found
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36901 is not found ...

日志

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-09-27 02:32:54

发布太快,使用单个客户端处理消费太慢。

这可能与您的配置有关:

etc/emqx.conf

代码语言:javascript
复制
## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL, 0 means no limit.
##
## Value: Number
zone.external.max_awaiting_rel = 100

## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout.
##
## Value: Duration
zone.external.await_rel_timeout = 300s

最好的方法是使用Enterprise或共享订阅来添加更多客户端:

https://docs.emqx.io/tutorial/v3/en/advanced/share_subscribe.html

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58091904

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档