昨天没更文, 周五晚上先看了下凡人修仙传电视剧, 觉得剧情太慢,10多年前看过的凡人修仙小说剧情有点忘记了,又重温了下,囫囵吞枣看了1千多章。 只能说看小说误事。
简单写点正能量的感悟:
现实中,我们大多如韩立般平凡,没有耀眼的光环,可以凭借不懈的努力、谨慎的态度,抓住生活中的每一次“机缘”。不必羡慕他人的天赋异禀,脚踏实地,步步为营,平凡的我们,也能走出属于自己的不凡之路。
也许有人会说韩立有逆天小绿瓶,也许是的。 以炒股举例,我们需要有复利思维。紧跟每一次机遇,如果错过了就等下一次。
回到正题, 有同学问我 miniqmt复权的问题, 这里写一下。
download_history_data或 download_history_data2下载未复权数据(原始数据)到本地。这一步仅缓存原始数据,不涉及复权处理。2、数据查询阶段
通过 get_market_data或 get_market_data_ex的 dividend_type参数指定复权方式
复权类型 | 参数值 | 特点 | 适用场景 |
|---|---|---|---|
前复权 | front | 以当前价格为准调整历史价格,K线连续性好,反映实际交易成本 | 技术分析、实时交易策略 |
后复权 | back | 以历史价格为准调整当前价格,反映长期真实收益 | 长期收益回测、基本面分析 |
不复权 | none | 原始除权数据,存在价格断层 | 除权事件研究 |
而miniqmt提供了5种方式, 不复权、前复权、后复权、等比前复权、等比后复权这 5种复权方式。
有些同学可能不清楚 普通复权 和等比复权的区别,这里简单解释下。
普通复权(前/后复权)仅调整价格缺口,忽略分红再投资;而等比复权通过复权因子模拟“分红再投”的复利效应,更真实反映长期收益。
回测是为了更贴近历史数据,但实际中各类配股、增发的动作,会造成价格的异常波动,为了避免这样的波动对回测的影响,我们推荐用户在回测中使用等比前复权价,这样在回测过程中,无需考虑配股、增发带来的变化,始终以统一标准的价格进行买卖,方便的同时也能得到更贴合历史数据的回测收益和表现。
这里写一个简单例子
from xtquant import xtdata
import time
# 1. 下载未复权数据
xtdata.download_history_data2(
stock_list=["000001.SZ", "600000.SH"],
period="1d",
start_time="20250102",
callback=lambda data: print(f"进度: {data['finished']}/{data['total']}")
)
time.sleep(5) # 等待下载完成
# 2. 查询前复权数据
data_front = xtdata.get_market_data(
field_list=["open", "close", "high", "low"],
stock_list=["000001.SZ"],
period="1d",
start_time="20250102",
dividend_type="front" # 前复权
)
# 3. 输出前复权收盘价
print(data_front["close"])
# 2. 查询后复权数据
data_back = xtdata.get_market_data(
field_list=["open", "close", "high", "low"],
stock_list=["000001.SZ"],
period="1d",
start_time="20250102",
dividend_type="back" # 前复权
)
# 3. 输出后复权收盘价
print(data_back["close"])这里需要注意, 前复权价格大部分软件基本一致,而后复权一些软件的计算不一样。 比如常见的券商软件 的计算方式 是基于股票以上市首日为基准,而miniqmt采用递归法,从最新日期倒推计算历史复权价格。
这里可能还存在一种情况,有些同学可能需要把miniqmt获取的原始数据保存数据库中,后续使用用到前复权或后复权数据, 那怎么处理呢。
看过官方文档的同学应该看到这样一个示例
#coding:utf-8
import numpy as np
import pandas as pd
from xtquant import xtdata
#def gen_divid_ratio(quote_datas, divid_datas):
# drl = []
# for qi in range(len(quote_datas)):
# q = quote_datas.iloc[qi]
# dr = 1.0
# for di in range(len(divid_datas)):
# d = divid_datas.iloc[di]
# if d.name <= q.name:
# dr *= d['dr']
# drl.append(dr)
# return pd.DataFrame(drl, index = quote_datas.index, columns = quote_datas.columns)
def gen_divid_ratio(quote_datas, divid_datas):
drl = []
dr = 1.0
qi = 0
qdl = len(quote_datas)
di = 0
ddl = len(divid_datas)
while qi < qdl and di < ddl:
qd = quote_datas.iloc[qi]
dd = divid_datas.iloc[di]
if qd.name >= dd.name:
dr *= dd['dr']
di += 1
if qd.name <= dd.name:
drl.append(dr)
qi += 1
while qi < qdl:
drl.append(dr)
qi += 1
return pd.DataFrame(drl, index = quote_datas.index, columns = quote_datas.columns)
def process_forward_ratio(quote_datas, divid_datas):
drl = gen_divid_ratio(quote_datas, divid_datas)
drlf = drl / drl.iloc[-1]
result = (quote_datas * drlf).apply(lambda x: round(x, 2))
return result
def process_backward_ratio(quote_datas, divid_datas):
drl = gen_divid_ratio(quote_datas, divid_datas)
result = (quote_datas * drl).apply(lambda x: round(x, 2))
return result
def process_forward(quote_datas1, divid_datas):
quote_datas = quote_datas1.copy()
def calc_front(v, d):
return ((v - d['interest'] + d['allotPrice'] * d['allotNum'])
/ (1 + d['allotNum'] + d['stockBonus'] + d['stockGift']))
for qi in range(len(quote_datas)):
q = quote_datas.iloc[qi]
for di in range(len(divid_datas)):
d = divid_datas.iloc[di]
if d.name <= q.name:
continue
q.iloc[0] = calc_front(q.iloc[0], d)
return quote_datas
def process_backward(quote_datas1, divid_datas):
quote_datas = quote_datas1.copy()
def calc_back(v, d):
return ((v * (1.0 + d['stockGift'] + d['stockBonus'] + d['allotNum'])
+ d['interest'] - d['allotNum'] * d['allotPrice']))
for qi in range(len(quote_datas)):
q = quote_datas.iloc[qi]
for di in range(len(divid_datas) - 1, -1, -1):
d = divid_datas.iloc[di]
if d.name > q.name:
continue
q.iloc[0] = calc_back(q.iloc[0], d)
return quote_datas
#--------------------------------
s = '002594.SZ'
#xtdata.download_history_data(s, '1d', '20100101', '')
dd = xtdata.get_divid_factors(s)
print(dd)
#复权计算用于处理价格字段
field_list = ['open', 'high', 'low', 'close']
datas_ori = xtdata.get_market_data(field_list, [s], '1d', dividend_type = 'none')['close'].T
#print(datas_ori)
#等比前复权
datas_forward_ratio = process_forward_ratio(datas_ori, dd)
print('datas_forward_ratio', datas_forward_ratio)
#等比后复权
datas_backward_ratio = process_backward_ratio(datas_ori, dd)
print('datas_backward_ratio', datas_backward_ratio)
#前复权
datas_forward = process_forward(datas_ori, dd)
print('datas_forward', datas_forward)
#后复权
datas_backward = process_backward(datas_ori, dd)
print('datas_backward', datas_backward)
这时候技术方案有2种:
第一种数据表设计, 每天定时把这些数据都保存下来,然后进行处理。
原始行情表 | code, trade_date, open, close, ... | 存储未复权的开盘价、收盘价等 |
|---|---|---|
复权因子表 | code, ex_date, factor | 记录除权日及对应复权因子(动态更新) |
元信息表 | code, list_date, delist_date | 股票上市/退市日期,确定复权时间范围 |
第二种保存原始行情数据, 而复权数据 实时通过xtdata实时获取计算,参考官方的做法。
具体怎么用, 看下你对复权数据的使用场景和频次。
如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。