首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >miniqmt、backtrader 多资产回测演示例子

miniqmt、backtrader 多资产回测演示例子

作者头像
子晓聊技术
发布2026-04-23 16:53:07
发布2026-04-23 16:53:07
1290
举报
文章被收录于专栏:子晓AI量化子晓AI量化

之前写了一篇文章 miniqmt、backtesting实现双均线策略例子 ,有同学问我多资产回测怎么处理。 backtesting并不直接支持多股票组合回测。backtesting.py 框架的核心设计是针对单一资产的策略回测, 如果通过循环多支股票回测,其实无法模拟真实组合。

那怎么办呢,我这里用backtrader实现下。

Backtrader 是一个基于Python 实现功能强大且开源的量化回测交易框架,相对于其他专门用于策略回测的框架,Backtrader 不仅仅是可以回测交易策略,还可以连接Broker 实盘交易。 Backtrader 有着完整的基础设施,支持编写可重用组件,如策略、指标和分析器,支持多品种多周期多策略的回测。

这篇将Backtrader与MiniQMT结合,我们既能利用Backtrader强大的回测引擎,又能获取MiniQMT提供的准确、完整的历史数据,实现专业级的量化回测。

这里列一下代码处理步骤:

1、以多股票为例, 获取多股票真实数据

2、创建Cerebro回测引擎,添加股票、 把股票添加到策略中,设置初始资金、设置佣金,添加分析器 。

3、执行回测

4、安全地获取和分析回测结果

5、详细性能指标输出

6. 各股票表现统计

7. 绘制回测图表

这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。 希望我的分享对大家有所帮助。

代码语言:javascript
复制
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
import backtrader as bt
from datetime import datetime
import time
from xtquant import xtdata
class MultiStockDualMovingAverageStrategy(bt.Strategy):
    """
    多股票双均线交叉策略回测类
    基于MiniQMT的xtdata数据源
    """
    params = (
        ('short_window', 5),
        ('long_window', 20),
        ('printlog', True)
    )
    def __init__(self):
        """
        初始化函数:为每只股票创建独立的技术指标
        """
        # 为每个数据源创建独立的技术指标字典
        self.indicators = {}
        self.orders = {}  # 跟踪每只股票的订单状态
        for i, data in enumerate(self.datas):
            symbol = data._name
            self.indicators[symbol] = {
                'short_ma': bt.indicators.SMA(
                    data.close, period=self.params.short_window
                ),
                'long_ma': bt.indicators.SMA(
                    data.close, period=self.params.long_window
                ),
                'crossover': bt.indicators.CrossOver(
                    bt.indicators.SMA(data.close, period=self.params.short_window),
                    bt.indicators.SMA(data.close, period=self.params.long_window)
                )
            }
            self.orders[symbol] = None
        self.trade_count = 0
        self.entry_dates = {}  # 记录每只股票的入场日期
    def log(self, txt, dt=None, doprint=False):
        """日志记录函数"""
        if self.params.printlog or doprint:
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()}, {txt}')
    def notify_order(self, order):
        """
        订单状态通知
        """
        if order.status in [order.Submitted, order.Accepted]:
            return
        # 获取股票代码
        symbol = order.data._name
        if order.status in [order.Completed]:
            if order.isbuy():
                self.log(
                    f'{symbol} 买入执行, 价格: {order.executed.price:.2f}, 成本: {order.executed.value:.2f}, 佣金: {order.executed.comm:.2f}')
                self.entry_dates[symbol] = self.datetime.date(0)  # 记录入场日期
            elif order.issell():
                self.log(
                    f'{symbol} 卖出执行, 价格: {order.executed.price:.2f}, 成本: {order.executed.value:.2f}, 佣金: {order.executed.comm:.2f}')
                if symbol in self.entry_dates:
                    del self.entry_dates[symbol]
            self.trade_count += 1
            self.orders[symbol] = None
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log(f'{symbol} 订单取消/保证金不足/拒绝')
            self.orders[symbol] = None
    def notify_trade(self, trade):
        """交易结果通知"""
        if trade.isclosed:
            symbol = trade.data._name
            self.log(f'{symbol} 交易利润, 毛利润: {trade.pnl:.2f}, 净利润: {trade.pnlcomm:.2f}')
    def next(self):
        """
        每个交易日执行的主要交易逻辑
        """
        # 遍历所有股票执行交易逻辑[1](@ref)
        for i, data in enumerate(self.datas):
            symbol = data._name
            # 跳过已有未完成订单的股票
            if self.orders[symbol]:
                continue
            # 检查数据长度是否足够
            if len(data) < self.params.long_window:
                continue
            indicator = self.indicators[symbol]
            # 双均线金叉信号:买入
            if not self.getposition(data) and indicator['crossover'] > 0:
                # 等权重分配资金到每只股票
                available_cash_per_stock = self.broker.getcash() / len(self.datas)
                size = int(available_cash_per_stock * 0.95 / data.close[0])
                if size > 0:
                    self.orders[symbol] = self.buy(data=data, size=size)
                    self.log(f'{symbol} 金叉信号,买入 {size} 股')
            # 双均线死叉信号:卖出
            elif self.getposition(data) and indicator['crossover'] < 0:
                self.orders[symbol] = self.close(data=data)
                self.log(f'{symbol} 死叉信号,平仓卖出')
