首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >量化之通达信条件预警miniqmt自动下单例子

量化之通达信条件预警miniqmt自动下单例子

作者头像
子晓聊技术
发布2026-04-23 15:20:21
发布2026-04-23 15:20:21
1680
举报
文章被收录于专栏:子晓AI量化子晓AI量化

我认识的同学中,用通达信的居多,之前我也推荐过开源项目 tdxtrader。一些同学使用开源项目运行下单遇到问题。

这里我提供一个例子演示通达信条件预警miniqmt下单例子。

先说下通达信使用步骤

1、先安装通达信,这个不用多说。打开通达信pc端,我提前盘后下载补充好日线数据。

怎么设置条件预警呢。右上角功能-预警和复盘-条件预警设置。

2、添加预警股票池:预警品种设置-添加品种,选择所需股票。这里可以自行选择自己所需要监控的股票,比如说整个上证A股的所有股票。

3、添加预警公式:预警公式设置-添加公式,选择自己所需的预警条件,比如“连涨5天” 举例。这里可以选择通达信自带的那些指标公式,也可以根据需求自己先有个公式。

4、在电脑上新建个“sign.txt”文本文档,后在通达信“其他设置”里选择“全部预警结果输出到以下文件”并选中之前新建的“sign.txt”即可。

网上很多大QMT读取通达信预警的例子,我这里以miniqmt作为演示。

以国金为例,勾选独立交易, 就是开启了miniqmt。

修改代码中的账号 和 路径, 自己设置的通达信预警条件。 下单演示效果

这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。

代码语言:javascript
复制
from xtquant import xttrader, xtconstant, xtdata
from xtquant.xttype import StockAccount
from xtquant.xttrader import XtQuantTraderCallback
import logging
import time
import pandas as pd

# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# 参数配置
account_id = 'XXXXX'  # 账号ID
mini_qmt_path = r'D:/国金QMT交易端模拟/userdata_mini'
file_path = r'D:/new_tdx/sign.txt'
interval = 1  # 轮询间隔(秒)
buy_sign = '连涨数天'
sell_sign = 'XXXX'
session_id = int(time.time() * 1000)  # 唯一会话ID

# 全局状态跟踪
pending_orders = {}  # 格式: {股票代码: 最后提交时间}
held_positions = {}  # 格式: {股票代码: 持仓数量}


# --- 异步回调处理器 ---
class OrderCallback(XtQuantTraderCallback):
    def on_stock_order(self, order):
        """订单状态更新回调"""
        stock_code = order.stock_code.split('.')[0]  # 去除后缀

        # 更新委托状态
        if order.order_status in [1, 4]:  # 已报/部成
            pending_orders[stock_code] = time.time()
        elif order.order_status in [2, 3, 5]:  # 成交/已撤/废单
            if stock_code in pending_orders:
                del pending_orders[stock_code]

        logging.info(
            f"委托更新: ID={order.order_id}, "
            f"状态={self._get_status_desc(order.order_status)}, "
            f"股票={stock_code}"
        )

    def on_stock_trade(self, trade):
        """成交回报回调"""
        stock_code = trade.stock_code.split('.')[0]  # 去除后缀

        # 更新持仓
        if trade.trade_type == 0:  # 买入
            held_positions[stock_code] = held_positions.get(stock_code, 0) + trade.traded_volume
        elif trade.trade_type == 1:  # 卖出
            held_positions[stock_code] = held_positions.get(stock_code, 0) - trade.traded_volume
            if held_positions.get(stock_code, 0) <= 0:
                if stock_code in held_positions:
                    del held_positions[stock_code]

        # 如果完全成交,从pending_orders中移除
        if stock_code in pending_orders:
            del pending_orders[stock_code]

        logging.info(
            f"成交回报: {stock_code}, "
            f"数量={trade.traded_volume}, 价格={trade.traded_price}"
        )

    def on_order_error(self, error):
        """委托失败回调"""
        if hasattr(error, 'error_msg'):
            msg = error.error_msg
        else:
            msg = str(error)
        logging.error(f"委托失败: {msg}")

    def _get_status_desc(self, status_code):
        """转换状态码为文字描述"""
        status_map = {
            0: "未报", 1: "已报", 2: "已成交",
            3: "已撤", 4: "部成", 5: "废单"
        }
        return status_map.get(status_code, f"未知状态({status_code})")


# --- 持仓初始化函数 ---
def init_positions():
    """初始化时获取当前持仓"""
    global held_positions
    try:
        if not xt_trader or not account:
            return

        positions = xt_trader.query_stock_positions(account)
        if positions is None:
            logging.error("持仓查询失败")
            return

        for pos in positions:
            code = pos.stock_code.split('.')[0]
            held_positions[code] = pos.volume

        logging.info(f"初始持仓加载完成: {len(held_positions)}只股票")
    except Exception as e:
        logging.error(f"持仓初始化异常: {e}", exc_info=True)


# --- 交易初始化 ---
def init_trader():
    try:
        xt_trader = xttrader.XtQuantTrader(mini_qmt_path, session_id)
        account = StockAccount(account_id)
        callback = OrderCallback()

        # 注册回调并启动
        xt_trader.register_callback(callback)
        xt_trader.start()

        # 连接交易服务器
        if xt_trader.connect() != 0:
            logging.error("交易服务器连接失败")
            return None, None

        # 订阅账户
        if xt_trader.subscribe(account) != 0:
            logging.error("账户订阅失败")
            return None, None

        logging.info("交易系统初始化完成")

        # 初始化持仓
        init_positions()
        return xt_trader, account

    except Exception as e:
        logging.error(f"初始化异常: {e}", exc_info=True)
        return None, None


