最近写了几篇关于量化识别底部形态的。 有同学问我,怎么用miniqmt实时监控通达信自选股的涨跌情况。
其实miniqmt我之前写的文章并不少,但貌似大家不太感兴趣,阅读量非常少,毕竟一些同学没开券商miniqmt, 写的代码没有miniqmt软件 没法验证。
大家都知道, 通达信公式可以轻松实现策略选股 或 条件预警 。我知道一些同学习惯 盘后复盘,通过通达信产生 策略选股 当成自选股,方便第二天的操作备选。监控第二天的涨跌幅或者其他 判断是否进行下一步操作。
这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。 希望我的分享对大家有所帮助
import os
from xtquant import xtdata
def read_tdx_blk(blk_file_path):
"""
读取通达信自选股.blk文件,并返回转换后的标准股票代码列表(格式:'000651.SZ')
Parameters:
blk_file_path (str): 通达信自选股ZXG.blk文件的完整路径
Returns:
list: 标准格式的股票代码列表,例如 ['000001.SZ', '600000.SH']
"""
standard_codes = []
try:
with open(blk_file_path, 'r', encoding='gbk') as f: # 通达信文件常用编码
lines = f.readlines()
for line in lines:
tdx_code = line.strip()
if len(tdx_code) == 7: # 通达信代码格式为7位(数字前缀+6位代码)
exchange_prefix = tdx_code[0]
stock_code = tdx_code[1:7]
if exchange_prefix == '0':
standard_code = stock_code + '.SZ' # 深交所
elif exchange_prefix == '1':
standard_code = stock_code + '.SH' # 上交所
elif exchange_prefix == '2':
standard_code = stock_code + '.BJ' # 北交所
else:
standard_code = None
if standard_code:
standard_codes.append(standard_code)
print(f"成功从 {blk_file_path} 读取 {len(standard_codes)} 只自选股")
except Exception as e:
print(f"读取自选股文件出错: {e}")
return standard_codes
def get_stock_name(full_code):
"""
通过QMT API根据股票代码获取对应的股票名称
Parameters:
full_code (str): 标准格式的股票代码,例如 '000001.SZ'
Returns:
str: 股票名称,如果获取失败则返回"未知证券"
"""
if not isinstance(full_code, str) or '.' not in full_code:
return "未知证券"
stock_name = "未知证券"
try:
instrument_detail = xtdata.get_instrument_detail(full_code)
if instrument_detail and isinstance(instrument_detail, dict):
stock_name = instrument_detail.get('InstrumentName', "未知证券")
except Exception as e:
print(f"获取证券名称失败(QMT API错误): {e}")
return stock_name
def get_realtime_stock_info(stock_list):
"""
获取指定股票列表的实时行情信息(名称、涨跌幅、最新价)
Parameters:
stock_list (list): 标准格式的股票代码列表,例如 ['000001.SZ', '600000.SH']
Returns:
dict: 包含股票实时信息的字典
"""
if not stock_list:
return {}
# 订阅实时行情
for stock_code in stock_list:
xtdata.subscribe_quote(stock_code, period='tick', count=1)
# 等待数据更新
import time
time.sleep(3)
# 获取实时数据
realtime_data = {}
for stock_code in stock_list:
try:
tick_data = xtdata.get_full_tick([stock_code])
print(tick_data)
if tick_data and stock_code in tick_data:
tick_info = tick_data[stock_code]
last_price = tick_info.get('lastPrice', 0)
pre_close = tick_info.get('lastClose', 1)
print(last_price)
print(pre_close)
# 计算涨跌幅
if pre_close and pre_close != 0:
change_percent = (last_price - pre_close) / pre_close * 100
else:
change_percent = 0
# 获取股票名称
stock_name = get_stock_name(stock_code)
realtime_data[stock_code] = {
'name': stock_name,
'change_percent': round(change_percent, 2),
'last_price': round(last_price, 3)
}
else:
print(f"未能获取 {stock_code} 的实时数据")
realtime_data[stock_code] = None
except Exception as e:
print(f"获取 {stock_code} 实时数据时出错: {e}")
realtime_data[stock_code] = None
return realtime_data
def main():
"""
主函数:读取自选股并获取实时行情
"""
# 1. 设置通达信自选股文件路径(请根据你的实际路径修改)
tdx_blk_path = r"D:\lwj\new_tdx\T0002\blocknew\zxg.blk" # 通达信自选股文件路径
# 2. 读取自选股代码
stock_list = read_tdx_blk(tdx_blk_path)
if not stock_list:
print("未读取到自选股代码,请检查文件路径")
return
print(f"自选股列表: {stock_list}")
# 3. 获取实时行情数据
print("\n开始获取实时行情数据...")
realtime_info = get_realtime_stock_info(stock_list)
# 4. 打印结果
print("\n实时行情信息:")
print("-" * 60)
for code, info in realtime_info.items():
if info:
print(
f"{code} {info['name']:>10} | 涨跌幅: {info['change_percent']:>6.2f}% | 最新价: {info['last_price']:>6.3f}")
else:
print(f"{code} | 数据获取失败")
print("-" * 60)
if __name__ == "__main__":
main()扩展一下,有了自选股涨跌幅, 我们还可以做哪些事情呢?
1、满足条件自动交易,通过企业微信、QQ邮箱等把涨跌幅实时监控到一个阈值发送到目标手机, 方便上班不盯盘的同学。
2、我们可以根据自选股 code , 和其他数据源的概念进行链表处理, 展示 对应股的 概念。
3、我们有了自选股数据,可以监控 该股的分时均线, 是否站稳分时均线 进行下一步操作。
凡事预则立,不预则废。 之前的时候我也盘中出信号直接上车, 后来发现某个个股我没搞懂它的概念或财务情况, 碰到过一些乌龙情况。 后来我就习惯通过头一天的策略先筛选出自己的股票目标池,然后第二天监控目标池的变化进行操作。