
熟悉我同学的都知道, 我在之前写了一个问财QMT手动交易系统(包括持仓管理、 问财多条件查询、 下单撤单等)功能。
最近同学提了一个想法, 能否直接把问财的查询条件自动交易。其实这个想法之前有同学提过, 当时个人觉得 问财直接查询出来自动交易风险太大, 再加上当时在忙其他事情。
既然需求还算有共性,趁着这个周末有空, 借助AI辅助写了这样的问财QMT自动交易系统。
需求实现大概是这样的:
1、 用PySide6做的UI,自己不擅长这玩意,全凭AI帮忙,做得比较low。
2、基于xtquant的交易文档做了简单封装, 相当于属于自己的一个qmt_trader。 这块我之前有写封装的例子
3、设置交易时间、 输入查询条件、设置条件,买入查询条件的前X只,每支金额设置比如不超过一定金额,比如1万元。 这些全部通过界面进行设置。
一切就绪, 实际效果可以看上面的截图。
这篇文章主要提供思路为主, 其实你完全可以自己去实现这样的问财QMT自动交易系统。 只有想不到的,没有做不到的。
我认识的同学中 用通达信居多, 后面写一写基于通达信实现自动交易miniqmt交易的例子。 虽然网上有一些开源项目, 但一些同学基于开源项目没跑通流程。 任何事情,只有动手干, 才能发现问题。 多动手
这里提供核心代码。
def execute_trading_strategy(self):
"""执行交易策略"""
# 获取选股结果
query = self.query_edit.text()
self.log(f"🔍 执行查询: {query}")
# 使用pywencai获取数据
df = pywencai.get(query=query, sort_key='竞价成交金额', sort_order='desc')
if df is None or df.empty:
self.log("⚠️ 未找到符合条件的股票")
return
self.log(f"📊 找到 {len(df)} 只符合条件的股票")
# 筛选前N只股票
max_stocks = self.max_stocks_spin.value()
target_stocks = df.head(max_stocks)
#print(target_stocks)
# 获取当前持仓
position_df = self.trader.position()
held_stocks = set(position_df['证券代码'].tolist()) if not position_df.empty else set()
# 执行交易
for _, row in target_stocks.iterrows():
stock_code = row['股票代码']
# 检查是否已持仓或已下单
if stock_code in held_stocks or stock_code in self.executed_orders:
self.log(f"⏭️ 跳过 {stock_code} (已持仓/已下单)")
continue
#print(row)
# 计算买入金额
max_amount = float(self.max_amount_edit.text())
raw_price = row.get('最新价', row.get('close', 0))
try:
# 清洗数据:移除逗号/货币符号,处理空值
clean_price = str(raw_price).replace(',', '').replace('¥', '').strip()
price_val = float(clean_price) if clean_price else 0.0
#print(price_val)
if price_val <= 0:
self.log(f"⚠️ {stock_code} 价格无效: {price_val}")
continue
#print(max_amount)
# 计算买入数量 (100的整数倍)
amount = min(int(max_amount // (price_val * 100)) * 100, 10000)
#print(amount)
if amount < 100:
self.log(f"⚠️ {stock_code} 买入数量不足: {amount}")
continue
# 执行买入
self.log(f"💰 尝试买入 {stock_code} {amount}股 @{price_val:.2f}")
order_id = self.trader.buy(security=stock_code, price=price_val,
amount=amount)
if order_id > 0:
self.executed_orders.add(stock_code)
self.log(f"✅ 委托成功 ID: {order_id}")
else:
self.log(f"❌ 委托失败")
except (TypeError, ValueError) as e:
self.log(f"❌ {stock_code} 价格转换失败: {raw_price},错误: {str(e)}")
continue
如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。