我正在做一个使用IoT设备的项目,这些设备连接到托管在Azure云中的dotnet服务器。我目前正在使用for循环来读取实时数据,但我想使用Pandas从Redis数据库中读取一些实时数据。有人能给我解释一下怎么开始吗?
使用下面的脚本读取统计数据,但希望开始使用pandas。
import os
import re
import json
import traceback
from collections import Counter
import time
import datetime as dt
import redis
from tqdm import tqdm # taqadum (تقدّم) == progress
from jsonpointer import resolve_pointer as j_get
from jsonpointer import JsonPointerException
import pandas as pd
os.system("color 0c") # change console color to red
if False:
# x Redis
r = redis.Redis(host="****.redis.cache.windows.net",
port=***,
password="***",
ssl=True,)
else:
# y Redis
r = redis.Redis(host="***.redis.cache.windows.net",
port=****,
password="*****",
ssl=True,)
print(r.info())
print("Server started at: ", end="")
print(dt.datetime.now() - dt.timedelta(seconds=r.info()['uptime_in_seconds']))
print("Building pipe")
pipe = r.pipeline()
# for key in tqdm(r.scan_iter("MC:SessionInfo*")):
for key in tqdm(r.scan_iter("MC:SessionInfo*", count=2500)):
pipe.hgetall(key)
print("Executing pipe")
responses = pipe.execute()
print("Processing effluvia")
q = {}
k={}
first = True
last_contact = {}
for data in tqdm(responses):
try:
j = json.loads(data[b'LastStatusBody'])
serial = j['System']['Serial'].lower()
q[serial] = j
last_contact[serial] = time.time() - int(data[b'LastContact'])
# TODO: json searching sensibly!
vac[serial] = j['LiveA']['Unit']['Volatge_Vac']
except:
if first:
traceback.print_exc()
first = False
else:
pass
for key,value in fw_versions.items():
if value.split(',')[0]=="xx v1.0.0.0":
x_paired.append(key)
print(x_paired)
print("Total paired :", len(x_paired))