def get_miniqmt_data(stock_code, start_time, end_time, period='1d'):
    """
    使用miniQMT的xtdata接口获取股票历史数据
    """
    print(f"正在获取 {stock_code} 从 {start_time} 到 {end_time} 的{period}数据...")
    try:
        # 下载历史数据到本地缓存
        print("步骤1: 下载历史数据...")
        xtdata.download_history_data2(
            stock_list=[stock_code],
            period=period,
            start_time=start_time,
            end_time=end_time
        )
        # 等待数据下载完成
        time.sleep(3)
        # 获取数据
        print("步骤2: 从本地缓存读取数据...")
        raw_data = xtdata.get_market_data_ex(
            field_list=['open', 'high', 'low', 'close', 'volume', 'amount', 'time'],
            stock_list=[stock_code],
            period=period,
            start_time=start_time,
            end_time=end_time
        )
        if not raw_data or stock_code not in raw_data:
            print(f"获取数据失败,请检查股票代码 {stock_code} 是否正确或数据是否存在")
            return None
        # 数据格式转换
        stock_data = raw_data[stock_code]
        # 确保数据是DataFrame格式
        if not isinstance(stock_data, pd.DataFrame):
            print("获取的数据格式不符合预期")
            return None
        # 重命名列以符合backtrader的要求
        column_mapping = {
            'open': 'Open',
            'high': 'High',
            'low': 'Low',
            'close': 'Close',
            'volume': 'Volume'
        }
        # 应用列名映射
        stock_data = stock_data.rename(columns=column_mapping)
        # 确保包含所有必需的列
        required_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
        missing_columns = [col for col in required_columns if col not in stock_data.columns]
        if missing_columns:
            print(f"数据缺少必要的列: {missing_columns}")
            print(f"可用的列: {list(stock_data.columns)}")
            return None
        # 选择所需的列并确保数据类型正确
        data_df = stock_data[required_columns].copy()
        data_df = data_df.apply(pd.to_numeric, errors='coerce')
        data_df = data_df.dropna()
        # 确保索引是日期时间类型
        if not isinstance(data_df.index, pd.DatetimeIndex):
            try:
                data_df.index = pd.to_datetime(data_df.index)
            except:
                print("无法将索引转换为日期时间格式,将使用默认索引")
        print(f"{stock_code} 数据获取成功!共获取 {len(data_df)} 条记录")
        return data_df
    except Exception as e:
        print(f"获取 {stock_code} 数据过程中发生错误: {e}")
        return None
def get_multiple_stock_data(stock_codes, start_time, end_time, period='1d'):
    """
    获取多只股票历史数据
    """
    all_data = {}
    for stock_code in stock_codes:
        print(f"\n正在获取 {stock_code} 数据...")
        single_data = get_miniqmt_data(stock_code, start_time, end_time, period)
        if single_data is not None and not single_data.empty:
            all_data[stock_code] = single_data
        else:
            print(f"警告: 无法获取 {stock_code} 的数据,将跳过该股票")
    return all_data
def enhanced_safe_get_analysis_value(analyzer, keys, default=0):
    try:
        analysis = analyzer.get_analysis()
        if not analysis:
            return default
        # 支持嵌套键访问
        value = analysis
        if isinstance(keys, str):
            keys = [keys]
        for key in keys:
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return default
        # 特殊处理:当值为None或NaN时返回默认值
        if value is None or (isinstance(value, float) and np.isnan(value)):
            return default
        return float(value) if value is not None else default
    except (KeyError, TypeError, ValueError, AttributeError) as e:
        print(f"获取分析器值错误: {e}")
        return default
