这2天有同学在群里问qmt成交相关的细节, 我看了下之前的miniqmt代码 封装了 委托,没有封装成交明细, 趁着有空, 把之前的封装类 优化下, 增加 成交明细查询。
这样miniqmt交易封装包括了
1、miniqmt链接
2、查询资金账户
3、查询持仓
4、查询当日委托
5、查询成交明细
6、买
7、卖
这里说下4、5的区别, 比如你挂 某只股票 1200股, 而成交可能分为多笔,比如一笔300股,一笔900股。 那么成交明细就有2笔数据。
最近有同学问怎么区分买、卖, 正常情况下,买传递23, 卖传递24就可以了。
大家说一说,除了这些常用操作,miniqmt交易还有其他常用的操作么? 如果还有其他的常用方法,可以留言区说一说,我继续加一下。
这里说一下,为什么我喜欢miniqmt,作为一个程序员, 灵活方便, 我主要用它的行情类(日K、分K、tick等)、以及交易, 其他数据源我完全可以选择自己觉得方便的。
像一些同学关心的量比、换手率、概念什么的信息, 我完全可以不用xtquant的数据,用同花顺问财、开盘啦、 东方财富、tushare等其他数据源做技术方案。
import logging
import os.path
import time
import random
import os
import json
import pandas as pd
import psutil # 需要 pip install psutil
from pathlib import Path
from typing import Tuple, Union
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant, xtdata
# 初始化日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger('QMT_Trader')
#默认配置
DEFAULT_CONFIG = {
"trader_path": r"D:/lwj/国金QMT交易端模拟/userdata_mini",
"account": "XXXXXXX"
}
# 配置文件管理
def load_config():
"""加载配置文件"""
config_path = Path(__file__).parent / 'config.json'
try:
if config_path.exists():
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"配置文件加载失败: {str(e)}")
# 创建默认配置
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(DEFAULT_CONFIG, f, indent=4)
return DEFAULT_CONFIG
# 常量定义
ORDER_TYPE_MAPPING = {
'buy': xtconstant.STOCK_BUY,
'sell': xtconstant.STOCK_SELL
}
STOCK_EXCHANGE_MAP = {
'60': '.SH',
'68': '.SH', # 科创板
'00': '.SZ',
'30': '.SZ', # 创业板
'92': '.BJ' # 北交所
}
def conv_time(ct: int) -> str:
'''
conv_time(1476374400000) --> '20161014000000.000'
'''
local_time = time.localtime(ct / 1000)
data_head = time.strftime('%Y%m%d%H%M%S', local_time)
data_secs = (ct - int(ct)) * 1000
time_stamp = f'{data_head}.{int(data_secs):03d}'
return time_stamp
class MyXtQuantTraderCallback(XtQuantTraderCallback):
"""交易回调处理类"""
def __init__(self, trader_instance):
super().__init__()
self.trader = trader_instance
def on_disconnected(self):
"""连接断开回调"""
logger.error("QMT连接断开,尝试重连...")
try:
self.trader.reconnect()
except:
logger.error("自动重连失败,请手动重连")
def on_stock_order(self, order):
"""委托回报推送"""
logger.info(f"委托更新: {order.stock_code} 状态={order.order_status} 系统ID={order.order_sysid}")
def on_stock_asset(self, asset):
"""资金变动推送"""
logger.info(f"资产更新: 账号={asset.account_id} 现金={asset.cash} 总资产={asset.total_asset}")
def on_stock_trade(self, trade):
"""成交变动推送"""
logger.info(f"成交: {trade.stock_code} 数量={trade.traded_volume} 价格={trade.traded_price}")
def on_stock_position(self, position):
"""持仓变动推送"""
logger.info(f"持仓更新: {position.stock_code} 数量={position.volume}")
def on_order_error(self, order_error):
"""委托失败推送"""
logger.error(f"委托失败: ID={order_error.order_id} 错误码={order_error.error_id} 信息={order_error.error_msg}")
def on_cancel_error(self, cancel_error):
"""撤单失败推送"""
logger.error(
f"撤单失败: ID={cancel_error.order_id} 错误码={cancel_error.error_id} 信息={cancel_error.error_msg}")
class EasyQMTTrader:
"""QMT交易接口优化版 - 增强连接诊断"""
def __init__(
self,
path: str = None,
session_id: int = None,
account: str = None,
account_type: str = 'STOCK',
is_slippage: bool = True,
slippage: float = 0.01
) -> None:
config = load_config()
self.path = path or config.get('trader_path', DEFAULT_CONFIG['trader_path'])
self.account = account or config.get('account', DEFAULT_CONFIG['account'])
self.session_id = session_id or self._generate_session_id()
self.account_type = account_type
self.slippage = slippage if is_slippage else 0.0
self.xt_trader = None
self.acc = None
self.callback = None
self.connected = False
def _generate_session_id(self) -> int:
"""生成随机会话ID"""
return random.randint(10000000, 99999999)
def check_qmt_status(self):
"""检查QMT软件运行状态"""
logger.info("=== QMT状态诊断 ===")
print(self)
# 检查路径
if os.path.exists(self.path):
logger.info(f"✅ QMT路径存在: {self.path}")
else:
logger.error(f"❌ QMT路径不存在: {self.path}")
return False
# 检查QMT进程
qmt_processes = []
for proc in psutil.process_iter(['pid', 'name', 'status']):
try:
proc_name = proc.info['name'].lower()
if any(x in proc_name for x in ['xtitclient', 'qmt', 'thinktrader']):
qmt_processes.append(proc.info)
logger.info(
f"✅ 发现QMT进程: {proc.info['name']} (PID: {proc.info['pid']}, 状态: {proc.info['status']})")
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if not qmt_processes:
logger.error("❌ 未发现QMT相关进程!")
logger.error("请先启动QMT软件并登录账户")
return False
logger.info(f"✅ 发现 {len(qmt_processes)} 个QMT相关进程")
return True
def adjust_stock(self, stock: str) -> str:
"""标准化证券代码格式"""
if '.' in stock: # 已有后缀
return stock.upper()
prefix = stock[:2]
suffix = STOCK_EXCHANGE_MAP.get(prefix, '.SZ') # 默认深市
return f"{stock}{suffix}"
def get_security_type(self, stock: str) -> str:
"""获取证券类型"""
stock = self.adjust_stock(stock)
if stock[:3] in ['110', '113', '123', '127', '128', '111', '118'] or stock[:2] in ['11', '12']:
return 'bond'
elif stock[:3] in ['510', '511', '512', '513', '514', '515', '516', '517', '518', '588', '159', '501',
'164'] or stock[:2] in ['16']:
return 'fund'
else:
return 'stock'
def apply_slippage(self, stock: str, price: float, trade_type: Union[str, int]) -> float:
"""应用滑点调整"""
security_type = self.get_security_type(stock)
adjusted_price = price
if trade_type in ['buy', xtconstant.STOCK_BUY]:
if security_type in ['fund', 'bond']:
adjusted_price += self.slippage / 10
else:
adjusted_price += self.slippage
else: # sell
if security_type in ['fund', 'bond']:
adjusted_price -= self.slippage / 10
else:
adjusted_price -= self.slippage
logger.debug(f"滑点调整: {stock} 原价={price} 调整后={adjusted_price}")
return round(adjusted_price, 3) # 保留3位小数
def is_trading_time(
self,
trader_days: int = 4,
start_hour: int = 9,
end_hour: int = 15,
start_min: int = 30,
jhjj: bool = False
) -> bool:
"""验证当前是否为交易时间"""
jhjj_time = 15 if jhjj else 30
now = time.localtime()
weekday = now.tm_wday
if weekday > trader_days: # 周末
logger.warning("非交易日(周末)")
return False
hour, minute = now.tm_hour, now.tm_min
# 检查交易时段
if start_hour <= hour <= end_hour:
if hour == 9:
return minute >= jhjj_time
elif hour == 11 and minute > 30: # 午间休市
return False
elif hour == 12 and minute < 1: # 下午开盘
return minute >= 0
elif hour == end_hour and minute >= start_min:
return False
return True
return False
def connect(self) -> Tuple[XtQuantTrader, StockAccount]:
"""建立交易连接 - 增强诊断功能"""
if self.connected:
logger.info("已存在有效连接")
return self.xt_trader, self.acc
# 先检查QMT状态
if not self.check_qmt_status():
raise ConnectionError("QMT软件未正确启动,请先启动QMT并登录账户")
logger.info(f"连接QMT: path={self.path} session={self.session_id} account={self.account}")
self.xt_trader = XtQuantTrader(self.path, self.session_id)
self.acc = StockAccount(self.account, self.account_type)
self.callback = MyXtQuantTraderCallback(self)
try:
logger.info("注册回调函数...")
self.xt_trader.register_callback(self.callback)
logger.info("启动QMT交易接口...")
self.xt_trader.start()
logger.info("正在连接QMT...")
connect_result = self.xt_trader.connect()
logger.info(f"连接结果码: {connect_result}")
if connect_result == 0:
logger.info("✅ QMT连接成功!正在订阅账户...")
subscribe_result = self.xt_trader.subscribe(self.acc)
logger.info(f"账户订阅结果: {subscribe_result}")
# 测试账户连接
if self.test_account_connection():
self.connected = True
logger.info("🎉 QMT连接和账户验证成功!")
return self.xt_trader, self.acc
else:
logger.warning("⚠️ QMT连接成功但账户验证失败")
logger.warning("请确保在QMT中已登录账户: " + self.account)
# 仍然标记为已连接,允许用户尝试操作
self.connected = True
return self.xt_trader, self.acc
else:
error_msg = self._get_connect_error_msg(connect_result)
logger.error(f"❌ QMT连接失败: {error_msg}")
self._show_connection_help()
raise ConnectionError(f"QMT连接失败: {error_msg}")
except Exception as e:
logger.error(f"连接异常: {str(e)}")
self._show_connection_help()
raise
def test_account_connection(self):
"""测试账户连接"""
try:
logger.info("测试账户连接...")
asset = self.xt_trader.query_stock_asset(self.acc)
if asset and hasattr(asset, 'account_id'):
logger.info(f"✅ 账户验证成功: {asset.account_id}")
logger.info(f"账户总资产: {getattr(asset, 'total_asset', 'N/A')}")
return True
else:
logger.warning("❌ 账户查询返回空数据")
return False
except Exception as e:
logger.error(f"❌ 账户连接测试失败: {e}")
return False
def _get_connect_error_msg(self, error_code):
"""获取连接错误信息"""
error_map = {
-1: "连接失败 - QMT可能未启动或API未启用",
-2: "登录失败 - 请检查账户信息",
-3: "网络连接超时",
-4: "API接口未启用",
-5: "账户权限不足",
-6: "session冲突 - 请重启程序",
-7: "路径错误"
}
return error_map.get(error_code, f"未知错误码: {error_code}")
def _show_connection_help(self):
"""显示连接帮助信息"""
logger.error("=" * 50)
logger.error("QMT连接失败,请检查以下步骤:")
logger.error("1. 确保QMT软件已完全启动")
logger.error("2. 在QMT中登录账户: " + self.account)
logger.error("3. 启用QMT的API接口:")
logger.error(" - 菜单栏 → 工具 → API管理")
logger.error(" - 或:设置 → 系统设置 → API设置")
logger.error(" - 勾选:启用API接口")
logger.error("4. 确认账户有API交易权限")
logger.error("5. 重启QMT软件后再试")
logger.error("=" * 50)
def reconnect(self):
"""重新连接"""
logger.warning("执行重新连接...")
self.connected = False
if self.xt_trader:
try:
self.xt_trader.stop()
except:
pass
time.sleep(2)
self.connect()
def order_stock(
self,
stock_code: str,
order_type: int,
order_volume: int,
price_type: int = xtconstant.FIX_PRICE,
price: float = 0.0,
strategy_name: str = '',
order_remark: str = ''
) -> int:
"""统一委托接口"""
if not self.connected:
self.connect()
stock_code = self.adjust_stock(stock_code)
adjusted_price = self.apply_slippage(stock_code, price, order_type)
try:
order_id = self.xt_trader.order_stock(
account=self.acc,
stock_code=stock_code,
order_type=order_type,
order_volume=order_volume,
price_type=price_type,
price=adjusted_price,
strategy_name=strategy_name,
order_remark=order_remark
)
logger.info(f"委托: {stock_code} 类型={order_type} 价={adjusted_price} 量={order_volume} ID={order_id}")
return order_id
except Exception as e:
logger.error(f"委托异常: {str(e)}")
return -1
def buy(
self,
security: str,
amount: int,
price: float = 0.0,
price_type: int = xtconstant.FIX_PRICE,
strategy_name: str = '',
order_remark: str = ''
) -> int:
"""买入操作"""
return self.order_stock(
stock_code=security,
order_type=xtconstant.STOCK_BUY,
order_volume=amount,
price_type=price_type,
price=price,
strategy_name=strategy_name,
order_remark=order_remark
)
def sell(
self,
security: str,
amount: int,
price: float = 0.0,
price_type: int = xtconstant.FIX_PRICE,
strategy_name: str = '',
order_remark: str = ''
) -> int:
"""卖出操作"""
return self.order_stock(
stock_code=security,
order_type=xtconstant.STOCK_SELL,
order_volume=amount,
price_type=price_type,
price=price,
strategy_name=strategy_name,
order_remark=order_remark
)
def subscribe_quote(
self,
stock_code: str,
period: str = '1d',
callback: callable = None
):
"""订阅实时行情"""
if not callback:
callback = self.default_quote_callback
logger.info(f"订阅行情: {stock_code} 周期={period}")
xtdata.subscribe_quote(
stock_code=self.adjust_stock(stock_code),
period=period,
callback=callback
)
xtdata.run()
def default_quote_callback(self, datas):
"""默认行情回调函数"""
logger.debug(f"行情更新: {datas}")
def balance(self) -> pd.DataFrame:
"""查询账户资金"""
try:
if not self.connected:
self.connect()
asset = self.xt_trader.query_stock_asset(self.acc)
if asset:
return pd.DataFrame({
'账号类型': [asset.account_type],
'资金账户': [asset.account_id],
'可用金额': [asset.cash],
'冻结金额': [asset.frozen_cash],
'持仓市值': [asset.market_value],
'总资产': [asset.total_asset]
})
return pd.DataFrame()
except Exception as e:
logger.error(f"查询资金异常: {str(e)}")
return pd.DataFrame()
def position(self) -> pd.DataFrame:
"""查询持仓"""
try:
if not self.connected:
self.connect()
positions = self.xt_trader.query_stock_positions(self.acc)
if not positions:
logger.info("当前无持仓")
return pd.DataFrame(columns=[
'账号类型', '资金账号', '证券代码',
'股票余额', '可用余额', '成本价',
'参考成本价', '市值'
])
data = []
for pos in positions:
data.append({
'账号类型': pos.account_type,
'资金账号': pos.account_id,
'证券代码': self.adjust_stock(pos.stock_code),
'股票余额': pos.volume,
'可用余额': pos.can_use_volume,
'成本价': pos.open_price,
'参考成本价': pos.open_price,
'市值': pos.market_value
})
return pd.DataFrame(data)
except Exception as e:
logger.error(f"查询持仓异常: {str(e)}")
return pd.DataFrame()
def today_entrusts(self) -> pd.DataFrame:
"""查询当日委托"""
try:
orders = self.xt_trader.query_stock_orders(self.acc)
if not orders:
return pd.DataFrame(columns=[
'账号类型', '资金账号', '证券代码', '订单编号',
'柜台合同编号', '报单时间', '委托类型', '委托数量',
'报价类型', '委托价格', '成交数量', '成交均价',
'委托状态', '委托状态描述', '策略名称', '委托备注'
])
data = []
for order in orders:
data.append({
'账号类型': order.account_type,
'资金账号': order.account_id,
'证券代码': self.adjust_stock(order.stock_code),
'订单编号': order.order_id,
'柜台合同编号': order.order_sysid,
'报单时间': conv_time(order.order_time),
'委托类型': order.order_type,
'委托数量': order.order_volume,
'报价类型': order.price_type,
'委托价格': order.price,
'成交数量': order.traded_volume,
'成交均价': order.traded_price,
'委托状态': order.order_status,
'委托状态描述': order.status_msg,
'策略名称': order.strategy_name,
'委托备注': order.order_remark
})
return pd.DataFrame(data)
except Exception as e:
logger.error(f"查询委托异常: {str(e)}")
return pd.DataFrame()
def conv_time(self, timestamp):
"""将时间戳转换为可读时间格式"""
if not timestamp:
return ""
try:
# 如果timestamp是秒级时间戳
return pd.to_datetime(timestamp, unit='s').strftime("%Y-%m-%d %H:%M:%S")
except:
return timestamp
def stock_trades(self) -> pd.DataFrame:
"""查询当日成交"""
try:
trades = self.xt_trader.query_stock_trades(self.acc)
if not trades:
return pd.DataFrame(columns=[
'资金账号', '证券代码', '成交时间', '成交编号',
'委托编号', '柜台合同编号', '成交数量', '成交价格',
'成交金额', '委托类型'
])
data = []
for trade in trades:
data.append({
'资金账号': trade.account_id,
'证券代码': self.adjust_stock(trade.stock_code),
'成交时间': self.conv_time(trade.traded_time), # 需要时间转换函数
'成交编号': trade.traded_id,
'委托编号': trade.order_id,
'柜台合同编号': trade.order_sysid,
'成交数量': trade.traded_volume,
'成交价格': trade.traded_price,
'成交金额': trade.traded_amount,
'委托类型': trade.order_type, # 如STOCK_BUY, STOCK_SELL
})
return pd.DataFrame(data)
except Exception as e:
logger.error(f"查询委托异常: {str(e)}")
return pd.DataFrame()
def switch_quote_server(self, server_info: dict):
"""切换行情服务器"""
try:
logger.info(f"切换行情服务器: {server_info['ip']}:{server_info['port']}")
qs = xtdata.QuoteServer(server_info)
result = qs.connect()
if result.get("result", False):
logger.info("行情服务器切换成功")
return True
else:
logger.error("行情服务器切换失败")
return False
except Exception as e:
logger.error(f"切换行情服务器异常: {str(e)}")
return False
# 测试连接函数
def test_qmt_connection():
"""测试QMT连接"""
print("=== QMT连接测试 ===")
trader = EasyQMTTrader()
try:
print("1. 检查QMT运行状态...")
if not trader.check_qmt_status():
print("❌ QMT状态检查失败")
return False
print("2. 尝试连接QMT...")
trader.connect()
print("3. 测试查询功能...")
balance = trader.balance()
if not balance.empty:
print("✅ 账户查询成功")
print(balance)
else:
print("⚠️ 账户查询返回空数据")
position = trader.position()
print(f"持仓查询结果: {len(position)} 条记录")
entrusts = trader.today_entrusts()
print("✅ 查询委托记录")
print(entrusts)
trades = trader.stock_trades()
print("✅ 查询成交记录")
print(trades)
print("🎉 QMT连接测试成功!")
return True
except Exception as e:
print(f"❌ 连接测试失败: {e}")
return False
finally:
if trader.xt_trader:
trader.xt_trader.stop()
if __name__ == '__main__':
# 运行连接测试
test_qmt_connection()如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。