首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Influxdb大容量插入使用进水数据库-python

Influxdb大容量插入使用进水数据库-python
EN

Stack Overflow用户
提问于 2020-07-07 05:39:33
回答 1查看 1.5K关注 0票数 1

我使用influxDB-Python插入从Redis-Stream读取的大量数据。因为Redis流和set maxlen=600和数据是以100ms的速度插入的,所以我需要保留它的所有数据。因此,我读取它并将其传输到influxDB(我不知道什么是更好的数据库),但是使用批处理只插入了每个batch_size的末尾的数据片段,似乎是覆盖的。以下代码

代码语言:javascript
复制
import redis
from apscheduler.schedulers.blocking import BlockingScheduler
import time
import datetime

import os
import struct
from influxdb import InfluxDBClient

def parse(datas):
    ts,data = datas
    w_json = {
    "measurement": 'sensor1',
    "fields": {
        "Value":data[b'Value'].decode('utf-8')
        "Count":data[b'Count'].decode('utf-8')
        }
    }
    return w_json

def archived_data(rs,client):
    results= rs.xreadgroup('group1', 'test', {'test1': ">"}, count=600)
    if(len(results)!=0):
        print("len(results[0][1]) = ",len(results[0][1]))
        datas = list(map(parse,results[0][1]))
        client.write_points(datas,batch_size=300)
        print('insert success')
    else:
        print("No new data is generated")

if __name__=="__main__":
    try:
        rs = redis.Redis(host="localhost", port=6379, db=0)
        rs.xgroup_destroy("test1", "group1")
        rs.xgroup_create('test1','group1','0-0')
    except Exception as e:
        print("error = ",e)
    try:
        client = InfluxDBClient(host="localhost", port=8086,database='test')
    except Exception as e:
        print("error = ", e)
    try:
        sched = BlockingScheduler()
        sched.add_job(test1, 'interval', seconds=60,args=[rs,client])
        sched.start()
    except Exception as e:
        print(e)

influxDB的数据更改如下

代码语言:javascript
复制
> select count(*) from sensor1;
name: sensor1
time count_Count count_Value
---- ----------- -----------
0    6           6
> select count(*) from sensor1;
name: sensor1
time count_Count count_Value
---- ----------- -----------
0    8           8

> select Count from sensor1;
name: sensor1
time                Count
----                -----
1594099736722564482 00000310
1594099737463373188 00000610
1594099795941527728 00000910
1594099796752396784 00001193
1594099854366369551 00001493
1594099855120826270 00001777
1594099913596094653 00002077
1594099914196135122 00002361

为什么数据似乎被覆盖,我如何解析它一次插入所有数据?

如果你能告诉我怎么解决这个问题,我会很感激的。

EN

回答 1

Stack Overflow用户

发布于 2020-07-07 08:33:08

您能提供更多关于您希望存储在流入DB中的数据结构的详细信息吗?但是,我希望下面的信息对您有所帮助。

在Influxdb中,时间戳+标记是唯一的(即两个具有相同标记值的数据点和时间戳不存在)。与SQL进水数据库不同,它不会抛出唯一的约束冲突,它会用传入的数据覆盖现有数据。您的数据似乎没有标记,因此,如果某些传入数据的时间戳已经存在于进水数据库中,则将覆盖现有数据。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62768787

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档