
上篇文章写了DuckDB, 对于普通人来说, 可能用mysql数据保存量化数据更合适。 毕竟对于程序员来说, 大家一开始学的数据库基本是Mysql。选择自己最适合的更重要。 最近由于东方财富频繁访问问题,经常有同学问我数据源的问题,这里写一写tushare存入mysql。 希望对大家有所启发。
先下载mysql资源
1、mysql数据库下载, 比如选择5.7或8.0
2、我们可以选择下载mysql客户端比如navicat, 网上搜索很多。
3、新建一个库,比如db_stock库,设置用户名root,密码假设12345678。
这里我写一个处理方案对日线数据入库的操作。
1、首先获取交易日日历(由于tushare需要2000积分, 这个接口暂时用akshare获取)
2、数据源, 最初我准备用miniqmt的。 想了想,就用tushare保存吧。 也方便不使用windows的同学。
3、我们看下tushare 的日线行情
接口:daily,可以通过数据工具调试和查看数据 数据说明:交易日每天15点~16点之间入库。本接口是未复权行情,停牌期间不提供数据 调取说明:基础积分每分钟内可调取500次,每次6000条数据,一次请求相当于提取一个股票23年历史 描述:获取股票行情数据,或通过通用行情接口获取数据,包含了前后复权数据
4、替换掉代码中的token为自己的
既然接口基础积分每分钟内可调取500次,每次6000条数据。 我就做个判断每分钟最多调用多少次就sleep。 第一次初始化需要一些时间。 后续每天16:00定时获取一次就可以
最后附上完整代码,需要的自取。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。
import tushare as ts
import pandas as pd
from sqlalchemy import create_engine
import schedule
import time
from datetime import datetime, timedelta
import akshare as ak
# 初始化Tushare Pro
ts.set_token('XXXXX') # 替换为实际Token
pro = ts.pro_api()
# 数据库配置
engine = create_engine('mysql://root:12345678@localhost/db_stock?charset=utf8')
def get_trade_dates(start_date, end_date):
"""
使用AkShare获取指定日期范围内的交易日历
参数格式:YYYYMMDD (如:20200101)
"""
# 获取完整的交易日历(为啥不用tushare,积分不够)
trade_date_df = ak.tool_trade_date_hist_sina()
# 格式转换:将日期列转换为YYYYMMDD格式
trade_date_df['trade_date'] = pd.to_datetime(trade_date_df['trade_date']).dt.strftime('%Y%m%d')
# 过滤指定日期范围
mask = (trade_date_df['trade_date'] >= start_date) & (trade_date_df['trade_date'] <= end_date)
trade_dates = trade_date_df.loc[mask, 'trade_date'].tolist()
return trade_dates
def init_historical_data():
"""
初始化历史数据(20200102 - 20250719)
按交易日获取全市场数据,避免个股循环
"""
# 获取交易日历
trade_dates = get_trade_dates('20200101', '20250719')
print(f"📅 共获取 {len(trade_dates)} 个交易日数据")
for i, date in enumerate(trade_dates):
# 频率控制:每分钟不超过500次
if i % 400 == 0 and i > 0:
print(f"⏳ 已完成 {i} 个交易日,暂停60秒避免超频...")
time.sleep(60)
try:
# 单次获取当日全市场数据(约4800条)[6](@ref)
df = pro.daily(trade_date=date)
# 数据存储
df.to_sql('stock_daily', engine, if_exists='append', index=False)
print(f"✅ {date} 写入 {len(df)} 条")
except Exception as e:
print(f"❌ {date} 获取失败: {e}")
# 失败重试逻辑可扩展
def daily_update():
"""每日16:00执行的更新任务"""
today = datetime.now().strftime("%Y%m%d")
# 检查今日数据是否存在
if pd.read_sql(f"SELECT 1 FROM stock_daily WHERE trade_date='{today}'", engine).empty:
try:
df = pro.daily(trade_date=today)
if not df.empty:
df.to_sql('stock_daily', engine, if_exists='append', index=False)
print(f"🔄 更新{today}数据: {len(df)}条")
else:
print(f"⚠️ 今日{today}无交易数据(可能为节假日)")
except Exception as e:
print(f"🔥 更新失败: {e}")
else:
print(f"⏩ {today}数据已存在,跳过更新")
# 主程序逻辑
if __name__ == '__main__':
print("=" * 50)
print("📊 股票数据初始化系统启动")
print("=" * 50)
# 首次初始化历史数据
start_time = time.time()
init_historical_data()
duration = time.time() - start_time
print(f"⏱️ 历史数据初始化完成,耗时: {duration // 60:.0f}分{duration % 60:.0f}秒")
# 设置每日定时任务
schedule.every().day.at("16:00").do(daily_update)
print("\n🚀 定时任务守护进程已启动 (每日16:00执行)")
print("=" * 50)
while True:
schedule.run_pending()
time.sleep(60)如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。