首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >量化之tushare日线数据入库Mysql例子

量化之tushare日线数据入库Mysql例子

作者头像
子晓聊技术
发布2026-04-23 15:08:21
发布2026-04-23 15:08:21
2140
举报
文章被收录于专栏:子晓AI量化子晓AI量化

上篇文章写了DuckDB, 对于普通人来说, 可能用mysql数据保存量化数据更合适。 毕竟对于程序员来说, 大家一开始学的数据库基本是Mysql。选择自己最适合的更重要。 最近由于东方财富频繁访问问题,经常有同学问我数据源的问题,这里写一写tushare存入mysql。 希望对大家有所启发。

先下载mysql资源

1、mysql数据库下载, 比如选择5.7或8.0

2、我们可以选择下载mysql客户端比如navicat, 网上搜索很多。

3、新建一个库,比如db_stock库,设置用户名root,密码假设12345678。

这里我写一个处理方案对日线数据入库的操作。

1、首先获取交易日日历(由于tushare需要2000积分, 这个接口暂时用akshare获取)

2、数据源, 最初我准备用miniqmt的。 想了想,就用tushare保存吧。 也方便不使用windows的同学。

3、我们看下tushare 的日线行情

接口:daily,可以通过数据工具调试和查看数据 数据说明:交易日每天15点~16点之间入库。本接口是未复权行情,停牌期间不提供数据 调取说明:基础积分每分钟内可调取500次,每次6000条数据,一次请求相当于提取一个股票23年历史 描述:获取股票行情数据,或通过通用行情接口获取数据,包含了前后复权数据

4、替换掉代码中的token为自己的

既然接口基础积分每分钟内可调取500次,每次6000条数据。 我就做个判断每分钟最多调用多少次就sleep。 第一次初始化需要一些时间。 后续每天16:00定时获取一次就可以

最后附上完整代码,需要的自取。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。

代码语言:javascript
复制
import tushare as ts
import pandas as pd
from sqlalchemy import create_engine
import schedule
import time
from datetime import datetime, timedelta
import akshare as ak
# 初始化Tushare Pro
ts.set_token('XXXXX')  # 替换为实际Token
pro = ts.pro_api()
# 数据库配置 
engine = create_engine('mysql://root:12345678@localhost/db_stock?charset=utf8')
def get_trade_dates(start_date, end_date):
    """
    使用AkShare获取指定日期范围内的交易日历
    参数格式:YYYYMMDD (如:20200101)
    """
    # 获取完整的交易日历(为啥不用tushare,积分不够)
    trade_date_df = ak.tool_trade_date_hist_sina()
    # 格式转换:将日期列转换为YYYYMMDD格式
    trade_date_df['trade_date'] = pd.to_datetime(trade_date_df['trade_date']).dt.strftime('%Y%m%d')
    # 过滤指定日期范围
    mask = (trade_date_df['trade_date'] >= start_date) & (trade_date_df['trade_date'] <= end_date)
    trade_dates = trade_date_df.loc[mask, 'trade_date'].tolist()
    return trade_dates
def init_historical_data():
    """
    初始化历史数据(20200102 - 20250719)
    按交易日获取全市场数据,避免个股循环
    """
    # 获取交易日历
    trade_dates = get_trade_dates('20200101', '20250719')
    print(f"📅 共获取 {len(trade_dates)} 个交易日数据")
    for i, date in enumerate(trade_dates):
        # 频率控制:每分钟不超过500次
        if i % 400 == 0 and i > 0:
            print(f"⏳ 已完成 {i} 个交易日,暂停60秒避免超频...")
            time.sleep(60)
        try:
            # 单次获取当日全市场数据(约4800条)[6](@ref)
            df = pro.daily(trade_date=date)
            # 数据存储
            df.to_sql('stock_daily', engine, if_exists='append', index=False)
            print(f"✅ {date} 写入 {len(df)} 条")
        except Exception as e:
            print(f"❌ {date} 获取失败: {e}")
            # 失败重试逻辑可扩展
def daily_update():
    """每日16:00执行的更新任务"""
    today = datetime.now().strftime("%Y%m%d")
    # 检查今日数据是否存在
    if pd.read_sql(f"SELECT 1 FROM stock_daily WHERE trade_date='{today}'", engine).empty:
        try:
            df = pro.daily(trade_date=today)
            if not df.empty:
                df.to_sql('stock_daily', engine, if_exists='append', index=False)
                print(f"🔄 更新{today}数据: {len(df)}条")
            else:
                print(f"⚠️ 今日{today}无交易数据(可能为节假日)")
        except Exception as e:
            print(f"🔥 更新失败: {e}")
    else:
        print(f"⏩ {today}数据已存在,跳过更新")
# 主程序逻辑
if __name__ == '__main__':
    print("=" * 50)
    print("📊 股票数据初始化系统启动")
    print("=" * 50)
    # 首次初始化历史数据
    start_time = time.time()
    init_historical_data()
    duration = time.time() - start_time
    print(f"⏱️ 历史数据初始化完成,耗时: {duration // 60:.0f}分{duration % 60:.0f}秒")
    # 设置每日定时任务
    schedule.every().day.at("16:00").do(daily_update)
    print("\n🚀 定时任务守护进程已启动 (每日16:00执行)")
    print("=" * 50)
    while True:
        schedule.run_pending()
        time.sleep(60)

如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-07-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 子晓聊技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档