def run_multi_stock_backtest():
    """
    主函数:执行多股票回测
    """
    # --- 配置参数 ---
    STOCK_CODES = ['510300.SH', '510500.SH', '159915.SZ']  # 多只ETF
    START_TIME = '20240101'
    END_TIME = '20251024'
    INITIAL_CASH = 1000000
    COMMISSION = 0.002
    # 短期和长期均线参数
    SHORT_WINDOW = 5
    LONG_WINDOW = 20
    print("=" * 60)
    print("MiniQMT多股票双均线策略回测系统")
    print("=" * 60)
    # 1. 获取多股票真实数据
    print("\n1. 从MiniQMT获取多股票行情数据...")
    stocks_data = get_multiple_stock_data(STOCK_CODES, START_TIME, END_TIME)
    if not stocks_data:
        print("数据获取失败,请检查网络连接、股票代码和时间参数")
        return
    print(f"\n成功获取 {len(stocks_data)} 只股票的数据")
    # 2. 创建Cerebro回测引擎
    print("\n2. 初始化Backtrader回测环境...")
    cerebro = bt.Cerebro()
    # 添加策略
    cerebro.addstrategy(MultiStockDualMovingAverageStrategy,
                        short_window=SHORT_WINDOW,
                        long_window=LONG_WINDOW)
    # 添加多股票数据到引擎
    for symbol, data_df in stocks_data.items():
        data_feed = bt.feeds.PandasData(dataname=data_df, name=symbol)
        cerebro.adddata(data_feed)
        print(f"已添加股票: {symbol}")
    # 设置初始资金
    cerebro.broker.setcash(INITIAL_CASH)
    # 设置佣金
    cerebro.broker.setcommission(commission=COMMISSION)
    # 添加分析器 
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe',
                        riskfreerate=0.02, timeframe=bt.TimeFrame.Days)
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
    cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')
    cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='timereturn')
    # 添加观察者
    cerebro.addobserver(bt.observers.Value)
    cerebro.addobserver(bt.observers.BuySell)
    cerebro.addobserver(bt.observers.DrawDown)
    # 3. 运行回测
    print("\n3. 运行多股票双均线策略回测...")
    print(f'初始资金: ¥{cerebro.broker.getvalue():,.2f}')
    print(f'回测股票数量: {len(stocks_data)}')
    print(f'回测期间: {START_TIME} 至 {END_TIME}')
    # 运行回测并获取结果
    strategies = cerebro.run()
    strat = strategies[0]
    final_value = cerebro.broker.getvalue()
    print(f'最终资金: ¥{final_value:,.2f}')
    # 4. 安全地获取和分析回测结果
    print("\n" + "=" * 60)
    print("多股票回测结果统计")
    print("=" * 60)
    # 使用增强版方法获取各项指标
    sharpe_ratio = enhanced_safe_get_analysis_value(strat.analyzers.sharpe, 'sharperatio', 0)
    total_return = enhanced_safe_get_analysis_value(strat.analyzers.returns, 'rtot', 0) * 100
    max_drawdown = enhanced_safe_get_analysis_value(strat.analyzers.drawdown, ['max', 'drawdown'], 0)
    # 交易统计
    trade_analysis = strat.analyzers.trades.get_analysis()
    total_trades = trade_analysis.get('total', {}).get('total', 0)
    won_trades = trade_analysis.get('won', {}).get('total', 0)
    win_rate = (won_trades / total_trades * 100) if total_trades > 0 else 0
    # 计算年化收益率
    days_in_period = max([len(data) for data in stocks_data.values()]) if stocks_data else 252
    annualized_return = ((final_value / INITIAL_CASH) ** (252 / days_in_period) - 1) * 100 if days_in_period > 0 else 0
    print(f"标的股票: {', '.join(STOCK_CODES)}")
    print(f"策略参数: {SHORT_WINDOW}日/{LONG_WINDOW}日均线")
    print(f"初始资金: ¥{INITIAL_CASH:,.2f}")
    print(f"最终净值: ¥{final_value:,.2f}")
    print(f"总收益率: {total_return:.2f}%")
    print(f"年化收益率: {annualized_return:.2f}%")
    print(f"夏普比率: {sharpe_ratio:.4f}")
    print(f"最大回撤: {max_drawdown:.2f}%")
    print(f"总交易次数: {total_trades}")
    print(f"胜率: {win_rate:.2f}%")
    # 5. 详细性能指标输出
    print("\n详细性能指标:")
    analyzers_to_check = [
        ('夏普比率', strat.analyzers.sharpe, 'sharperatio'),
        ('总收益率', strat.analyzers.returns, 'rtot'),
        ('最大回撤', strat.analyzers.drawdown, ['max', 'drawdown']),
    ]
    for name, analyzer, keys in analyzers_to_check:
        value = enhanced_safe_get_analysis_value(analyzer, keys, 'N/A')
        print(f"{name}: {value}")
    # 6. 各股票表现统计
    print("\n各股票持仓统计:")
    for i, data in enumerate(strat.datas):
        symbol = data._name
        position = strat.getposition(data)
        print(f"{symbol}: 持仓 {position.size} 股, 当前价格 {data.close[0]:.2f}")
    # 7. 绘制回测图表
    try:
        print("\n5. 生成回测图表...")
        cerebro.plot(style='candlestick', volume=True, figsize=(15, 10))
    except Exception as e:
        print(f"图表生成失败: {e}")
        print("这可能是因为数据量过大或图表配置问题,但不影响回测结果准确性")
if __name__ == '__main__':
    print("请注意: 运行前请确保已启动MiniQMT并成功登录!")
    run_multi_stock_backtest()

如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。 这个号主要分享AI量化技术相关, 当天的灵感相关记录,相对比较杂。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-10-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 子晓聊技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。 希望我的分享对大家有所帮助。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档