
我认识的同学中,用通达信的居多,之前我也推荐过开源项目 tdxtrader。一些同学使用开源项目运行下单遇到问题。
这里我提供一个例子演示通达信条件预警miniqmt下单例子。
先说下通达信使用步骤
1、先安装通达信,这个不用多说。打开通达信pc端,我提前盘后下载补充好日线数据。
怎么设置条件预警呢。右上角功能-预警和复盘-条件预警设置。
2、添加预警股票池:预警品种设置-添加品种,选择所需股票。这里可以自行选择自己所需要监控的股票,比如说整个上证A股的所有股票。
3、添加预警公式:预警公式设置-添加公式,选择自己所需的预警条件,比如“连涨5天” 举例。这里可以选择通达信自带的那些指标公式,也可以根据需求自己先有个公式。
4、在电脑上新建个“sign.txt”文本文档,后在通达信“其他设置”里选择“全部预警结果输出到以下文件”并选中之前新建的“sign.txt”即可。
网上很多大QMT读取通达信预警的例子,我这里以miniqmt作为演示。
以国金为例,勾选独立交易, 就是开启了miniqmt。
修改代码中的账号 和 路径, 自己设置的通达信预警条件。 下单演示效果

这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。
from xtquant import xttrader, xtconstant, xtdata
from xtquant.xttype import StockAccount
from xtquant.xttrader import XtQuantTraderCallback
import logging
import time
import pandas as pd
# 日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 参数配置
account_id = 'XXXXX' # 账号ID
mini_qmt_path = r'D:/国金QMT交易端模拟/userdata_mini'
file_path = r'D:/new_tdx/sign.txt'
interval = 1 # 轮询间隔(秒)
buy_sign = '连涨数天'
sell_sign = 'XXXX'
session_id = int(time.time() * 1000) # 唯一会话ID
# 全局状态跟踪
pending_orders = {} # 格式: {股票代码: 最后提交时间}
held_positions = {} # 格式: {股票代码: 持仓数量}
# --- 异步回调处理器 ---
class OrderCallback(XtQuantTraderCallback):
def on_stock_order(self, order):
"""订单状态更新回调"""
stock_code = order.stock_code.split('.')[0] # 去除后缀
# 更新委托状态
if order.order_status in [1, 4]: # 已报/部成
pending_orders[stock_code] = time.time()
elif order.order_status in [2, 3, 5]: # 成交/已撤/废单
if stock_code in pending_orders:
del pending_orders[stock_code]
logging.info(
f"委托更新: ID={order.order_id}, "
f"状态={self._get_status_desc(order.order_status)}, "
f"股票={stock_code}"
)
def on_stock_trade(self, trade):
"""成交回报回调"""
stock_code = trade.stock_code.split('.')[0] # 去除后缀
# 更新持仓
if trade.trade_type == 0: # 买入
held_positions[stock_code] = held_positions.get(stock_code, 0) + trade.traded_volume
elif trade.trade_type == 1: # 卖出
held_positions[stock_code] = held_positions.get(stock_code, 0) - trade.traded_volume
if held_positions.get(stock_code, 0) <= 0:
if stock_code in held_positions:
del held_positions[stock_code]
# 如果完全成交,从pending_orders中移除
if stock_code in pending_orders:
del pending_orders[stock_code]
logging.info(
f"成交回报: {stock_code}, "
f"数量={trade.traded_volume}, 价格={trade.traded_price}"
)
def on_order_error(self, error):
"""委托失败回调"""
if hasattr(error, 'error_msg'):
msg = error.error_msg
else:
msg = str(error)
logging.error(f"委托失败: {msg}")
def _get_status_desc(self, status_code):
"""转换状态码为文字描述"""
status_map = {
0: "未报", 1: "已报", 2: "已成交",
3: "已撤", 4: "部成", 5: "废单"
}
return status_map.get(status_code, f"未知状态({status_code})")
# --- 持仓初始化函数 ---
def init_positions():
"""初始化时获取当前持仓"""
global held_positions
try:
if not xt_trader or not account:
return
positions = xt_trader.query_stock_positions(account)
if positions is None:
logging.error("持仓查询失败")
return
for pos in positions:
code = pos.stock_code.split('.')[0]
held_positions[code] = pos.volume
logging.info(f"初始持仓加载完成: {len(held_positions)}只股票")
except Exception as e:
logging.error(f"持仓初始化异常: {e}", exc_info=True)
# --- 交易初始化 ---
def init_trader():
try:
xt_trader = xttrader.XtQuantTrader(mini_qmt_path, session_id)
account = StockAccount(account_id)
callback = OrderCallback()
# 注册回调并启动
xt_trader.register_callback(callback)
xt_trader.start()
# 连接交易服务器
if xt_trader.connect() != 0:
logging.error("交易服务器连接失败")
return None, None
# 订阅账户
if xt_trader.subscribe(account) != 0:
logging.error("账户订阅失败")
return None, None
logging.info("交易系统初始化完成")
# 初始化持仓
init_positions()
return xt_trader, account
except Exception as e:
logging.error(f"初始化异常: {e}", exc_info=True)
return None, None
# 全局交易对象
xt_trader, account = init_trader()
# --- 股票代码格式化 ---
def format_stock_code(code):
"""统一处理市场后缀"""
code = code.strip()
if code.startswith(('6', '9', '688')): # 沪市/科创板
return f"{code}.SH"
elif code.startswith(('0', '3')): # 深市/创业板
return f"{code}.SZ"
else:
logging.error(f"无效股票代码: {code}")
return None
# --- 买入事件处理(增加持仓检查)---
def buy_event(params):
try:
if not xt_trader or not account:
logging.error("交易未就绪")
return None
stock_info = params.get('stock')
stock_code = stock_info['code'] if isinstance(stock_info, pd.Series) else stock_info
if not stock_code:
logging.error("买入信号缺少股票代码")
return None
# 检查1:是否已有持仓
if stock_code in held_positions:
logging.warning(f"跳过买入: {stock_code} 已有持仓")
return None
# 检查2:是否有未完成委托
if stock_code in pending_orders:
# 检查委托是否超时(超过60秒)
if time.time() - pending_orders[stock_code] < 60:
logging.warning(f"跳过买入: {stock_code} 已有未完成委托")
return None
else:
logging.warning(f"移除超时委托记录: {stock_code}")
del pending_orders[stock_code]
formatted_code = format_stock_code(stock_code)
if not formatted_code:
return None
# 获取实时价格
current_price = float(stock_info.get('price', 0))
if current_price <= 0:
# 备选方案:通过xtdata获取最新价
quote = xtdata.get_full_tick([formatted_code])
current_price = quote[formatted_code]['lastPrice']
if current_price <= 0:
logging.error(f"无法获取有效价格: {formatted_code}")
return None
logging.info(f"触发买入: {formatted_code} @ {current_price}")
# 异步下单
async_seq = xt_trader.order_stock_async(
account=account,
stock_code=formatted_code,
order_type=xtconstant.STOCK_BUY,
order_volume=100,
price_type=xtconstant.FIX_PRICE,
price=current_price,
strategy_name="QuantStrategy",
order_remark=buy_sign
)
# 记录委托
pending_orders[stock_code] = time.time()
logging.info(f"买入委托已提交 | 代码: {formatted_code} | 异步序列: {async_seq}")
return async_seq # 返回整数用于后续查询
except Exception as e:
logging.error(f"买入异常: {e}", exc_info=True)
return None
# --- 卖出事件处理(增加持仓检查)---
def sell_event(params):
try:
if not xt_trader or not account:
logging.error("交易未就绪")
return None
stock_info = params.get('stock')
position = params.get('position') # 需确保传入持仓对象
stock_code = stock_info['code'] if isinstance(stock_info, pd.Series) else stock_info
if not stock_code:
logging.error("卖出信号缺少股票代码")
return None
# 检查1:是否实际持有该股票
if stock_code not in held_positions:
logging.warning(f"跳过卖出: {stock_code} 无持仓")
return None
# 检查2:是否有未完成卖出委托
if stock_code in pending_orders:
# 检查委托是否超时(超过60秒)
if time.time() - pending_orders[stock_code] < 60:
logging.warning(f"跳过卖出: {stock_code} 已有未完成委托")
return None
else:
logging.warning(f"移除超时委托记录: {stock_code}")
del pending_orders[stock_code]
formatted_code = format_stock_code(stock_code)
if not formatted_code:
return None
# 使用实际持仓量而不是position对象
volume = held_positions.get(stock_code, 0)
if volume <= 0:
logging.warning(f"无可用持仓: {formatted_code}")
return None
logging.info(f"触发卖出: {formatted_code} x {volume}")
# 异步市价单
async_seq = xt_trader.order_stock_async(
account=account,
stock_code=formatted_code,
order_type=xtconstant.STOCK_SELL,
order_volume=volume,
price_type=xtconstant.LATEST_PRICE,
price=-1, # 市价单无需指定价格
strategy_name="QuantStrategy",
order_remark=sell_sign
)
# 记录委托
pending_orders[stock_code] = time.time()
logging.info(f"卖出委托已提交 | 代码: {formatted_code} | 异步序列: {async_seq}")
return async_seq
except Exception as e:
logging.error(f"卖出异常: {e}", exc_info=True)
return None
# --- 主程序 ---
if __name__ == "__main__":
try:
# 启动事件监听器
import tdxtrader
logging.info("启动交易事件循环...")
tdxtrader.start(
account_id=account_id,
mini_qmt_path=mini_qmt_path,
file_path=file_path,
interval=interval,
buy_sign=buy_sign,
sell_sign=sell_sign,
buy_event=buy_event,
sell_event=sell_event,
cancel_after=10
)
# 主线程保活
while True:
time.sleep(5)
logging.debug("心跳检测: 交易线程运行中...")
except Exception as e:
logging.critical(f"系统崩溃: {e}", exc_info=True)
finally:
if xt_trader:
xt_trader.stop()
logging.info("交易线程安全退出")如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。