首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有没有办法用pandas从redis服务器上读取数据?

有没有办法用pandas从redis服务器上读取数据?
EN

Stack Overflow用户
提问于 2021-01-04 10:40:49
回答 1查看 274关注 0票数 0

我正在做一个使用IoT设备的项目,这些设备连接到托管在Azure云中的.NET服务器。我目前正在使用for循环来读取实时数据,但我想使用Pandas从Redis数据库中读取一些实时数据。有人能给我解释一下怎么开始吗?

我目前使用下面的脚本读取统计数据,但希望改用pandas。

代码语言:python
复制
import os
import re
import json
import traceback
from collections import Counter
import time
import datetime as dt
import redis
from tqdm import tqdm  # taqadum (تقدّم) == progress
from jsonpointer import resolve_pointer as j_get 
from jsonpointer import JsonPointerException
import pandas as pd

os.system("color 0c")  # change console color to red (Windows cmd only)

# Connection settings are read from the environment instead of being
# hard-coded: the literal host/port/password in the original were redacted
# placeholders (not valid Python), and secrets do not belong in the script.
# Point these variables at whichever Redis instance ("x" or "y") you need.
r = redis.Redis(host=os.environ["REDIS_HOST"],
                port=int(os.environ["REDIS_PORT"]),
                password=os.environ["REDIS_PASSWORD"],
                ssl=True,)

# Query INFO once so the printed dump and the derived start time agree.
info = r.info()
print(info)
print("Server started at: ", end="")
print(dt.datetime.now() - dt.timedelta(seconds=info['uptime_in_seconds']))

print("Building pipe")
pipe = r.pipeline()
# Queue one HGETALL per session hash; all are sent in a single round trip.
for key in tqdm(r.scan_iter("MC:SessionInfo*", count=2500)):
    pipe.hgetall(key)

print("Executing pipe")
responses = pipe.execute()
print("Processing effluvia")

q = {}             # serial -> parsed LastStatusBody JSON
last_contact = {}  # serial -> seconds since the LastContact timestamp
vac = {}           # serial -> j['LiveA']['Unit']['Volatge_Vac']
first = True
for data in tqdm(responses):
    try:
        j = json.loads(data[b'LastStatusBody'])
        serial = j['System']['Serial'].lower()

        q[serial] = j
        last_contact[serial] = time.time() - int(data[b'LastContact'])
        # TODO: json searching sensibly!
        vac[serial] = j['LiveA']['Unit']['Volatge_Vac']
    except (KeyError, TypeError, ValueError, AttributeError):
        # Skip malformed/incomplete sessions, but print the first failure so
        # problems are not completely silent.
        if first:
            traceback.print_exc()
            first = False

# TODO(review): fw_versions is never populated in this snippet; presumably it
# should map serial -> comma-separated firmware string. Fill it before
# relying on this report.
fw_versions = {}
x_paired = [key for key, value in fw_versions.items()
            if value.split(',')[0] == "xx v1.0.0.0"]
print(x_paired)
print("Total paired :", len(x_paired))

我想改用Pandas来代替上面的做法,以便更方便地读取数据,并为团队的日常更新制作一些图表。

EN

回答 1

Stack Overflow用户

发布于 2021-01-04 17:09:42

我使用pyarrow或pickle进行序列化/反序列化,并用一个额外的键保存元数据(值的类型)。这种做法在本地、GCloud、AWS EB和Azure上都能正常工作。

代码语言:python
复制
import pandas as pd
import pyarrow as pa, os
import redis,json, os, pickle
import ebutils
from logenv import logenv
from pandas.core.frame import DataFrame
from redis.client import Redis
from typing import (Union, Optional)