与上面的过程不同，我想从Pandas开始，以便轻松地读取数据，并为团队的日常更新做一些图表。
发布于 2021-01-04 17:09:42
我序列化/反序列化到pyarrow或pickle,然后使用一个额外的键作为元数据。这适用于本地、GCloud、AWS EB和Azure
import pandas as pd
import pyarrow as pa, os
import redis,json, os, pickle
import ebutils
from logenv import logenv
from pandas.core.frame import DataFrame
from redis.client import Redis
from typing import (Union, Optional)
class mycache():
    """Redis-backed cache that round-trips DataFrames, dicts and strings.

    DataFrames are serialized with pyarrow, with an optional pickle
    fallback copy stored under "<key>.pickle" (see ARROW-7961); dicts go
    through JSON; anything else is stored as-is and read back as text.
    A companion "<key>.type" key records the stored value's Python type
    so get() can pick the matching deserializer.  Host resolution covers
    local, GCloud, AWS EB and Azure environments.
    """

    __redisClient: Redis
    # Redis key under which this cache persists its own config dict.
    CONFIGKEY = "cacheconfig"

    def __init__(self) -> None:
        """Resolve REDIS_HOST from the environment, connect and load config.

        Raises:
            RuntimeError: if HOST_ENV names no known Redis setup.
        """
        try:
            os.environ["REDIS_HOST"]
        except KeyError:
            env = os.environ["HOST_ENV"]
            if env == "GCLOUD":
                os.environ["REDIS_HOST"] = "redis://10.0.0.3"
            elif env == "EB":
                os.environ["REDIS_HOST"] = "redis://" + ebutils.get_redis_endpoint()
            elif env == "AZURE":
                pass  # REDIS_HOST should already be set as an Azure app setting
            elif env == "LOCAL":
                os.environ["REDIS_HOST"] = "redis://127.0.0.1"
            else:
                # BUG FIX: the original `raise "could not initialise redis"`
                # raises TypeError in Python 3 (only BaseException subclasses
                # may be raised); the dead `return` after it is dropped.
                raise RuntimeError("could not initialise redis")
        self.__redisClient = redis.Redis.from_url(os.environ["REDIS_HOST"])
        self.__redisClient.ping()  # fail fast if the endpoint is unreachable
        # Load (or create and persist) this cache's own configuration.
        self.config = self.get(self.CONFIGKEY)
        if self.config is None:
            self.config = {"pyarrow": True, "pickle": False}
            self.set(self.CONFIGKEY, self.config)
        self.alog = logenv.alog()

    def redis(self) -> Redis:
        """Expose the underlying Redis client."""
        return self.__redisClient

    def exists(self, key: str) -> bool:
        """Return True iff *key* exists in Redis (False when not connected)."""
        if self.__redisClient is None:
            return False
        return self.__redisClient.exists(key) == 1

    def get(self, key: str) -> Union[DataFrame, str]:
        """Fetch *key*, deserializing according to its "<key>.type" marker.

        Returns None when the key is absent and untyped.
        """
        keytype = "{k}.type".format(k=key)
        valuetype = self.__redisClient.get(keytype)
        if valuetype is None:
            # No type marker: "*.pickle" keys decode via pickle, the rest
            # come back as plain text (or None when missing).
            if key.split(".")[-1] == "pickle":
                return pickle.loads(self.redis().get(key))
            ret = self.redis().get(key)
            return ret if ret is None else ret.decode()
        if valuetype.decode() == str(pd.DataFrame):
            # Prefer pyarrow; fall back to the pickle serialized copy if
            # the arrow payload is unreadable.
            # https://issues.apache.org/jira/browse/ARROW-7961
            try:
                return pa.deserialize(self.__redisClient.get(key))
            except pa.lib.ArrowIOError as err:
                self.alog.warning("using pickle from cache %s - %s - %s",
                                  key, pa.__version__, str(err))
                return pickle.loads(self.redis().get(f"{key}.pickle"))
            except OSError as err:
                if "Expected IPC" not in str(err):
                    raise
                self.alog.warning("using pickle from cache %s - %s - %s",
                                  key, pa.__version__, str(err))
                return pickle.loads(self.redis().get(f"{key}.pickle"))
        if valuetype.decode() == str(type({})):
            return json.loads(self.__redisClient.get(key).decode())
        return self.__redisClient.get(key).decode()  # type: ignore

    def set(self, key: str, value: Union[DataFrame, str]) -> None:
        """Store *value* under *key* plus a "<key>.type" marker.

        No-op when there is no Redis connection.
        """
        if self.__redisClient is None:
            return
        keytype = "{k}.type".format(k=key)
        # Exact type checks (not isinstance) so the stored "<key>.type"
        # marker always matches what get() compares against.
        if type(value) is pd.DataFrame:
            self.__redisClient.set(key, pa.serialize(value).to_buffer().to_pybytes())
            if self.config["pickle"]:
                # Transitional fallback copy through an upgrade; once
                # switched off the data can go away, hence the 24h TTL.
                self.redis().set(f"{key}.pickle", pickle.dumps(value))
                self.redis().expire(f"{key}.pickle", 60 * 60 * 24)
        elif type(value) is dict:
            self.__redisClient.set(key, json.dumps(value))
        else:
            self.__redisClient.set(key, value)
        self.__redisClient.set(keytype, str(type(value)))
if __name__ == '__main__':
    # Smoke test against a local Redis: dump cache keys, their TTLs and values.
    os.environ["HOST_ENV"] = "LOCAL"
    cache = mycache()
    rr = cache.redis()
    # scan_iter avoids blocking the server the way KEYS can on big keyspaces.
    for k in rr.scan_iter("cache*"):
        print(k.decode(), rr.ttl(k))
        print(rr.get(k.decode()))
# Source: https://stackoverflow.com/questions/65557067
复制相似问题