首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么脚本没有接收到来自蚊子的数据?

为什么脚本没有接收到来自蚊子的数据?
EN

Stack Overflow用户
提问于 2021-10-16 23:00:02
回答 1查看 106关注 0票数 0

我有以下代码,可以从不同的MQTT主题接收数据。

代码语言:javascript
复制
#!/usr/bin/python3
import paho.mqtt.client as mqtt
import pymysql
import json
import sys
import time

#-### Constantes ####
PUERTO_MQTT = ...
USUARIO = ...
CLAVE = ...
#DB_LOCAL = ...
DB_LOCAL = ...
RETARDO_SUSCRIP = 30 #s

#-### Variables ####
idsNgsConectados = set()
clienteMQTT = mqtt.Client("receptor_datos_ngs")
db_local = None
t1 = 0
t2 = 0

#-### Funciones ####
def on_connect(clienteMQTT, userdata, flags, rc):
    print("Conexion establecida con el broker MQTT correctamente")

def on_message(clienteMQTT, userdata, msg):
    msg_str = msg.payload.decode('utf-8')

    # procesamiento de los mensajes MQTT
    posPrimeraBarra = msg.topic.find('/')

    print("topic: {}".format(msg.topic))
    print("datos: {}".format(msg_str))

    if posPrimeraBarra != -1:
        id_ngs = int(msg.topic[:posPrimeraBarra])
        ref_medicion = msg.topic[posPrimeraBarra+1:]
        variables = json.loads(msg_str)

        if type(variables) is dict:
            if id_ngs in idsNgsConectados:
                try:
                    cursor = db_local.cursor()
                    cursor.execute("INSERT INTO mediciones (id_ngs, referencia) VALUES ({}, '{}')".format(id_ngs, ref_medicion))
                    db_local.commit()
                    cursor.execute("SELECT id FROM mediciones WHERE (id_ngs={} AND referencia='{}') ORDER BY id DESC LIMIT 1".format(id_ngs, ref_medicion))
                    id_medicion = cursor.fetchone()[0]
                    for var in variables:
                        cursor.execute("INSERT INTO valores_mediciones (id_medicion, variable, valor) VALUES ({},'{}',{})".format(id_medicion, var, variables[var]))
                    db_local.commit()
                    cursor.close()
                except Exception as e:
                    db_local.rollback()
                    print("Error 1: fallo el procesamiento de un mensaje MQTT: " + str(e), file=sys.stderr)
                    sys.exit(1)
            else:
                print("Advertencia: un ngs envio datos sin haberse presentado, por lo que se le indicara que salude primero", file=sys.stderr)
                clienteMQTT.publish(str(id_ngs),"saludar",qos=1)
        else:
            print("Advertencia: un ngs envio datos con un formato incorrecto por lo que se le indicara que se reinicie", file=sys.stderr)
            clienteMQTT.publish(str(id_ngs),"reiniciar",qos=1)

def sub_topics():
    global clienteMQTT
    cursor = db_local.cursor()
    cursor.execute('SELECT id FROM ngs')
    ids_ngs = cursor.fetchall()
    cursor.close()
    print("Topics suscriptos:")
    for id_ngs in ids_ngs:
        id_ngs = id_ngs[0]
        idsNgsConectados.add(id_ngs)
        topic = str(id_ngs) + '/#'
        print("\t{}".format(topic))
        clienteMQTT.subscribe(topic)

try:
    db_local = pymysql.connect( unix_socket='/run/mysqld/mysqld.sock',
                    user=USUARIO,
                    password=CLAVE,
                    db=DB_LOCAL )

    clienteMQTT.on_connect = on_connect
    clienteMQTT.on_message = on_message
    clienteMQTT.username_pw_set(username=USUARIO, password=CLAVE)

    clienteMQTT.connect("localhost", PUERTO_MQTT)
    clienteMQTT.loop_start()

    # Bucle infinito
    while True:
        time.sleep(RETARDO_SUSCRIP)
        # Suscripcion a los topics de todos los ngs
        sub_topics()
except Exception as e:
    db_local.close()
    clienteMQTT.loop_stop()
    clienteMQTT.disconnect()
    print("Error 2: desconocido: " + str(e), file=sys.stderr)
    sys.exit(2)

该脚本以动态方式订阅主题。如果我从shell运行这个脚本,它工作得很好,但是如果将它设置为在引导时使用systemd运行,它将失败。我已经将单元文件设置为需要并在蚊子、mariadb、dhcpcd和wpa_supplicant服务之后运行。我确信数据是从另一个设备发布的,因为我可以使用"mosquitto_sub“接收它。这可能是什么原因?

它运行在树莓派的零w和蚊子的版本是1.5.7。

EN

回答 1

Stack Overflow用户

发布于 2021-10-19 02:24:58

我可以找到问题的原因,这不是因为mosquittopaho-mqtt,而是因为pymysql。代码中的错误是我没有在sql事务中使用commit()方法。这导致代码无法获取由数据库中的另一个进程引入的新数据。这是因为InnoDB的默认隔离级别可重复读取。您可以阅读有关该here的更多信息。创建sql事务的正确方法如下所示:

代码语言:javascript
复制
def sub_topics():
    cursor = db_local.cursor()
    cursor.execute('SELECT id FROM ngs')
    ids_ngs = cursor.fetchall()
    print("Topics suscriptos:")
    for id_ngs in ids_ngs:
        id_ngs = id_ngs[0]
        idsNgsConectados.add(id_ngs)
        topic = str(id_ngs) + '/#'
        print("\t{}".format(topic))
        clienteMQTT.subscribe(topic)
    db_local.commit()
    cursor.close()

该代码使用这些数据来确定它必须订阅哪些主题,并且由于该代码没有订阅预期的主题,所以它没有接收来自蚊子的相关数据。

pymysql文档没有涉及到这个主题,所以我建议新手(我已经意识到我是50%的菜鸟,50%的专业人员)去阅读有关ACID和您正在使用的关系型数据库管理系统的实现细节。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69600143

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档