
之前写了一篇文章 miniqmt、backtesting实现双均线策略例子 ,有同学问我多资产回测怎么处理。 backtesting并不直接支持多股票组合回测。backtesting.py 框架的核心设计是针对单一资产的策略回测, 如果通过循环多支股票回测,其实无法模拟真实组合。
那怎么办呢,我这里用backtrader实现下。
Backtrader 是一个基于Python 实现功能强大且开源的量化回测交易框架,相对于其他专门用于策略回测的框架,Backtrader 不仅仅是可以回测交易策略,还可以连接Broker 实盘交易。 Backtrader 有着完整的基础设施,支持编写可重用组件,如策略、指标和分析器,支持多品种多周期多策略的回测。
这篇将Backtrader与MiniQMT结合,我们既能利用Backtrader强大的回测引擎,又能获取MiniQMT提供的准确、完整的历史数据,实现专业级的量化回测。
这里列一下代码处理步骤:
1、以多股票为例, 获取多股票真实数据
2、创建Cerebro回测引擎,添加股票、 把股票添加到策略中,设置初始资金、设置佣金,添加分析器 。
3、执行回测
4、安全地获取和分析回测结果
5、详细性能指标输出
6. 各股票表现统计
7. 绘制回测图表
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
import backtrader as bt
from datetime import datetime
import time
from xtquant import xtdata
class MultiStockDualMovingAverageStrategy(bt.Strategy):
"""
多股票双均线交叉策略回测类
基于MiniQMT的xtdata数据源
"""
params = (
('short_window', 5),
('long_window', 20),
('printlog', True)
)
def __init__(self):
"""
初始化函数:为每只股票创建独立的技术指标
"""
# 为每个数据源创建独立的技术指标字典
self.indicators = {}
self.orders = {} # 跟踪每只股票的订单状态
for i, data in enumerate(self.datas):
symbol = data._name
self.indicators[symbol] = {
'short_ma': bt.indicators.SMA(
data.close, period=self.params.short_window
),
'long_ma': bt.indicators.SMA(
data.close, period=self.params.long_window
),
'crossover': bt.indicators.CrossOver(
bt.indicators.SMA(data.close, period=self.params.short_window),
bt.indicators.SMA(data.close, period=self.params.long_window)
)
}
self.orders[symbol] = None
self.trade_count = 0
self.entry_dates = {} # 记录每只股票的入场日期
def log(self, txt, dt=None, doprint=False):
"""日志记录函数"""
if self.params.printlog or doprint:
dt = dt or self.datas[0].datetime.date(0)
print(f'{dt.isoformat()}, {txt}')
def notify_order(self, order):
"""
订单状态通知
"""
if order.status in [order.Submitted, order.Accepted]:
return
# 获取股票代码
symbol = order.data._name
if order.status in [order.Completed]:
if order.isbuy():
self.log(
f'{symbol} 买入执行, 价格: {order.executed.price:.2f}, 成本: {order.executed.value:.2f}, 佣金: {order.executed.comm:.2f}')
self.entry_dates[symbol] = self.datetime.date(0) # 记录入场日期
elif order.issell():
self.log(
f'{symbol} 卖出执行, 价格: {order.executed.price:.2f}, 成本: {order.executed.value:.2f}, 佣金: {order.executed.comm:.2f}')
if symbol in self.entry_dates:
del self.entry_dates[symbol]
self.trade_count += 1
self.orders[symbol] = None
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log(f'{symbol} 订单取消/保证金不足/拒绝')
self.orders[symbol] = None
def notify_trade(self, trade):
"""交易结果通知"""
if trade.isclosed:
symbol = trade.data._name
self.log(f'{symbol} 交易利润, 毛利润: {trade.pnl:.2f}, 净利润: {trade.pnlcomm:.2f}')
def next(self):
"""
每个交易日执行的主要交易逻辑
"""
# 遍历所有股票执行交易逻辑[1](@ref)
for i, data in enumerate(self.datas):
symbol = data._name
# 跳过已有未完成订单的股票
if self.orders[symbol]:
continue
# 检查数据长度是否足够
if len(data) < self.params.long_window:
continue
indicator = self.indicators[symbol]
# 双均线金叉信号:买入
if not self.getposition(data) and indicator['crossover'] > 0:
# 等权重分配资金到每只股票
available_cash_per_stock = self.broker.getcash() / len(self.datas)
size = int(available_cash_per_stock * 0.95 / data.close[0])
if size > 0:
self.orders[symbol] = self.buy(data=data, size=size)
self.log(f'{symbol} 金叉信号,买入 {size} 股')
# 双均线死叉信号:卖出
elif self.getposition(data) and indicator['crossover'] < 0:
self.orders[symbol] = self.close(data=data)
self.log(f'{symbol} 死叉信号,平仓卖出')
def get_miniqmt_data(stock_code, start_time, end_time, period='1d'):
"""
使用miniQMT的xtdata接口获取股票历史数据
"""
print(f"正在获取 {stock_code} 从 {start_time} 到 {end_time} 的{period}数据...")
try:
# 下载历史数据到本地缓存
print("步骤1: 下载历史数据...")
xtdata.download_history_data2(
stock_list=[stock_code],
period=period,
start_time=start_time,
end_time=end_time
)
# 等待数据下载完成
time.sleep(3)
# 获取数据
print("步骤2: 从本地缓存读取数据...")
raw_data = xtdata.get_market_data_ex(
field_list=['open', 'high', 'low', 'close', 'volume', 'amount', 'time'],
stock_list=[stock_code],
period=period,
start_time=start_time,
end_time=end_time
)
if not raw_data or stock_code not in raw_data:
print(f"获取数据失败,请检查股票代码 {stock_code} 是否正确或数据是否存在")
return None
# 数据格式转换
stock_data = raw_data[stock_code]
# 确保数据是DataFrame格式
if not isinstance(stock_data, pd.DataFrame):
print("获取的数据格式不符合预期")
return None
# 重命名列以符合backtrader的要求
column_mapping = {
'open': 'Open',
'high': 'High',
'low': 'Low',
'close': 'Close',
'volume': 'Volume'
}
# 应用列名映射
stock_data = stock_data.rename(columns=column_mapping)
# 确保包含所有必需的列
required_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
missing_columns = [col for col in required_columns if col not in stock_data.columns]
if missing_columns:
print(f"数据缺少必要的列: {missing_columns}")
print(f"可用的列: {list(stock_data.columns)}")
return None
# 选择所需的列并确保数据类型正确
data_df = stock_data[required_columns].copy()
data_df = data_df.apply(pd.to_numeric, errors='coerce')
data_df = data_df.dropna()
# 确保索引是日期时间类型
if not isinstance(data_df.index, pd.DatetimeIndex):
try:
data_df.index = pd.to_datetime(data_df.index)
except:
print("无法将索引转换为日期时间格式,将使用默认索引")
print(f"{stock_code} 数据获取成功!共获取 {len(data_df)} 条记录")
return data_df
except Exception as e:
print(f"获取 {stock_code} 数据过程中发生错误: {e}")
return None
def get_multiple_stock_data(stock_codes, start_time, end_time, period='1d'):
"""
获取多只股票历史数据
"""
all_data = {}
for stock_code in stock_codes:
print(f"\n正在获取 {stock_code} 数据...")
single_data = get_miniqmt_data(stock_code, start_time, end_time, period)
if single_data is not None and not single_data.empty:
all_data[stock_code] = single_data
else:
print(f"警告: 无法获取 {stock_code} 的数据,将跳过该股票")
return all_data
def enhanced_safe_get_analysis_value(analyzer, keys, default=0):
try:
analysis = analyzer.get_analysis()
if not analysis:
return default
# 支持嵌套键访问
value = analysis
if isinstance(keys, str):
keys = [keys]
for key in keys:
if isinstance(value, dict) and key in value:
value = value[key]
else:
return default
# 特殊处理:当值为None或NaN时返回默认值
if value is None or (isinstance(value, float) and np.isnan(value)):
return default
return float(value) if value is not None else default
except (KeyError, TypeError, ValueError, AttributeError) as e:
print(f"获取分析器值错误: {e}")
return default
def run_multi_stock_backtest():
"""
主函数:执行多股票回测
"""
# --- 配置参数 ---
STOCK_CODES = ['510300.SH', '510500.SH', '159915.SZ'] # 多只ETF
START_TIME = '20240101'
END_TIME = '20251024'
INITIAL_CASH = 1000000
COMMISSION = 0.002
# 短期和长期均线参数
SHORT_WINDOW = 5
LONG_WINDOW = 20
print("=" * 60)
print("MiniQMT多股票双均线策略回测系统")
print("=" * 60)
# 1. 获取多股票真实数据
print("\n1. 从MiniQMT获取多股票行情数据...")
stocks_data = get_multiple_stock_data(STOCK_CODES, START_TIME, END_TIME)
if not stocks_data:
print("数据获取失败,请检查网络连接、股票代码和时间参数")
return
print(f"\n成功获取 {len(stocks_data)} 只股票的数据")
# 2. 创建Cerebro回测引擎
print("\n2. 初始化Backtrader回测环境...")
cerebro = bt.Cerebro()
# 添加策略
cerebro.addstrategy(MultiStockDualMovingAverageStrategy,
short_window=SHORT_WINDOW,
long_window=LONG_WINDOW)
# 添加多股票数据到引擎
for symbol, data_df in stocks_data.items():
data_feed = bt.feeds.PandasData(dataname=data_df, name=symbol)
cerebro.adddata(data_feed)
print(f"已添加股票: {symbol}")
# 设置初始资金
cerebro.broker.setcash(INITIAL_CASH)
# 设置佣金
cerebro.broker.setcommission(commission=COMMISSION)
# 添加分析器
cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe',
riskfreerate=0.02, timeframe=bt.TimeFrame.Days)
cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')
cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='timereturn')
# 添加观察者
cerebro.addobserver(bt.observers.Value)
cerebro.addobserver(bt.observers.BuySell)
cerebro.addobserver(bt.observers.DrawDown)
# 3. 运行回测
print("\n3. 运行多股票双均线策略回测...")
print(f'初始资金: ¥{cerebro.broker.getvalue():,.2f}')
print(f'回测股票数量: {len(stocks_data)}')
print(f'回测期间: {START_TIME} 至 {END_TIME}')
# 运行回测并获取结果
strategies = cerebro.run()
strat = strategies[0]
final_value = cerebro.broker.getvalue()
print(f'最终资金: ¥{final_value:,.2f}')
# 4. 安全地获取和分析回测结果
print("\n" + "=" * 60)
print("多股票回测结果统计")
print("=" * 60)
# 使用增强版方法获取各项指标
sharpe_ratio = enhanced_safe_get_analysis_value(strat.analyzers.sharpe, 'sharperatio', 0)
total_return = enhanced_safe_get_analysis_value(strat.analyzers.returns, 'rtot', 0) * 100
max_drawdown = enhanced_safe_get_analysis_value(strat.analyzers.drawdown, ['max', 'drawdown'], 0)
# 交易统计
trade_analysis = strat.analyzers.trades.get_analysis()
total_trades = trade_analysis.get('total', {}).get('total', 0)
won_trades = trade_analysis.get('won', {}).get('total', 0)
win_rate = (won_trades / total_trades * 100) if total_trades > 0 else 0
# 计算年化收益率
days_in_period = max([len(data) for data in stocks_data.values()]) if stocks_data else 252
annualized_return = ((final_value / INITIAL_CASH) ** (252 / days_in_period) - 1) * 100 if days_in_period > 0 else 0
print(f"标的股票: {', '.join(STOCK_CODES)}")
print(f"策略参数: {SHORT_WINDOW}日/{LONG_WINDOW}日均线")
print(f"初始资金: ¥{INITIAL_CASH:,.2f}")
print(f"最终净值: ¥{final_value:,.2f}")
print(f"总收益率: {total_return:.2f}%")
print(f"年化收益率: {annualized_return:.2f}%")
print(f"夏普比率: {sharpe_ratio:.4f}")
print(f"最大回撤: {max_drawdown:.2f}%")
print(f"总交易次数: {total_trades}")
print(f"胜率: {win_rate:.2f}%")
# 5. 详细性能指标输出
print("\n详细性能指标:")
analyzers_to_check = [
('夏普比率', strat.analyzers.sharpe, 'sharperatio'),
('总收益率', strat.analyzers.returns, 'rtot'),
('最大回撤', strat.analyzers.drawdown, ['max', 'drawdown']),
]
for name, analyzer, keys in analyzers_to_check:
value = enhanced_safe_get_analysis_value(analyzer, keys, 'N/A')
print(f"{name}: {value}")
# 6. 各股票表现统计
print("\n各股票持仓统计:")
for i, data in enumerate(strat.datas):
symbol = data._name
position = strat.getposition(data)
print(f"{symbol}: 持仓 {position.size} 股, 当前价格 {data.close[0]:.2f}")
# 7. 绘制回测图表
try:
print("\n5. 生成回测图表...")
cerebro.plot(style='candlestick', volume=True, figsize=(15, 10))
except Exception as e:
print(f"图表生成失败: {e}")
print("这可能是因为数据量过大或图表配置问题,但不影响回测结果准确性")
if __name__ == '__main__':
print("请注意: 运行前请确保已启动MiniQMT并成功登录!")
run_multi_stock_backtest()如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。 这个号主要分享AI量化技术相关, 当天的灵感相关记录,相对比较杂。