我正在做一个实时数据流项目,每隔9分钟解析和存储数据。我的目标是丢弃第一分钟的数据(作为缓冲区),并且每4分钟从服务器存储一次数据。然后,数据将被解析为其他要进行聚类和计算的函数(此处未包括的函数)。
我已经在'on_message‘函数中初始化了条件,并在该函数中进行了数据解析。我不认为我的结构和召唤是实现我的目标的正确方式。如果你需要更多的细节,请告诉我。
on_message
def on_message(r_c_client, userdata, message):
if (message.topic == "scanning"):
c = datetime.now().time()
current = (c.hour * 60 + c.minute) * 60 + c.second
time.sleep(60) #initial delay
data = json.loads(message.payload.decode("utf-8"))
x = data['host']
y = data['data']
hostList = store(x, y)
while (current>=total_Time ):
#time.sleep(60) #initial delay
nodeList = listToDf(hostList)
nodeDf= df_reformat(nodeList)
print clustering_results_reformat(process_startTime, nodeDf)存储功能
def store(host, data):
if host in hostList:
hostList[host].append(data)
else:
hostList[host] = [data]
return hostListmain
global process_startTime
t = datetime.now().time()
process_startTime = (t.hour * 60 + t.minute) * 60 + t.second
total_Time = process_startTime + 300 #4 minutes + 1 minute
print t , process_startTime
broker_address = '10.10.0.100'
c_client = mqtt.Client("trilateration")
c_client.on_connect = on_connect
c_client.on_message = on_message
c_client.on_subscribe = on_subscribe
c_client.connect(broker_address, 1883)
c_client.loop_forever()发布于 2017-11-03 09:14:27
首先,您不应该在on_message函数中阻塞(睡眠),对于接收到的每一条消息都会调用该函数,如果您睡着了,那么系统将不得不等待那么长的时间才能转到下一条消息。
接下来,您需要跟踪on_message函数之外的开始时间,然后可以将当前时间与每个消息的这个值进行比较,并决定是否要保持/处理它。
def on_message(r_c_client, userdata, message):
global process_startTime
if (message.topic == "scanning"):
c = datetime.now().time()
current = (c.hour * 60 + c.minute) * 60 + c.second
if (current<=total_Time and current>=(process_startTime + 60)):
data = json.loads(message.payload.decode("utf-8"))
x = data['host']
y = data['data']
hostList = store(x, y)主要内容应该如下所示:
global process_startTime
t = datetime.now().time()
process_startTime = (t.hour * 60 + t.minute) * 60 + t.second
total_Time = process_startTime + 300 #4 minutes + 1 minute
print t , process_startTime
broker_address = '10.10.0.100'
c_client = mqtt.Client("trilateration")
c_client.on_connect = on_connect
c_client.on_message = on_message
c_client.on_subscribe = on_subscribe
c_client.connect(broker_address, 1883)
while (True):
c_client.loop()
c = datetime.now().time()
current = (c.hour * 60 + c.minute) * 60 + c.second
if (current >= total_Time):
nodeList = listToDf(hostList)
nodeDf= df_reformat(nodeList)
print clustering_results_reformat(process_startTime, nodeDf)
time.sleep(1)https://stackoverflow.com/questions/47088883
复制相似问题