首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >miniqmt交易封装增加成交明细查询

miniqmt交易封装增加成交明细查询

作者头像
子晓聊技术
发布2026-04-23 16:46:05
发布2026-04-23 16:46:05
1030
举报
文章被收录于专栏:子晓AI量化子晓AI量化

这2天有同学在群里问qmt成交相关的细节, 我看了下之前的miniqmt代码 封装了 委托,没有封装成交明细, 趁着有空, 把之前的封装类 优化下, 增加 成交明细查询。

这样miniqmt交易封装包括了

1、miniqmt链接

2、查询资金账户

3、查询持仓

4、查询当日委托

5、查询成交明细

6、买

7、卖

这里说下4、5的区别, 比如你挂 某只股票 1200股, 而成交可能分为多笔,比如一笔300股,一笔900股。 那么成交明细就有2笔数据。

最近有同学问怎么区分买、卖, 正常情况下,买传递23, 卖传递24就可以了。

大家说一说,除了这些常用操作,miniqmt交易还有其他常用的操作么? 如果还有其他的常用方法,可以留言区说一说,我继续加一下。

这里说一下,为什么我喜欢miniqmt,作为一个程序员, 灵活方便, 我主要用它的行情类(日K、分K、tick等)、以及交易, 其他数据源我完全可以选择自己觉得方便的。

像一些同学关心的量比、换手率、概念什么的信息, 我完全可以不用xtquant的数据,用同花顺问财、开盘啦、 东方财富、tushare等其他数据源做技术方案。

这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。 希望我的分享对大家有所帮助