class mycache():
    """Redis-backed cache for pandas DataFrames, dicts and strings.

    Every value written by :meth:`set` under ``key`` is accompanied by a
    companion key ``"<key>.type"`` holding ``str(type(value))``; :meth:`get`
    reads it to choose a deserializer:

    * DataFrame -> ``pyarrow.serialize`` bytes. When ``config["pickle"]`` is
      true, a ``"<key>.pickle"`` copy with a 24 h TTL is also written and is
      used as a fallback if pyarrow cannot read the primary copy.
    * dict -> JSON text.
    * anything else -> handed to Redis as-is, read back as a decoded string.

    The cache's own settings live under :attr:`CONFIGKEY` as a dict.

    NOTE(review): ``pa.serialize``/``pa.deserialize`` are deprecated in
    pyarrow (since 2.0) -- confirm they exist in the pinned pyarrow version.

    SECURITY: :meth:`get` calls ``pickle.loads`` on bytes read from Redis.
    Unpickling untrusted data can execute arbitrary code, so only use this
    class against a Redis instance whose writers you trust.
    """

    __redisClient: "Redis"  # quoted: only needed by type checkers
    CONFIGKEY = "cacheconfig"  # key holding the cache's own settings dict

    def __init__(self) -> None:
        """Connect to Redis and load (or initialise) the cache config.

        The connection URL is taken from ``REDIS_HOST``. If that is unset, a
        default is derived from ``HOST_ENV`` (GCLOUD, EB or LOCAL); on AZURE
        ``REDIS_HOST`` must already be set in the environment.

        Raises:
            RuntimeError: ``REDIS_HOST`` is unset and ``HOST_ENV`` is missing
                or not a known environment.
            KeyError: ``HOST_ENV`` is AZURE but ``REDIS_HOST`` is unset.
        """
        if "REDIS_HOST" not in os.environ:
            host_env = os.environ.get("HOST_ENV")
            if host_env == "GCLOUD":
                os.environ["REDIS_HOST"] = "redis://10.0.0.3"
            elif host_env == "EB":
                os.environ["REDIS_HOST"] = "redis://" + ebutils.get_redis_endpoint()
            elif host_env == "AZURE":
                # Should be set as an Azure environment variable; the lookup
                # below raises KeyError when it is not.
                pass
            elif host_env == "LOCAL":
                os.environ["REDIS_HOST"] = "redis://127.0.0.1"
            else:
                # No known redis setup. (Raising a plain string, as this
                # used to, is itself a TypeError in Python 3.)
                raise RuntimeError("could not initialise redis")

        # Create the logger before get(): its DataFrame path logs warnings.
        self.alog = logenv.alog()
        self.__redisClient = redis.Redis.from_url(os.environ["REDIS_HOST"])
        self.__redisClient.ping()
        # get config as well...
        self.config = self.get(self.CONFIGKEY)
        if self.config is None:
            self.config = {"pyarrow": True, "pickle": False}
            self.set(self.CONFIGKEY, self.config)

    def redis(self) -> "Redis":
        """Return the underlying redis-py client."""
        return self.__redisClient

    def exists(self, key: str) -> bool:
        """Return True if ``key`` is present in Redis."""
        if self.__redisClient is None:
            return False

        return self.__redisClient.exists(key) == 1

    def get(self, key: str) -> Optional[Union[DataFrame, str, dict]]:
        """Return the value cached under ``key``, or None if it is absent.

        The deserializer is chosen from the ``"<key>.type"`` companion key:
        DataFrame -> pyarrow (falling back to the ``".pickle"`` copy),
        dict -> JSON, anything else -> decoded string. Without a type key,
        keys whose last dot-separated part is ``pickle`` are unpickled and
        all others are decoded as strings.
        """
        keytype = "{k}.type".format(k=key)
        valuetype = self.__redisClient.get(keytype)
        raw = self.__redisClient.get(key)
        if raw is None:
            # Value missing or expired (e.g. the 24 h ".pickle" copy) while a
            # stale ".type" key may remain: report a miss instead of crashing.
            return None

        if valuetype is None:
            if key.split(".")[-1] == "pickle":
                return pickle.loads(raw)
            return raw.decode()

        valuetype = valuetype.decode()
        if valuetype == str(pd.DataFrame):
            # fallback to pickle serialized form if pyarrow fails
            # https://issues.apache.org/jira/browse/ARROW-7961
            try:
                return pa.deserialize(raw)
            except pa.lib.ArrowIOError as err:
                return self._pickle_fallback(key, err)
            except OSError as err:
                if "Expected IPC" in str(err):
                    return self._pickle_fallback(key, err)
                raise
        if valuetype == str(type({})):
            return json.loads(raw.decode())
        return raw.decode()

    def _pickle_fallback(self, key: str, err: Exception) -> Optional[DataFrame]:
        """Load the ``"<key>.pickle"`` copy after a pyarrow failure (None if absent)."""
        self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
        raw = self.__redisClient.get(f"{key}.pickle")
        return None if raw is None else pickle.loads(raw)

    def set(self, key: str, value: Union[DataFrame, str, dict]) -> None:
        """Store ``value`` under ``key`` plus a ``"<key>.type"`` marker for get()."""
        if self.__redisClient is None:
            return
        keytype = "{k}.type".format(k=key)

        # Exact-type checks (not isinstance) so the recorded type string
        # always matches what get() compares against.
        if type(value) is pd.DataFrame:
            self.__redisClient.set(key, pa.serialize(value).to_buffer().to_pybytes())
            if self.config["pickle"]:
                self.__redisClient.set(f"{key}.pickle", pickle.dumps(value))
                # issue should be transient through an upgrade....
                # once switched off data can go away
                self.__redisClient.expire(f"{key}.pickle", 60*60*24)
        elif type(value) is dict:
            self.__redisClient.set(key, json.dumps(value))
        else:
            self.__redisClient.set(key, value)

        self.__redisClient.set(keytype, str(type(value)))


if __name__ == '__main__':
    # Smoke test against a local Redis: print every "cache*" key with its
    # TTL and raw stored value.
    os.environ["HOST_ENV"] = "LOCAL"
    r = mycache()
    rr = r.redis()
    # SCAN iterates incrementally; KEYS walks the whole keyspace in one
    # blocking call and is discouraged outside debugging.
    for k in rr.scan_iter("cache*"):
        print(k.decode(), rr.ttl(k))
        print(rr.get(k))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65557067

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档