最近写技术文章没啥思绪, 碰巧今天有同学问我miniqmt基于xtquant的封装交易方法, 其实网上有不少这样的开源项目, 自己之前自己写小客户端项目简单封装了下。 要不这里简单分享下源代码,就当水文了。
大体思路:
1、默认从配置文件读取QMT安装路径和账户
2、 qmt链接, 如果用miniqmt,记得勾选独立交易。
3、买卖操作,
4、账户查询
5、持仓查询
最后附上完整代码,需要的自取。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题
import logging
import time
import random
import json
import pandas as pd
from pathlib import Path
from typing import Tuple, Union
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant, xtdata
# 初始化日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger('QMT_Trader')
# 默认配置
DEFAULT_CONFIG = {
"trader_path": r"D:/国金QMT交易端模拟/userdata_mini",
"account": "55009640"
}
# 配置文件管理
def load_config():
"""加载配置文件"""
config_path = Path(__file__).parent / 'config.json'
try:
if config_path.exists():
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"配置文件加载失败: {str(e)}")
# 创建默认配置
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(DEFAULT_CONFIG, f, indent=4)
return DEFAULT_CONFIG
# 常量定义
ORDER_TYPE_MAPPING = {
'buy': xtconstant.STOCK_BUY,
'sell': xtconstant.STOCK_SELL
}
STOCK_EXCHANGE_MAP = {
'60': '.SH',
'68': '.SH', # 科创板
'00': '.SZ',
'30': '.SZ', # 创业板
'83': '.BJ' # 北交所
}
def conv_time(ct: int) -> str:
'''
conv_time(1476374400000) --> '20161014000000.000'
'''
local_time = time.localtime(ct / 1000)
data_head = time.strftime('%Y%m%d%H%M%S', local_time)
data_secs = (ct - int(ct)) * 1000
time_stamp = f'{data_head}.{int(data_secs):03d}'
return time_stamp
class MyXtQuantTraderCallback(XtQuantTraderCallback):
"""交易回调处理类"""
def __init__(self, trader_instance):
super().__init__()
self.trader = trader_instance
def on_disconnected(self):
"""连接断开回调 (增加自动重连)"""
logger.error("connection lost, attempting reconnect...")
self.trader.reconnect()
def on_stock_order(self, order):
"""委托回报推送"""
logger.info(f"委托更新: {order.stock_code} 状态={order.order_status} 系统ID={order.order_sysid}")
def on_stock_asset(self, asset):
"""资金变动推送"""
logger.info(f"资产更新: 账号={asset.account_id} 现金={asset.cash} 总资产={asset.total_asset}")
def on_stock_trade(self, trade):
"""成交变动推送"""
logger.info(f"成交: {trade.stock_code} 数量={trade.traded_volume} 价格={trade.traded_price}")
def on_stock_position(self, position):
"""持仓变动推送"""
logger.info(f"持仓更新: {position.stock_code} 数量={position.volume}")
def on_order_error(self, order_error):
"""委托失败推送"""
logger.error(f"委托失败: ID={order_error.order_id} 错误码={order_error.error_id} 信息={order_error.error_msg}")
def on_cancel_error(self, cancel_error):
"""撤单失败推送"""
logger.error(
f"撤单失败: ID={cancel_error.order_id} 错误码={cancel_error.error_id} 信息={cancel_error.error_msg}")
class EasyQMTTrader:
"""QMT交易接口"""
def __init__(
self,
path: str = None,
session_id: int = None,
account: str = None,
account_type: str = 'STOCK',
is_slippage: bool = True,
slippage: float = 0.01
) -> None:
config = load_config()
self.path = path or config.get('trader_path', DEFAULT_CONFIG['trader_path'])
self.account = account or config.get('account', DEFAULT_CONFIG['account'])
self.session_id = session_id or self._generate_session_id()
self.account_type = account_type
self.slippage = slippage if is_slippage else 0.0
self.xt_trader = None
self.acc = None
self.callback = None
self.connected = False
def _generate_session_id(self) -> int:
"""生成随机会话ID"""
return random.randint(10000000, 99999999)
def adjust_stock(self, stock: str) -> str:
"""标准化证券代码格式"""
if '.' in stock: # 已有后缀
return stock.upper()
prefix = stock[0]
suffix = STOCK_EXCHANGE_MAP.get(prefix, '.SZ') # 默认深市
return f"{stock}{suffix}"
def get_security_type(self, stock: str) -> str:
"""获取证券类型"""
stock = self.adjust_stock(stock)
if stock[:3] in ['110', '113', '123', '127', '128', '111', '118'] or stock[:2] in ['11', '12']:
return 'bond'
elif stock[:3] in ['510', '511', '512', '513', '514', '515', '516', '517', '518', '588', '159', '501',
'164'] or stock[:2] in ['16']:
return 'fund'
else:
return 'stock'
def apply_slippage(self, stock: str, price: float, trade_type: Union[str, int]) -> float:
"""应用滑点调整"""
security_type = self.get_security_type(stock)
adjusted_price = price
if trade_type in ['buy', xtconstant.STOCK_BUY]:
if security_type in ['fund', 'bond']:
adjusted_price += self.slippage / 10
else:
adjusted_price += self.slippage
else: # sell
if security_type in ['fund', 'bond']:
adjusted_price -= self.slippage / 10
else:
adjusted_price -= self.slippage
logger.debug(f"滑点调整: {stock} 原价={price} 调整后={adjusted_price}")
return round(adjusted_price, 3) # 保留3位小数
def is_trading_time(
self,
trader_days: int = 4,
start_hour: int = 9,
end_hour: int = 15,
start_min: int = 30,
jhjj: bool = False
) -> bool:
"""验证当前是否为交易时间"""
jhjj_time = 15 if jhjj else 30
now = time.localtime()
weekday = now.tm_wday
if weekday > trader_days: # 周末
logger.warning("非交易日(周末)")
return False
hour, minute = now.tm_hour, now.tm_min
# 检查交易时段
if start_hour <= hour <= end_hour:
if hour == 9:
return minute >= jhjj_time
elif hour == 11 and minute > 30: # 午间休市
return False
elif hour == 12 and minute < 1: # 下午开盘
return minute >= 0
elif hour == end_hour and minute >= start_min:
return False
return True
return False
def connect(self) -> Tuple[XtQuantTrader, StockAccount]:
"""建立交易连接 (增加重试机制)"""
if self.connected:
logger.info("已存在有效连接")
return self.xt_trader, self.acc
logger.info(f"连接QMT: path={self.path} session={self.session_id}")
self.xt_trader = XtQuantTrader(self.path, self.session_id)
self.acc = StockAccount(self.account, self.account_type)
self.callback = MyXtQuantTraderCallback(self)
try:
self.xt_trader.register_callback(self.callback)
self.xt_trader.start()
connect_result = self.xt_trader.connect()
if connect_result == 0:
subscribe_result = self.xt_trader.subscribe(self.acc)
logger.info(f"连接成功 订阅结果={subscribe_result}")
self.connected = True
return self.xt_trader, self.acc
else:
logger.error(f"连接失败 返回码={connect_result}")
raise ConnectionError("QMT连接失败")
except Exception as e:
logger.error(f"连接异常: {str(e)}")
raise
def reconnect(self):
"""重新连接"""
logger.warning("执行重新连接...")
self.connected = False
self.xt_trader.stop()
time.sleep(1)
self.connect()
def order_stock(
self,
stock_code: str,
order_type: int,
order_volume: int,
price_type: int = xtconstant.FIX_PRICE,
price: float = 0.0,
strategy_name: str = '',
order_remark: str = ''
) -> int:
"""统一委托接口 (增加异常处理)"""
if not self.connected:
self.connect()
stock_code = self.adjust_stock(stock_code)
adjusted_price = self.apply_slippage(stock_code, price, order_type)
try:
order_id = self.xt_trader.order_stock(
account=self.acc,
stock_code=stock_code,
order_type=order_type,
order_volume=order_volume,
price_type=price_type,
price=adjusted_price,
strategy_name=strategy_name,
order_remark=order_remark
)
logger.info(f"委托: {stock_code} 类型={order_type} 价={adjusted_price} 量={order_volume} ID={order_id}")
return order_id
except Exception as e:
logger.error(f"委托异常: {str(e)}")
return -1
def buy(
self,
security: str,
amount: int,
price: float = 0.0,
price_type: int = xtconstant.FIX_PRICE,
strategy_name: str = '',
order_remark: str = ''
) -> int:
"""买入操作"""
return self.order_stock(
stock_code=security,
order_type=xtconstant.STOCK_BUY,
order_volume=amount,
price_type=price_type,
price=price,
strategy_name=strategy_name,
order_remark=order_remark
)
def sell(
self,
security: str,
amount: int,
price: float = 0.0,
price_type: int = xtconstant.FIX_PRICE,
strategy_name: str = '',
order_remark: str = ''
) -> int:
"""卖出操作"""
return self.order_stock(
stock_code=security,
order_type=xtconstant.STOCK_SELL,
order_volume=amount,
price_type=price_type,
price=price,
strategy_name=strategy_name,
order_remark=order_remark
)
def subscribe_quote(
self,
stock_code: str,
period: str = '1d',
callback: callable = None
):
"""订阅实时行情"""
if not callback:
callback = self.default_quote_callback
logger.info(f"订阅行情: {stock_code} 周期={period}")
xtdata.subscribe_quote(
stock_code=self.adjust_stock(stock_code),
period=period,
callback=callback
)
xtdata.run()
def default_quote_callback(self, datas):
"""默认行情回调函数"""
logger.debug(f"行情更新: {datas}")
def balance(self) -> pd.DataFrame:
"""查询账户资金"""
try:
asset = self.xt_trader.query_stock_asset(self.acc)
if asset:
return pd.DataFrame({
'账号类型': [asset.account_type],
'资金账户': [asset.account_id],
'可用金额': [asset.cash],
'冻结金额': [asset.frozen_cash],
'持仓市值': [asset.market_value],
'总资产': [asset.total_asset]
})
return pd.DataFrame() # 空数据
except Exception as e:
logger.error(f"查询资金异常: {str(e)}")
return pd.DataFrame()
def position(self) -> pd.DataFrame:
"""查询持仓 (优化性能)"""
try:
positions = self.xt_trader.query_stock_positions(self.acc)
if not positions:
return pd.DataFrame(columns=[
'账号类型', '资金账号', '证券代码',
'股票余额', '可用余额', '成本价',
'参考成本价', '市值'
])
data = []
for pos in positions:
data.append({
'账号类型': pos.account_type,
'资金账号': pos.account_id,
'证券代码': self.adjust_stock(pos.stock_code),
'股票余额': pos.volume,
'可用余额': pos.can_use_volume,
'成本价': pos.open_price,
'参考成本价': pos.open_price,
'市值': pos.market_value
})
return pd.DataFrame(data)
except Exception as e:
logger.error(f"查询持仓异常: {str(e)}")
return pd.DataFrame()
def today_entrusts(self) -> pd.DataFrame:
"""查询当日委托 (优化性能)"""
try:
orders = self.xt_trader.query_stock_orders(self.acc)
if not orders:
return pd.DataFrame(columns=[
'账号类型', '资金账号', '证券代码', '订单编号',
'柜台合同编号', '报单时间', '委托类型', '委托数量',
'报价类型', '委托价格', '成交数量', '成交均价',
'委托状态', '委托状态描述', '策略名称', '委托备注'
])
data = []
for order in orders:
data.append({
'账号类型': order.account_type,
'资金账号': order.account_id,
'证券代码': self.adjust_stock(order.stock_code),
'订单编号': order.order_id,
'柜台合同编号': order.order_sysid,
'报单时间': conv_time(order.order_time),
'委托类型': order.order_type,
'委托数量': order.order_volume,
'报价类型': order.price_type,
'委托价格': order.price,
'成交数量': order.traded_volume,
'成交均价': order.traded_price,
'委托状态': order.order_status,
'委托状态描述': order.status_msg,
'策略名称': order.strategy_name,
'委托备注': order.order_remark
})
return pd.DataFrame(data)
except Exception as e:
logger.error(f"查询委托异常: {str(e)}")
return pd.DataFrame()
def switch_quote_server(self, server_info: dict):
"""切换行情服务器"""
try:
logger.info(f"切换行情服务器: {server_info['ip']}:{server_info['port']}")
qs = xtdata.QuoteServer(server_info)
result = qs.connect()
if result.get("result", False):
logger.info("行情服务器切换成功")
return True
else:
logger.error("行情服务器切换失败")
return False
except Exception as e:
logger.error(f"切换行情服务器异常: {str(e)}")
return False
if __name__ == '__main__':
# 示例用法
trader = EasyQMTTrader()
trader.connect()
# 查询资产
print(trader.balance())
# 查询持仓
print(trader.position())
题外话:
最近空闲时候写了多套策略,测试会发现,再好的策略还是配合 大盘情绪 ,以及热门概念比较好。 虽然自己之前就知道,曾想过用策略平铺,来实现短线策略自动交易。 但遇到不好的行情,容易被骗炮, 几赚几亏其实并不赚钱,行情好另说。 就像上周 如果不买石油等战争概念, 其他概念很难赚到钱,平铺反而让自己仓位过重。 而本周一,随便买,除非买了白酒类, 基本都是赚钱。 择时、概念选择 比 单纯的选股策略 更重要。
后面想了想,还是坚持每天复复盘。复盘了解大盘情绪,分析当前炒作板块概念, 分析赚钱效应,让自己从选股策略中优中选优。
如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。