首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在mqtt中每隔9分钟存储和解析实时流数据(python)

如何在mqtt中每隔9分钟存储和解析实时流数据(python)
EN

Stack Overflow用户
提问于 2017-11-03 05:12:46
回答 1查看 1.2K关注 0票数 0

我正在做一个实时数据流项目,每隔9分钟解析和存储数据。我的目标是丢弃第一分钟的数据(作为缓冲区),并且每4分钟从服务器存储一次数据。然后,数据将被解析为其他要进行聚类和计算的函数(此处未包括的函数)。

我已经在'on_message‘函数中初始化了条件,并在该函数中进行了数据解析。我不认为我的结构和召唤是实现我的目标的正确方式。如果你需要更多的细节,请告诉我。

on_message

代码语言:javascript
复制
def on_message(r_c_client, userdata, message):

    if (message.topic == "scanning"):

     c = datetime.now().time()
     current = (c.hour * 60 + c.minute) * 60 + c.second

     time.sleep(60) #initial delay

     data = json.loads(message.payload.decode("utf-8"))
     x = data['host']
     y = data['data']

     hostList = store(x, y)

     while (current>=total_Time ):
      #time.sleep(60) #initial delay


        nodeList = listToDf(hostList)


        nodeDf= df_reformat(nodeList)
        print clustering_results_reformat(process_startTime, nodeDf)

存储功能

代码语言:javascript
复制
def store(host, data):





  if host in hostList:
      hostList[host].append(data)

  else:
      hostList[host] = [data]

  return hostList

main

代码语言:javascript
复制
global process_startTime

t = datetime.now().time()

process_startTime = (t.hour * 60 + t.minute) * 60 + t.second

total_Time = process_startTime + 300 #4 minutes + 1 minute 

print t , process_startTime

broker_address = '10.10.0.100'
c_client = mqtt.Client("trilateration")
c_client.on_connect = on_connect


c_client.on_message = on_message
c_client.on_subscribe = on_subscribe


c_client.connect(broker_address, 1883)

c_client.loop_forever()
EN

回答 1

Stack Overflow用户

发布于 2017-11-03 09:14:27

首先,您不应该在on_message函数中阻塞(睡眠),对于接收到的每一条消息都会调用该函数,如果您睡着了,那么系统将不得不等待那么长的时间才能转到下一条消息。

接下来,您需要跟踪on_message函数之外的开始时间,然后可以将当前时间与每个消息的这个值进行比较,并决定是否要保持/处理它。

代码语言:javascript
复制
def on_message(r_c_client, userdata, message):
  global process_startTime

  if (message.topic == "scanning"):
   c = datetime.now().time()
   current = (c.hour * 60 + c.minute) * 60 + c.second

   if (current<=total_Time and current>=(process_startTime + 60)):
     data = json.loads(message.payload.decode("utf-8"))
     x = data['host']
     y = data['data']

     hostList = store(x, y)

主要内容应该如下所示:

代码语言:javascript
复制
global process_startTime

t = datetime.now().time()

process_startTime = (t.hour * 60 + t.minute) * 60 + t.second
total_Time = process_startTime + 300 #4 minutes + 1 minute 
print t , process_startTime

broker_address = '10.10.0.100'
c_client = mqtt.Client("trilateration")
c_client.on_connect = on_connect

c_client.on_message = on_message
c_client.on_subscribe = on_subscribe
c_client.connect(broker_address, 1883)

while (True):
  c_client.loop()
  c = datetime.now().time()
  current = (c.hour * 60 + c.minute) * 60 + c.second
  if (current >= total_Time):
    nodeList = listToDf(hostList)
    nodeDf= df_reformat(nodeList)
    print clustering_results_reformat(process_startTime, nodeDf)
  time.sleep(1)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47088883

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档