我使用MQTT协议在两台计算机之间发送消息。我已经从这段代码中得到了模式。出版商:
import paho.mqtt.client as mqtt
from random import randrange, uniform
import time
mqttBroker ="mqtt.eclipse.org"
client = mqtt.Client("Temperature_Inside")
client.connect(mqttBroker)
while True:
randNumber = randrange(10)
client.publish("TEMPERATURE", randNumber)
print("Just published " + str(randNumber) + " to topic TEMPERATURE")
time.sleep(1)订阅者:
import paho.mqtt.client as mqtt
import time
def on_message(client, userdata, message):
print("received message: " ,str(message.payload.decode("utf-8")))
mqttBroker ="mqtt.eclipse.org"
client = mqtt.Client("Smartphone")
client.connect(mqttBroker)
client.loop_start()
client.subscribe("TEMPERATURE")
client.on_message=on_message
time.sleep(1)
client.loop_stop()我希望在收到消息时向发布者发送反馈。有没有办法获得消息反馈?
发布于 2020-11-22 15:52:40
在MQTT协议中没有端到端的传送通知。这是非常有意为之的。
MQTT是一个发布/订阅系统,旨在将信息的生产者和消费者完全分离。当生产者发布消息时,可能有0到无限数量的订阅者订阅主题。也可能有离线订阅者,他们将在重新连接时传递消息(可能是消息发布后的任何时间)
MQTT提供的是QOS级别,但重要的是要记住,这些级别只适用于交付过程的一个阶段。例如,在QOS 2发布的消息确保它将到达代理,但不保证任何订阅者,因为他们的订阅可能是QOS 0。
如果您的系统需要端到端传递通知,那么您将需要自己实现响应消息,这通常是通过在初始消息中包含唯一ID并在也包含该ID的不同主题中发送单独的响应消息来实现的
发布于 2020-11-22 19:12:30
为了确保您的消息能够被传递,您可以使用QoS。这可以在发布或订阅时设置。因此,对于您的情况,您需要QoS 1或2。QoS 2确保它在发布时只到达代理一次,如果在QoS 2订阅,它将确保订阅者只获得一次消息。请注意,尽管QoS 2是最慢的发布和订阅形式。我发现处理消息最常见的方法是使用QoS 1,然后在您的subscribe on_message中,您可以决定如何处理重复的消息。paho mqtt客户端允许您在发布或订阅时设置QoS,但它的缺省值为0。我已经更新了你下面的代码。
# publisher
import paho.mqtt.client as mqtt
from random import randrange, uniform
import time
import json
mqttBroker ="mqtt.eclipse.org"
client = mqtt.Client("Temperature_Inside")
client.connect(mqttBroker)
id = 1
while True:
randNumber = randrange(10)
message_dict = { 'id': id, 'value': randNumber }
client.publish("TEMPERATURE", json.dumps(message_dict), 1)
print("Just published " + str(randNumber) + " to topic TEMPERATURE")
id += 1
time.sleep(1)
# subscriber
import paho.mqtt.client as mqtt
import time
import json
from datetime import datetime
parsed_messages = {}
def on_message(client, userdata, message):
json_body = json.loads(str(message.payload.decode("utf-8")))
msg_id = json_body['id']
if msg_id in parsed_messages.keys
print("Message already recieved at: ", parsed_messages[msg_id].strftime("%H:%M:%S"))
else
print("received message: " ,json_body['value'])
parsed_messages[msg_key] = datetime.now()
mqttBroker ="mqtt.eclipse.org"
client = mqtt.Client("Smartphone")
client.connect(mqttBroker)
client.loop_start()
client.subscribe("TEMPERATURE", 1)
client.on_message=on_message
time.sleep(1)
client.loop_stop()请注意,重要的是,订阅者还在订阅时定义QoS 1,否则它将使用QoS 0进行订阅,这是paho客户端的默认设置,消息将降级为0,这意味着消息将最多传递一次(但如果丢弃数据包,则可能根本不会传递)。如上所述,上述操作仅确保消息将被订户接收。如果您希望在订阅者处理完消息时通知发布者,则需要在订阅者完成发布者可以订阅的处理后发布新主题(使用某些uuid)。然而,当我看到这样做的时候,我经常会问为什么要使用MQTT,而不仅仅是发送HTTP请求。如果您感兴趣,Here是MQTT QoS上的一个很好的链接(尽管它缺乏关于订户端发生的事情的细节)。
https://stackoverflow.com/questions/64948328
复制相似问题