上一篇文章提到了DuckDB数据库, 这里介绍下语法,方便了解的同学入门学习。
DuckDB 是一款轻量级、高性能的嵌入式分析型数据库(OLAP),专为高效处理数据分析任务设计。它支持标准 SQL 语法,并提供丰富的扩展功能。
DuckDB 支持内存数据库(临时)和持久化文件数据库:
# 内存数据库(数据进程退出后消失)
conn = duckdb.connect(database=':memory:')
# 持久化数据库(数据保存到文件)
conn = duckdb.connect(database='stock_data.db') # 自动创建或连接现有文件支持标准 SQL 语法,定义列类型、主键约束等:
CREATE TABLE IF NOTEXISTS stock_daily (
code VARCHAR, -- 文本类型
date DATE, -- 日期类型
open FLOAT, -- 浮点数
volume BIGINT, -- 大整数
PRIMARY KEY (code, date) -- 联合主键(避免重复)[1,8](@ref)
);
-- 列出所有表 SHOW TABLES; -- 查看表结构 DESCRIBE stock_daily; [](@ref)
支持批量插入和冲突处理(ON CONFLICT):
-- 单条插入
INSERT INTO stock_daily VALUES ('000001', '2025-07-19', 15.2, );
-- 批量插入
INSERT INTO stock_daily
VALUES
('000001', '2025-07-18', 14.8, ),
('600000', '2025-07-18', 22.1, ); [](@ref)
-- 冲突时更新(UPSERT)
INSERTOR REPLACE INTO stock_daily
SELECT*FROM new_data; -- 若主键冲突,则更新原有记录
支持完整 SQL 查询(过滤、聚合、排序等):
-- 基础查询
SELECT code, date, close
FROM stock_daily
WHEREclose>ANDdate>='2025-01-01';
-- 聚合分析
SELECT code, AVG(close) as avg_price, SUM(volume) as total_volume
FROM stock_daily
GROUPBY code;
无需预处理,直接读取外部文件:
-- 自动推断列类型和表头
CREATE TABLE tmp ASSELECT*FROM read_csv_auto('data.csv');
-- 指定分隔符和跳过表头
COPY stock_daily FROM'data.csv' (DELIMITER ',', HEADER); [,](@ref)
-- 导出为 CSV COPY stock_daily TO'output.csv' (HEADER, DELIMITER ','); -- 导出查询结果 COPY (SELECT*FROM stock_daily) TO'result.parquet' (FORMAT PARQUET); [](@ref)
# 将 DataFrame 注册为 DuckDB 表 df = pd.DataFrame(...) conn.register('df_table', df) # 用 SQL 查询 DataFrame result = conn.execute("SELECT * FROM df_table WHERE volume > 10000").df()
conn.execute("CREATE TABLE ...") # DDL 操作 query_result = conn.execute("SELECT ...").fetchall() # 获取查询结果
最后提供一个简单的 DuckDB结合akshare 示例。需要代码的可以直接复制:
import akshare as ak
import duckdb
import pandas as pd
import os
from datetime import datetime
# 1. 初始化DuckDB数据库(自动创建)
conn = duckdb.connect(database='stock_data.db', read_only=False)
# 2. 创建表(若不存在)
conn.execute("""
CREATE TABLE IF NOT EXISTS stock_daily (
code VARCHAR, -- 股票代码
date DATE, -- 日期
open FLOAT, -- 开盘价
close FLOAT, -- 收盘价
high FLOAT, -- 最高价
low FLOAT, -- 最低价
volume BIGINT, -- 成交量(股)
turnover FLOAT, -- 成交额(元)
PRIMARY KEY (code, date) -- 主键约束,避免重复
);
""")
# 3. 获取单只股票数据(示例:平安银行)
def fetch_stock_data(code="000001", start_date="20200101", end_date=datetime.today().strftime("%Y%m%d")):
try:
# 调用AkShare接口
df = ak.stock_zh_a_hist(
symbol=code,
period="daily",
start_date=start_date,
end_date=end_date,
adjust="hfq" # 后复权
)
# 重命名列 & 转换数据类型
df = df.rename(columns={
"日期": "date",
"开盘": "open", "收盘": "close", "最高": "high", "最低": "low",
"成交量": "volume", "成交额": "turnover"
})
df['date'] = pd.to_datetime(df['date'])
df['code'] = code # 添加股票代码列
return df[['code', 'date', 'open', 'close', 'high', 'low', 'volume', 'turnover']]
except Exception as e:
print(f"获取股票 {code} 数据失败: {e}")
return pd.DataFrame()
# 4. 数据增量写入(避免重复)
def upsert_data(df, table_name="stock_daily"):
if df.empty:
return
# 使用DuckDB的UPSERT语法(主键冲突时更新)
conn.execute(f"""
INSERT OR REPLACE INTO {table_name}
SELECT * FROM df
""")
print(f"更新 {len(df)} 条数据,股票: {df['code'].iloc[0]}")
# 5. 示例调用
if __name__ == "__main__":
# 获取平安银行2025年数据
stock_df = fetch_stock_data(code="000001", start_date="20200101")
upsert_data(stock_df)
# 查询2025年收盘价大于20元的记录
result = conn.execute("""
SELECT code, date, close
FROM stock_daily
WHERE close > 20 AND date >= '2025-01-01'
""").df()
print("查询结果示例:\n", result.head())
# 6. 关闭连接
conn.close()如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。