代码语言:javascript
复制
import logging
import os.path
import time
import random
import os
import json
import pandas as pd
import psutil  # 需要 pip install psutil
from pathlib import Path
from typing import Tuple, Union
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant, xtdata
# 初始化日志配置
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger('QMT_Trader')
#默认配置
DEFAULT_CONFIG = {
    "trader_path": r"D:/lwj/国金QMT交易端模拟/userdata_mini",
    "account": "XXXXXXX"
}
# 配置文件管理
def load_config():
    """加载配置文件"""
    config_path = Path(__file__).parent / 'config.json'
    try:
        if config_path.exists():
            with open(config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
    except Exception as e:
        logger.error(f"配置文件加载失败: {str(e)}")
    # 创建默认配置
    with open(config_path, 'w', encoding='utf-8') as f:
        json.dump(DEFAULT_CONFIG, f, indent=4)
    return DEFAULT_CONFIG
# 常量定义
ORDER_TYPE_MAPPING = {
    'buy': xtconstant.STOCK_BUY,
    'sell': xtconstant.STOCK_SELL
}
STOCK_EXCHANGE_MAP = {
    '60': '.SH',
    '68': '.SH',  # 科创板
    '00': '.SZ',
    '30': '.SZ',  # 创业板
    '92': '.BJ'  # 北交所
}
def conv_time(ct: int) -> str:
    '''
    conv_time(1476374400000) --> '20161014000000.000'
    '''
    local_time = time.localtime(ct / 1000)
    data_head = time.strftime('%Y%m%d%H%M%S', local_time)
    data_secs = (ct - int(ct)) * 1000
    time_stamp = f'{data_head}.{int(data_secs):03d}'
    return time_stamp
class MyXtQuantTraderCallback(XtQuantTraderCallback):
    """交易回调处理类"""
    def __init__(self, trader_instance):
        super().__init__()
        self.trader = trader_instance
    def on_disconnected(self):
        """连接断开回调"""
        logger.error("QMT连接断开,尝试重连...")
        try:
            self.trader.reconnect()
        except:
            logger.error("自动重连失败,请手动重连")
    def on_stock_order(self, order):
        """委托回报推送"""
        logger.info(f"委托更新: {order.stock_code} 状态={order.order_status} 系统ID={order.order_sysid}")
    def on_stock_asset(self, asset):
        """资金变动推送"""
        logger.info(f"资产更新: 账号={asset.account_id} 现金={asset.cash} 总资产={asset.total_asset}")
    def on_stock_trade(self, trade):
        """成交变动推送"""
        logger.info(f"成交: {trade.stock_code} 数量={trade.traded_volume} 价格={trade.traded_price}")
    def on_stock_position(self, position):
        """持仓变动推送"""
        logger.info(f"持仓更新: {position.stock_code} 数量={position.volume}")
    def on_order_error(self, order_error):
        """委托失败推送"""
        logger.error(f"委托失败: ID={order_error.order_id} 错误码={order_error.error_id} 信息={order_error.error_msg}")
    def on_cancel_error(self, cancel_error):
        """撤单失败推送"""
        logger.error(
            f"撤单失败: ID={cancel_error.order_id} 错误码={cancel_error.error_id} 信息={cancel_error.error_msg}")
class EasyQMTTrader:
    """QMT交易接口优化版 - 增强连接诊断"""
    def __init__(
            self,
            path: str = None,
            session_id: int = None,
            account: str = None,
            account_type: str = 'STOCK',
            is_slippage: bool = True,
            slippage: float = 0.01
    ) -> None:
        config = load_config()
        self.path = path or config.get('trader_path', DEFAULT_CONFIG['trader_path'])
        self.account = account or config.get('account', DEFAULT_CONFIG['account'])
        self.session_id = session_id or self._generate_session_id()
        self.account_type = account_type
        self.slippage = slippage if is_slippage else 0.0
        self.xt_trader = None
        self.acc = None
        self.callback = None
        self.connected = False
    def _generate_session_id(self) -> int:
        """生成随机会话ID"""
        return random.randint(10000000, 99999999)
    def check_qmt_status(self):
        """检查QMT软件运行状态"""
        logger.info("=== QMT状态诊断 ===")
        print(self)
        # 检查路径
        if os.path.exists(self.path):
            logger.info(f"✅ QMT路径存在: {self.path}")
        else:
            logger.error(f"❌ QMT路径不存在: {self.path}")
            return False
        # 检查QMT进程
        qmt_processes = []
        for proc in psutil.process_iter(['pid', 'name', 'status']):
            try:
                proc_name = proc.info['name'].lower()
                if any(x in proc_name for x in ['xtitclient', 'qmt', 'thinktrader']):
                    qmt_processes.append(proc.info)
                    logger.info(
                        f"✅ 发现QMT进程: {proc.info['name']} (PID: {proc.info['pid']}, 状态: {proc.info['status']})")
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        if not qmt_processes:
            logger.error("❌ 未发现QMT相关进程!")
            logger.error("请先启动QMT软件并登录账户")
            return False
        logger.info(f"✅ 发现 {len(qmt_processes)} 个QMT相关进程")
        return True
    def adjust_stock(self, stock: str) -> str:
        """标准化证券代码格式"""
        if '.' in stock:  # 已有后缀
            return stock.upper()
        prefix = stock[:2]
        suffix = STOCK_EXCHANGE_MAP.get(prefix, '.SZ')  # 默认深市
        return f"{stock}{suffix}"
    def get_security_type(self, stock: str) -> str:
        """获取证券类型"""
        stock = self.adjust_stock(stock)
        if stock[:3] in ['110', '113', '123', '127', '128', '111', '118'] or stock[:2] in ['11', '12']:
            return 'bond'
        elif stock[:3] in ['510', '511', '512', '513', '514', '515', '516', '517', '518', '588', '159', '501',
                           '164'] or stock[:2] in ['16']:
            return 'fund'
        else:
            return 'stock'
    def apply_slippage(self, stock: str, price: float, trade_type: Union[str, int]) -> float:
        """应用滑点调整"""
        security_type = self.get_security_type(stock)
        adjusted_price = price
        if trade_type in ['buy', xtconstant.STOCK_BUY]:
            if security_type in ['fund', 'bond']:
                adjusted_price += self.slippage / 10
            else:
                adjusted_price += self.slippage
        else:  # sell
            if security_type in ['fund', 'bond']:
                adjusted_price -= self.slippage / 10
            else:
                adjusted_price -= self.slippage
        logger.debug(f"滑点调整: {stock} 原价={price} 调整后={adjusted_price}")
        return round(adjusted_price, 3)  # 保留3位小数
    def is_trading_time(
            self,
            trader_days: int = 4,
            start_hour: int = 9,
            end_hour: int = 15,
            start_min: int = 30,
            jhjj: bool = False
    ) -> bool:
        """验证当前是否为交易时间"""
        jhjj_time = 15 if jhjj else 30
        now = time.localtime()
        weekday = now.tm_wday
        if weekday > trader_days:  # 周末
            logger.warning("非交易日(周末)")
            return False
        hour, minute = now.tm_hour, now.tm_min
        # 检查交易时段
        if start_hour <= hour <= end_hour:
            if hour == 9:
                return minute >= jhjj_time
            elif hour == 11 and minute > 30:  # 午间休市
                return False
            elif hour == 12 and minute < 1:  # 下午开盘
                return minute >= 0
            elif hour == end_hour and minute >= start_min:
                return False
            return True
        return False
    def connect(self) -> Tuple[XtQuantTrader, StockAccount]:
        """建立交易连接 - 增强诊断功能"""
        if self.connected:
            logger.info("已存在有效连接")
            return self.xt_trader, self.acc
        # 先检查QMT状态
        if not self.check_qmt_status():
            raise ConnectionError("QMT软件未正确启动,请先启动QMT并登录账户")
        logger.info(f"连接QMT: path={self.path} session={self.session_id} account={self.account}")
        self.xt_trader = XtQuantTrader(self.path, self.session_id)
        self.acc = StockAccount(self.account, self.account_type)
        self.callback = MyXtQuantTraderCallback(self)
        try:
            logger.info("注册回调函数...")
            self.xt_trader.register_callback(self.callback)
            logger.info("启动QMT交易接口...")
            self.xt_trader.start()
            logger.info("正在连接QMT...")
            connect_result = self.xt_trader.connect()
            logger.info(f"连接结果码: {connect_result}")
            if connect_result == 0:
                logger.info("✅ QMT连接成功!正在订阅账户...")
                subscribe_result = self.xt_trader.subscribe(self.acc)
                logger.info(f"账户订阅结果: {subscribe_result}")
                # 测试账户连接
                if self.test_account_connection():
                    self.connected = True
                    logger.info("🎉 QMT连接和账户验证成功!")
                    return self.xt_trader, self.acc
                else:
                    logger.warning("⚠️ QMT连接成功但账户验证失败")
                    logger.warning("请确保在QMT中已登录账户: " + self.account)
                    # 仍然标记为已连接,允许用户尝试操作
                    self.connected = True
                    return self.xt_trader, self.acc
            else:
                error_msg = self._get_connect_error_msg(connect_result)
                logger.error(f"❌ QMT连接失败: {error_msg}")
                self._show_connection_help()
                raise ConnectionError(f"QMT连接失败: {error_msg}")
        except Exception as e:
            logger.error(f"连接异常: {str(e)}")
            self._show_connection_help()
            raise
    def test_account_connection(self):
        """测试账户连接"""
        try:
            logger.info("测试账户连接...")
            asset = self.xt_trader.query_stock_asset(self.acc)
            if asset and hasattr(asset, 'account_id'):
                logger.info(f"✅ 账户验证成功: {asset.account_id}")
                logger.info(f"账户总资产: {getattr(asset, 'total_asset', 'N/A')}")
                return True
            else:
                logger.warning("❌ 账户查询返回空数据")
                return False
        except Exception as e:
            logger.error(f"❌ 账户连接测试失败: {e}")
            return False
    def _get_connect_error_msg(self, error_code):
        """获取连接错误信息"""
        error_map = {
            -1: "连接失败 - QMT可能未启动或API未启用",
            -2: "登录失败 - 请检查账户信息",
            -3: "网络连接超时",
            -4: "API接口未启用",
            -5: "账户权限不足",
            -6: "session冲突 - 请重启程序",
            -7: "路径错误"
        }
        return error_map.get(error_code, f"未知错误码: {error_code}")
    def _show_connection_help(self):
        """显示连接帮助信息"""
        logger.error("=" * 50)
        logger.error("QMT连接失败,请检查以下步骤:")
        logger.error("1. 确保QMT软件已完全启动")
        logger.error("2. 在QMT中登录账户: " + self.account)
        logger.error("3. 启用QMT的API接口:")
        logger.error("   - 菜单栏 → 工具 → API管理")
        logger.error("   - 或:设置 → 系统设置 → API设置")
        logger.error("   - 勾选:启用API接口")
        logger.error("4. 确认账户有API交易权限")
        logger.error("5. 重启QMT软件后再试")
        logger.error("=" * 50)
    def reconnect(self):
        """重新连接"""
        logger.warning("执行重新连接...")
        self.connected = False
        if self.xt_trader:
            try:
                self.xt_trader.stop()
            except:
                pass
        time.sleep(2)
        self.connect()
    def order_stock(
            self,
            stock_code: str,
            order_type: int,
            order_volume: int,
            price_type: int = xtconstant.FIX_PRICE,
            price: float = 0.0,
            strategy_name: str = '',
            order_remark: str = ''
    ) -> int:
        """统一委托接口"""
        if not self.connected:
            self.connect()
        stock_code = self.adjust_stock(stock_code)
        adjusted_price = self.apply_slippage(stock_code, price, order_type)
        try:
            order_id = self.xt_trader.order_stock(
                account=self.acc,
                stock_code=stock_code,
                order_type=order_type,
                order_volume=order_volume,
                price_type=price_type,
                price=adjusted_price,
                strategy_name=strategy_name,
                order_remark=order_remark
            )
            logger.info(f"委托: {stock_code} 类型={order_type} 价={adjusted_price} 量={order_volume} ID={order_id}")
            return order_id
        except Exception as e:
            logger.error(f"委托异常: {str(e)}")
            return -1
    def buy(
            self,
            security: str,
            amount: int,
            price: float = 0.0,
            price_type: int = xtconstant.FIX_PRICE,
            strategy_name: str = '',
            order_remark: str = ''
    ) -> int:
        """买入操作"""
        return self.order_stock(
            stock_code=security,
            order_type=xtconstant.STOCK_BUY,
            order_volume=amount,
            price_type=price_type,
            price=price,
            strategy_name=strategy_name,
            order_remark=order_remark
        )
    def sell(
            self,
            security: str,
            amount: int,
            price: float = 0.0,
            price_type: int = xtconstant.FIX_PRICE,
            strategy_name: str = '',
            order_remark: str = ''
    ) -> int:
        """卖出操作"""
        return self.order_stock(
            stock_code=security,
            order_type=xtconstant.STOCK_SELL,
            order_volume=amount,
            price_type=price_type,
            price=price,
            strategy_name=strategy_name,
            order_remark=order_remark
        )
    def subscribe_quote(
            self,
            stock_code: str,
            period: str = '1d',
            callback: callable = None
    ):
        """订阅实时行情"""
        if not callback:
            callback = self.default_quote_callback
        logger.info(f"订阅行情: {stock_code} 周期={period}")
        xtdata.subscribe_quote(
            stock_code=self.adjust_stock(stock_code),
            period=period,
            callback=callback
        )
        xtdata.run()
    def default_quote_callback(self, datas):
        """默认行情回调函数"""
        logger.debug(f"行情更新: {datas}")
    def balance(self) -> pd.DataFrame:
        """查询账户资金"""
        try:
            if not self.connected:
                self.connect()
            asset = self.xt_trader.query_stock_asset(self.acc)
            if asset:
                return pd.DataFrame({
                    '账号类型': [asset.account_type],
                    '资金账户': [asset.account_id],
                    '可用金额': [asset.cash],
                    '冻结金额': [asset.frozen_cash],
                    '持仓市值': [asset.market_value],
                    '总资产': [asset.total_asset]
                })
            return pd.DataFrame()
        except Exception as e:
            logger.error(f"查询资金异常: {str(e)}")
            return pd.DataFrame()
    def position(self) -> pd.DataFrame:
        """查询持仓"""
        try:
            if not self.connected:
                self.connect()
            positions = self.xt_trader.query_stock_positions(self.acc)
            if not positions:
                logger.info("当前无持仓")
                return pd.DataFrame(columns=[
                    '账号类型', '资金账号', '证券代码',
                    '股票余额', '可用余额', '成本价',
                    '参考成本价', '市值'
                ])
            data = []
            for pos in positions:
                data.append({
                    '账号类型': pos.account_type,
                    '资金账号': pos.account_id,
                    '证券代码': self.adjust_stock(pos.stock_code),
                    '股票余额': pos.volume,
                    '可用余额': pos.can_use_volume,
                    '成本价': pos.open_price,
                    '参考成本价': pos.open_price,
                    '市值': pos.market_value
                })
            return pd.DataFrame(data)
        except Exception as e:
            logger.error(f"查询持仓异常: {str(e)}")
            return pd.DataFrame()
    def today_entrusts(self) -> pd.DataFrame:
        """查询当日委托"""
        try:
            orders = self.xt_trader.query_stock_orders(self.acc)
            if not orders:
                return pd.DataFrame(columns=[
                    '账号类型', '资金账号', '证券代码', '订单编号',
                    '柜台合同编号', '报单时间', '委托类型', '委托数量',
                    '报价类型', '委托价格', '成交数量', '成交均价',
                    '委托状态', '委托状态描述', '策略名称', '委托备注'
                ])
            data = []
            for order in orders:
                data.append({
                    '账号类型': order.account_type,
                    '资金账号': order.account_id,
                    '证券代码': self.adjust_stock(order.stock_code),
                    '订单编号': order.order_id,
                    '柜台合同编号': order.order_sysid,
                    '报单时间': conv_time(order.order_time),
                    '委托类型': order.order_type,
                    '委托数量': order.order_volume,
                    '报价类型': order.price_type,
                    '委托价格': order.price,
                    '成交数量': order.traded_volume,
                    '成交均价': order.traded_price,
                    '委托状态': order.order_status,
                    '委托状态描述': order.status_msg,
                    '策略名称': order.strategy_name,
                    '委托备注': order.order_remark
                })
            return pd.DataFrame(data)
        except Exception as e:
            logger.error(f"查询委托异常: {str(e)}")
            return pd.DataFrame()
    def conv_time(self, timestamp):
        """将时间戳转换为可读时间格式"""
        if not timestamp:
            return ""
        try:
            # 如果timestamp是秒级时间戳
            return pd.to_datetime(timestamp, unit='s').strftime("%Y-%m-%d %H:%M:%S")
        except:
            return timestamp
    def stock_trades(self) -> pd.DataFrame:
        """查询当日成交"""
        try:
            trades = self.xt_trader.query_stock_trades(self.acc)
            if not trades:
                return pd.DataFrame(columns=[
                    '资金账号', '证券代码', '成交时间', '成交编号',
                    '委托编号', '柜台合同编号', '成交数量', '成交价格',
                    '成交金额', '委托类型'
                ])
            data = []
            for trade in trades:
                data.append({
                    '资金账号': trade.account_id,
                    '证券代码': self.adjust_stock(trade.stock_code),
                    '成交时间': self.conv_time(trade.traded_time),  # 需要时间转换函数
                    '成交编号': trade.traded_id,
                    '委托编号': trade.order_id,
                    '柜台合同编号': trade.order_sysid,
                    '成交数量': trade.traded_volume,
                    '成交价格': trade.traded_price,
                    '成交金额': trade.traded_amount,
                    '委托类型': trade.order_type,  # 如STOCK_BUY, STOCK_SELL
                })
            return pd.DataFrame(data)
        except Exception as e:
            logger.error(f"查询委托异常: {str(e)}")
            return pd.DataFrame()
    def switch_quote_server(self, server_info: dict):
        """切换行情服务器"""
        try:
            logger.info(f"切换行情服务器: {server_info['ip']}:{server_info['port']}")
            qs = xtdata.QuoteServer(server_info)
            result = qs.connect()
            if result.get("result", False):
                logger.info("行情服务器切换成功")
                return True
            else:
                logger.error("行情服务器切换失败")
                return False
        except Exception as e:
            logger.error(f"切换行情服务器异常: {str(e)}")
            return False
# 测试连接函数
def test_qmt_connection():
    """测试QMT连接"""
    print("=== QMT连接测试 ===")
    trader = EasyQMTTrader()
    try:
        print("1. 检查QMT运行状态...")
        if not trader.check_qmt_status():
            print("❌ QMT状态检查失败")
            return False
        print("2. 尝试连接QMT...")
        trader.connect()
        print("3. 测试查询功能...")
        balance = trader.balance()
        if not balance.empty:
            print("✅ 账户查询成功")
            print(balance)
        else:
            print("⚠️ 账户查询返回空数据")
        position = trader.position()
        print(f"持仓查询结果: {len(position)} 条记录")
        entrusts = trader.today_entrusts()
        print("✅ 查询委托记录")
        print(entrusts)
        trades = trader.stock_trades()
        print("✅ 查询成交记录")
        print(trades)
        print("🎉 QMT连接测试成功!")
        return True
    except Exception as e:
        print(f"❌ 连接测试失败: {e}")
        return False
    finally:
        if trader.xt_trader:
            trader.xt_trader.stop()
if __name__ == '__main__':
    # 运行连接测试
    test_qmt_connection()

如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-10-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 子晓聊技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。 希望我的分享对大家有所帮助
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档