# 全局交易对象
xt_trader, account = init_trader()


# --- 股票代码格式化 ---
def format_stock_code(code):
    """统一处理市场后缀"""
    code = code.strip()
    if code.startswith(('6', '9', '688')):  # 沪市/科创板
        return f"{code}.SH"
    elif code.startswith(('0', '3')):  # 深市/创业板
        return f"{code}.SZ"
    else:
        logging.error(f"无效股票代码: {code}")
        return None


# --- 买入事件处理(增加持仓检查)---
def buy_event(params):
    try:
        if not xt_trader or not account:
            logging.error("交易未就绪")
            return None

        stock_info = params.get('stock')
        stock_code = stock_info['code'] if isinstance(stock_info, pd.Series) else stock_info
        if not stock_code:
            logging.error("买入信号缺少股票代码")
            return None

        # 检查1:是否已有持仓
        if stock_code in held_positions:
            logging.warning(f"跳过买入: {stock_code} 已有持仓")
            return None

        # 检查2:是否有未完成委托
        if stock_code in pending_orders:
            # 检查委托是否超时(超过60秒)
            if time.time() - pending_orders[stock_code] < 60:
                logging.warning(f"跳过买入: {stock_code} 已有未完成委托")
                return None
            else:
                logging.warning(f"移除超时委托记录: {stock_code}")
                del pending_orders[stock_code]

        formatted_code = format_stock_code(stock_code)
        if not formatted_code:
            return None

        # 获取实时价格
        current_price = float(stock_info.get('price', 0))
        if current_price <= 0:
            # 备选方案:通过xtdata获取最新价
            quote = xtdata.get_full_tick([formatted_code])
            current_price = quote[formatted_code]['lastPrice']
            if current_price <= 0:
                logging.error(f"无法获取有效价格: {formatted_code}")
                return None

        logging.info(f"触发买入: {formatted_code} @ {current_price}")

        # 异步下单
        async_seq = xt_trader.order_stock_async(
            account=account,
            stock_code=formatted_code,
            order_type=xtconstant.STOCK_BUY,
            order_volume=100,
            price_type=xtconstant.FIX_PRICE,
            price=current_price,
            strategy_name="QuantStrategy",
            order_remark=buy_sign
        )

        # 记录委托
        pending_orders[stock_code] = time.time()
        logging.info(f"买入委托已提交 | 代码: {formatted_code} | 异步序列: {async_seq}")
        return async_seq  # 返回整数用于后续查询

    except Exception as e:
        logging.error(f"买入异常: {e}", exc_info=True)
        return None


# --- 卖出事件处理(增加持仓检查)---
def sell_event(params):
    try:
        if not xt_trader or not account:
            logging.error("交易未就绪")
            return None

        stock_info = params.get('stock')
        position = params.get('position')  # 需确保传入持仓对象
        stock_code = stock_info['code'] if isinstance(stock_info, pd.Series) else stock_info
        if not stock_code:
            logging.error("卖出信号缺少股票代码")
            return None

        # 检查1:是否实际持有该股票
        if stock_code not in held_positions:
            logging.warning(f"跳过卖出: {stock_code} 无持仓")
            return None

        # 检查2:是否有未完成卖出委托
        if stock_code in pending_orders:
            # 检查委托是否超时(超过60秒)
            if time.time() - pending_orders[stock_code] < 60:
                logging.warning(f"跳过卖出: {stock_code} 已有未完成委托")
                return None
            else:
                logging.warning(f"移除超时委托记录: {stock_code}")
                del pending_orders[stock_code]

        formatted_code = format_stock_code(stock_code)
        if not formatted_code:
            return None

        # 使用实际持仓量而不是position对象
        volume = held_positions.get(stock_code, 0)
        if volume <= 0:
            logging.warning(f"无可用持仓: {formatted_code}")
            return None

        logging.info(f"触发卖出: {formatted_code} x {volume}")

        # 异步市价单
        async_seq = xt_trader.order_stock_async(
            account=account,
            stock_code=formatted_code,
            order_type=xtconstant.STOCK_SELL,
            order_volume=volume,
            price_type=xtconstant.LATEST_PRICE,
            price=-1,  # 市价单无需指定价格
            strategy_name="QuantStrategy",
            order_remark=sell_sign
        )

        # 记录委托
        pending_orders[stock_code] = time.time()
        logging.info(f"卖出委托已提交 | 代码: {formatted_code} | 异步序列: {async_seq}")
        return async_seq

    except Exception as e:
        logging.error(f"卖出异常: {e}", exc_info=True)
        return None


# --- 主程序 ---
if __name__ == "__main__":
    try:
        # 启动事件监听器
        import tdxtrader

        logging.info("启动交易事件循环...")

        tdxtrader.start(
            account_id=account_id,
            mini_qmt_path=mini_qmt_path,
            file_path=file_path,
            interval=interval,
            buy_sign=buy_sign,
            sell_sign=sell_sign,
            buy_event=buy_event,
            sell_event=sell_event,
            cancel_after=10
        )

        # 主线程保活
        while True:
            time.sleep(5)
            logging.debug("心跳检测: 交易线程运行中...")

    except Exception as e:
        logging.critical(f"系统崩溃: {e}", exc_info=True)
    finally:
        if xt_trader:
            xt_trader.stop()
            logging.info("交易线程安全退出")

如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-08-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 子晓聊技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档