我有以下代码,可以从不同的MQTT主题接收数据。
#!/usr/bin/python3
import paho.mqtt.client as mqtt
import pymysql
import json
import sys
import time
#-### Constantes ####
PUERTO_MQTT = ...
USUARIO = ...
CLAVE = ...
#DB_LOCAL = ...
DB_LOCAL = ...
RETARDO_SUSCRIP = 30 #s
#-### Variables ####
idsNgsConectados = set()
clienteMQTT = mqtt.Client("receptor_datos_ngs")
db_local = None
t1 = 0
t2 = 0
#-### Funciones ####
def on_connect(clienteMQTT, userdata, flags, rc):
print("Conexion establecida con el broker MQTT correctamente")
def on_message(clienteMQTT, userdata, msg):
msg_str = msg.payload.decode('utf-8')
# procesamiento de los mensajes MQTT
posPrimeraBarra = msg.topic.find('/')
print("topic: {}".format(msg.topic))
print("datos: {}".format(msg_str))
if posPrimeraBarra != -1:
id_ngs = int(msg.topic[:posPrimeraBarra])
ref_medicion = msg.topic[posPrimeraBarra+1:]
variables = json.loads(msg_str)
if type(variables) is dict:
if id_ngs in idsNgsConectados:
try:
cursor = db_local.cursor()
cursor.execute("INSERT INTO mediciones (id_ngs, referencia) VALUES ({}, '{}')".format(id_ngs, ref_medicion))
db_local.commit()
cursor.execute("SELECT id FROM mediciones WHERE (id_ngs={} AND referencia='{}') ORDER BY id DESC LIMIT 1".format(id_ngs, ref_medicion))
id_medicion = cursor.fetchone()[0]
for var in variables:
cursor.execute("INSERT INTO valores_mediciones (id_medicion, variable, valor) VALUES ({},'{}',{})".format(id_medicion, var, variables[var]))
db_local.commit()
cursor.close()
except Exception as e:
db_local.rollback()
print("Error 1: fallo el procesamiento de un mensaje MQTT: " + str(e), file=sys.stderr)
sys.exit(1)
else:
print("Advertencia: un ngs envio datos sin haberse presentado, por lo que se le indicara que salude primero", file=sys.stderr)
clienteMQTT.publish(str(id_ngs),"saludar",qos=1)
else:
print("Advertencia: un ngs envio datos con un formato incorrecto por lo que se le indicara que se reinicie", file=sys.stderr)
clienteMQTT.publish(str(id_ngs),"reiniciar",qos=1)
def sub_topics():
global clienteMQTT
cursor = db_local.cursor()
cursor.execute('SELECT id FROM ngs')
ids_ngs = cursor.fetchall()
cursor.close()
print("Topics suscriptos:")
for id_ngs in ids_ngs:
id_ngs = id_ngs[0]
idsNgsConectados.add(id_ngs)
topic = str(id_ngs) + '/#'
print("\t{}".format(topic))
clienteMQTT.subscribe(topic)
try:
db_local = pymysql.connect( unix_socket='/run/mysqld/mysqld.sock',
user=USUARIO,
password=CLAVE,
db=DB_LOCAL )
clienteMQTT.on_connect = on_connect
clienteMQTT.on_message = on_message
clienteMQTT.username_pw_set(username=USUARIO, password=CLAVE)
clienteMQTT.connect("localhost", PUERTO_MQTT)
clienteMQTT.loop_start()
# Bucle infinito
while True:
time.sleep(RETARDO_SUSCRIP)
# Suscripcion a los topics de todos los ngs
sub_topics()
except Exception as e:
db_local.close()
clienteMQTT.loop_stop()
clienteMQTT.disconnect()
print("Error 2: desconocido: " + str(e), file=sys.stderr)
sys.exit(2)该脚本以动态方式订阅主题。如果我从shell运行这个脚本,它工作得很好,但是如果将它设置为在引导时使用systemd运行,它将失败。我已经将单元文件设置为需要并在蚊子、mariadb、dhcpcd和wpa_supplicant服务之后运行。我确信数据是从另一个设备发布的,因为我可以使用"mosquitto_sub“接收它。这可能是什么原因?
它运行在树莓派的零w和蚊子的版本是1.5.7。
发布于 2021-10-19 02:24:58
我可以找到问题的原因,这不是因为mosquitto或paho-mqtt,而是因为pymysql。代码中的错误是我没有在sql事务中使用commit()方法。这导致代码无法获取由数据库中的另一个进程引入的新数据。这是因为InnoDB的默认隔离级别可重复读取。您可以阅读有关该here的更多信息。创建sql事务的正确方法如下所示:
def sub_topics():
cursor = db_local.cursor()
cursor.execute('SELECT id FROM ngs')
ids_ngs = cursor.fetchall()
print("Topics suscriptos:")
for id_ngs in ids_ngs:
id_ngs = id_ngs[0]
idsNgsConectados.add(id_ngs)
topic = str(id_ngs) + '/#'
print("\t{}".format(topic))
clienteMQTT.subscribe(topic)
db_local.commit()
cursor.close()该代码使用这些数据来确定它必须订阅哪些主题,并且由于该代码没有订阅预期的主题,所以它没有接收来自蚊子的相关数据。
pymysql文档没有涉及到这个主题,所以我建议新手(我已经意识到我是50%的菜鸟,50%的专业人员)去阅读有关ACID和您正在使用的关系型数据库管理系统的实现细节。
https://stackoverflow.com/questions/69600143
复制相似问题