

今天星球同学提了一个问题, 平时用 通达信选股有个自选股池, 怎么借助miniqmt监听股票池个股 价格、成交量 变化。 实际需求肯定比这个复杂, 这里就以 类似的场景为例,写一个例子。
监听通达信自选股, 通过miniqmt获取 昨日的成交量, 以及实时监听今日的成交量, 如果今日 成交量>昨日的成交量,则打印。
有些同学希望做爆量逻辑,比如9点40分之前 成交量爆量, 量价起升, 就可以用类似的思路。 这里只是提供技术思路和代码, 至于是否用于实际场景,自己评估, 亏钱了不要找我。
代码分为几个步骤:
1、我们先找到通达信自选股的路径,我这里设置的 D:\new_tdx\T0002\blocknew\zxg.blk , 你需要改成自己的路径。
2、 打开自选股文件,你会发现 保存的
举个例0002165 、1600727, 我们可以知道0 开头是 深圳深股, 1开头是上海沪股。
我们需要把自选股 改成miniqmt是识别的 002165.SZ, 600727.SH 这类的代码。 这些代码我只是举例
3、利用xtdata.get_market_data 获取 日线数据获取昨日的成交量
4、利用订阅xtdata.subscribe_whole_quote 获取股票今日的价格、成交量。
如果你要了解大单等, 可能需要采购L2数据。
5、自己的量化逻辑处理后,就可以直接借助order_stock下单。 通过miniqmt下单可以参考下 我封装的 交易常见操作封装类方法。
最后附上完整代码,需要的自取。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。
import os
import time
import pandas as pd
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader
#功能描述,读取通达信自选股,监听miniqmt,如果今天的成交量>昨日的成交量 考虑买入
# ===== 1. 股票代码转换函数 =====
defconvert_blk_to_qmt(blk_codes):
valid_codes = []
for code in blk_codes:
code = code.strip()
iflen(code) < 7: # 至少7位(首位+6位代码)
continue
prefix_char = code[0]
stock_num = code[1:].lstrip('0').zfill(6) # 关键:去前导零后补足6位
if prefix_char == '0':
valid_codes.append(f"{stock_num}.SZ")
elif prefix_char == '1':
valid_codes.append(f"{stock_num}.SH")
elif prefix_char == '9':
valid_codes.append(f"{stock_num}.BJ")
return valid_codes
# ===== 2. 加载自选股文件 =====
defload_block_stocks(block_path):
"""读取.blk文件并转换为标准代码"""
try:
withopen(block_path, 'r', encoding='utf-8') as f:
raw_codes = [line.strip() for line in f if line.strip()]
return convert_blk_to_qmt(raw_codes)
except Exception as e:
print(f"读取自选股文件失败: {e}")
return []
# ===== 3. 全局变量初始化 =====
block_path = r"D:\new_tdx\T0002\blocknew\zxg.blk"
#换成自己的路径
last_volume = {} # 存储昨日成交量:{股票代码: 昨日成交量}
monitored_stocks = set() # 当前监听的股票集合
trader = XtQuantTrader("", 123456) # 初始化交易对象
subscribe_seq = None # 存储订阅序列号
# ===== 4. 行情回调函数 =====
defon_data(datas):
global last_volume, monitored_stocks
for stock_code, data in datas.items():
if stock_code notin monitored_stocks:
continue
# 确保获取的是标量值
realtime_vol = data.get('volume', 0)
ifisinstance(realtime_vol, pd.Series): # 如果是Series,取最后一个值
realtime_vol = realtime_vol.iloc[-1].item()
prev_vol = last_volume.get(stock_code, 0)
# 确保prev_vol是数字类型
ifisinstance(prev_vol, pd.Series):
prev_vol = prev_vol.iloc[-1].item()
if realtime_vol > prev_vol and prev_vol > 0:
print(f"[信号] {stock_code} 实时成交量({realtime_vol}) > 昨日成交量({prev_vol})")
#买入逻辑,检测持仓是否已经有或已提交订单,如无则考虑买入
# ===== 5. 主监控循环 =====
defmain():
global last_volume, monitored_stocks, subscribe_seq
last_mtime = 0 # 文件最后修改时间
# 首次加载自选股
stocks = load_block_stocks(block_path)
monitored_stocks = set(stocks)
print(f"初始监控列表: {monitored_stocks}")
# 获取昨日成交量
for stock in monitored_stocks:
df = xtdata.get_market_data(stock_list=[stock], period='1d', count=1)
if df isnotNone and'volume'in df:
# 处理多维数组场景
volume_data = df['volume'].iloc[-1]
ifhasattr(volume_data, '__len__') andlen(volume_data) > 1:
last_volume[stock] = volume_data[0] # 取首个元素
else:
last_volume[stock] = volume_data
print(f"初始化 {stock} 昨日成交量: {last_volume[stock]}")
else:
print(f"警告:无法获取 {stock} 的昨日成交量")
# 订阅实时行情(全推模式)
subscribe_seq = xtdata.subscribe_whole_quote(list(monitored_stocks), callback=on_data)
print(f"订阅序列号: {subscribe_seq}")
# 持续监听.blk文件变化
whileTrue:
try:
current_mtime = os.path.getmtime(block_path)
if current_mtime != last_mtime:
print("检测到自选股文件变化,重新加载...")
new_stocks = set(load_block_stocks(block_path))
# 处理新增股票
added = new_stocks - monitored_stocks
for stock in added:
df = xtdata.get_market_data(stock_list=[stock], period='1d', count=1)
if df isnotNoneandnot df.empty and'volume'in df:
last_volume[stock] = df['volume'].iloc[-1].item()
print(f"新增 {stock} 昨日成交量: {last_volume[stock]}")
else:
print(f"警告:无法获取 {stock} 的昨日成交量")
# 更新订阅(先取消旧订阅,再订阅新列表)
if subscribe_seq isnotNone:
xtdata.unsubscribe_quote(subscribe_seq)
subscribe_seq = xtdata.subscribe_whole_quote(list(new_stocks), callback=on_data)
monitored_stocks = new_stocks
last_mtime = current_mtime
print(f"更新后监控列表: {monitored_stocks}")
time.sleep(5) # 每5秒检查一次文件
except Exception as e:
print(f"监控循环异常: {e}")
time.sleep(10)
if __name__ == "__main__":
main()如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。