TL;博士
当同时订阅多个客户端的同一主题树时,并非所有客户端都会收到预期的保留消息!
详细/用例
在一个实际的项目中,几个应用程序订阅同一个MQTT主题(使用通配符)(几乎是同时启动的,因为它们是并行启动的)。该主题包含大约500个保留的消息(每个在自己的子主题级别上),所有应用程序都将收到这些消息(它们订阅QoS 1)。
除了“配置”消息之外,数据主题还使用相同的MQTT连接订阅。不需要持久化状态(这里需要持久化状态)。因此,应用程序实例与cleanSession=true连接。
据我理解,如果应用程序实例与固定的clientId连接,就足够了,因为cleanSession=true应该避免任何状态处理。但是,要确保每个连接都不被认为是一个unique MQTT clientId状态,就必须确保状态是不存在的。
observerd
不幸的是,并不是所有的应用程序实例都得到保留的消息。有些人根本没有收到来自主题的消息--不管订阅持续多长时间。我最初认为maxInflight (客户端)或max_queued_messages (服务器端)配置可能是原因,但在将两者都增加到50万之后,我想这不是故障背后的原因。
复制为测试
因此,我使用repro创建了这 github项目。repro MqttSubscriptionTest中有一个单元测试类,它的测试方法是multiThreadSubscriptionTest。在执行此测试时,一些(1000)保留的消息将首先在@BeforeClass方法中发布。之后,将实例化并执行实现IMqttMessageListener和Runnable接口的一个Runnable类的10个实例。每个MqttSubscriber实例将在自己的线程中使用自己的MqttClient实例执行,并将订阅保留消息的主题树。这将按以下方式记录到控制台:
----------- perform subscriptions
Subscriber-3 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-0 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-2 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-4 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-5 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-6 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-1 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-7 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-8 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'
Subscriber-9 subscribing topic 'mqtt/client/showcase/mutliThreadSubscription/#'测试将等待一段时间,然后验证订阅。预计每个订户将收到1000条保留的消息:
----------- validate subscriptions
Subscriber-4: receivedMessages=1000; duration=372ms; succeeded=true
Subscriber-0: receivedMessages=1000; duration=265ms; succeeded=true
Subscriber-5: receivedMessages=1000; duration=475ms; succeeded=true
Subscriber-7: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-6: receivedMessages=1000; duration=473ms; succeeded=true
Subscriber-8: receivedMessages=0; duration=0ms; succeeded=false
Subscriber-9: receivedMessages=1000; duration=346ms; succeeded=true
Subscriber-3: receivedMessages=1000; duration=243ms; succeeded=true
Subscriber-1: receivedMessages=1000; duration=470ms; succeeded=true
Subscriber-2: receivedMessages=1000; duration=357ms; succeeded=true大多数用户在很短的时间内(大约几百毫秒)收到预期的1000条消息。但有些(这里的订户-7/8)没有收到一条消息(持续时间为0,因为它们从未完成)。如果给订阅者更多的时间来接收消息,情况就不会更好。他们就是抓不到他们。
我不知道为什么会这样。MQTT代理或客户端没有显示错误消息。如果您能提供任何帮助,这对我来说将是非常有用的,因为我依赖于保留消息的可靠传递。
Repro on GitHub:FrVaBe/mqtt/mqtt-客户-展示/
cleanSession=true是因为我不想拥有任何状态(连接用于订阅多个主题,并且不需要传递丢失的消息)发布于 2018-02-20 14:33:54
HiveMQ的人很好地看了一下这个问题。他们怀疑泛美卫生组织客户端中的原因以及订阅中使用IMqttMessageListener的原因。有一个被描述的问题,#432的假定种族状况。
经验教训:最好使用MqttCallback而不是IMqttMessageListener
发布于 2018-02-13 22:11:59
您可能不会在连接期间使用cleanSession(true),请参阅此处的说明:http://www.steves-internet-guide.com/mqtt-retained-messages-example/
我用蚊子进行测试,默认情况下它的内部消息队列只有100条。
HiveMQ有最大排队的消息.
Emq有类似的http://emqtt.io/docs/v2/config.html#mqtt-message-queue。
我修正了回购中的示例代码。为我工作。
编辑:刚刚用Emq (docker run --rm -ti --name emq -p 18083:18083 -p 1883:1883 quodt/emq-docker:latest)测试了它,它运行得很好。
基本上,它是cleanSession:肯定是假的。除此之外,测试代码中的等待状态是不好的。我的机器太短了。使用闩锁或其他真正的同步机制。
https://stackoverflow.com/questions/48769220
复制相似问题