最近受圈内同学启发, 实现下尾盘选股量化交易。 之前个人竞价选股、盘中选股比较多, 临近国庆,最近的行情高开低走的个股较多。 在思考要不用量化尾盘选股, 次日拉高卖出套利。
先写下大体的思路。
这里说下大概思路。 下午2点55分,借助同花顺问财的一些条件, 获取后再判断下个股的技术指标是否满足我的审美, 生成筛选后的结果保存到csv文件中。
def job():
"""定时任务主逻辑"""
logger.info("开始执行定时数据保存任务")
# 获取数据
data = get_pywencai_data()
if data is not None:
# 保存数据
filename = save_to_csv_with_date(data)
if filename:
logger.info("数据保存任务完成")
else:
logger.error("数据保存失败")
else:
logger.error("未获取到数据,跳过保存")然后写miniqmt自动交易。写一个定时任务,14:57分 读取csv文件并解析, 判断资金持仓, 进行下单操作。
这里提供代码片段思路
def execute_trading_strategy(self, data):
"""
执行交易策略
根据数据生成交易信号并下单
"""
if data is None or len(data) == 0:
logger.warning("无有效数据,跳过交易")
return False
# 检查交易连接
if not self.connected:
logger.error("交易连接不可用,无法执行策略")
return False
try:
# 查询账户资金
balance = self.check_account_balance()
if not balance:
logger.error("无法获取账户资金信息,停止执行策略")
return False
# 确定CSV中股票代码的列名
# 可能的列名: 'code', 'symbol', '股票代码', 'stock_code' 等
code_columns = ['code', 'symbol', '股票代码', 'stock_code']
code_column = None
for col in code_columns:
if col in data.columns:
code_column = col
break
if code_column is None:
logger.error("CSV文件中未找到股票代码列")
return False
logger.info(f"使用列 '{code_column}' 作为股票代码源")
# 执行下单
success_count = 0
processed_count = 0
for i, row in data.iterrows():
# 从CSV行中获取股票代码
stock_code = row[code_column]
# 确保股票代码是字符串类型
stock_code = str(stock_code).strip()
# 跳过空值或无效代码
if not stock_code or stock_code == 'nan' or stock_code == 'None':
continue
logger.info(f"处理股票代码: {stock_code}")
# 获取实时最优价格
optimal_price = self.get_optimal_price(stock_code)
if optimal_price <= 0:
logger.warning(f"无法获取 {stock_code} 的有效价格,跳过")
continue
# 根据资金情况决定下单数量
available_cash = balance['cash'] - balance['frozen_cash']
quantity = int((available_cash * 0.05) / optimal_price / 100) * 100 # 取整百股
if quantity < 100: # 最少100股
quantity = 100
logger.info(f"计划下单: 代码={stock_code}, 价格≈{optimal_price:.2f}, 数量={quantity}")
# 执行买入下单(使用自动计算的最优价格)
if self.place_order(stock_code, quantity, xtconstant.STOCK_BUY, "opponent", optimal_price):
success_count += 1
processed_count += 1
# 可选: 限制处理股票数量,避免过多交易
if processed_count >= 5: # 最多处理5只股票
logger.info("已达到最大处理股票数量限制(5)")
break
logger.info(f"交易策略执行完成: 共处理 {processed_count} 只股票, 成功 {success_count} 笔")
return success_count > 0
except Exception as e:
logger.error(f"执行交易策略时发生异常: {str(e)}")
return False
def job(self):
"""定时任务主逻辑"""
logger.info("开始执行定时下单任务")
# 获取最新的CSV文件
csv_file = self.get_latest_csv_file()
if csv_file:
# 读取数据
data = self.read_csv_data(csv_file)
# 执行交易策略
self.execute_trading_strategy(data)
else:
logger.error("未找到CSV文件,跳过下单")其实和竞价选股下单思路一样,就是得出选股结果保存csv文件中。 其它代码复用。 我只需要把时间聚焦在 选股策略上, 至于到底用同花顺、还是通达信、 还是东方财富数据都无所谓。
前提说明, 最好是低频策略,对分时要求没那么高。 如果分时高频策略, 就不要这样中转了
如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。