
今天刚从老家出来, 累惨的一天。上午在高速上还有点担心手头的持仓,虽然持仓轻心态比较平和,但再小的仓位也是money。 趁着间隙看了一眼,居然红的。 没时间看,感觉非主流概念,就手动清仓了, 这样的行情红盘还算不错。 比较坑的点, 自己的qmt止盈止损冲高回落策略 居然没执行, qmt估计周末抽风了,看来后续自己写一个 自动重启程序, 如果量化程序正常运行还有肉, 自己看到手动操作只剩肉沫了。
上午到家后,匆匆吃饭后。赶紧去菜地看一个月没打理怎么样, 各种菜开始开花了。 折腾了一下午菜品分类用蛇皮袋统统收割回家。 最近青菜白菜自由了, 在老家天天吃肉正好换换清淡口味。
今天写点什么呢, 星球同学问了下有没有飞书群机器人消息通知的例子, 之前写过 钉钉、 企业微信、 邮箱的例子, 那就写一个例子 凑个数。 毕竟 飞书发送消息 和钉钉什么的比较类似。
简单介绍下代码:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import pywencai
import requests
from datetime import datetime
from chinese_calendar import is_workday
# 飞书机器人配置
# 获取方式:飞书群设置 -> 群机器人 -> 添加机器人 -> 自定义机器人
FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxx" # 请替换为你的飞书Webhook地址
def get_auction_data():
"""获取竞价排名前10的股票数据"""
try:
query = "非ST,竞价涨停" # 改成自己的策略
# 注意:pywencai 返回的数据可能包含停牌或无数据项,需视情况处理
df = pywencai.get(query=query, sort_key='竞价涨停封单金额', sort_order='desc')
return df[['股票代码', '股票简称']].head(10)
except Exception as e:
print(f"数据获取失败: {str(e)}")
return None
def send_feishu_msg(content):
"""发送消息到飞书"""
headers = {"Content-Type": "application/json"}
# 飞书卡片消息格式
data = {
"msg_type": "interactive",
"card": {
"config": {
"wide_screen_mode": True
},
"header": {
"title": {
"tag": "plain_text",
"content": "🕘 股票监控"
},
"template": "blue"
},
"elements": [
{
"tag": "markdown",
"content": content
}
]
}
}
try:
response = requests.post(FEISHU_WEBHOOK, json=data, headers=headers)
print(f"消息发送状态: {response.status_code}, 返回信息: {response.text}")
except Exception as e:
print(f"发送消息异常: {str(e)}")
def job():
"""定时任务主逻辑"""
# 检查是否为工作日(注释掉则每天都发)
# if not is_workday(datetime.now()):
# return
data = get_auction_data()
if data is not None:
markdown_content = ""
for _, row in data.iterrows():
# 格式:序号. **股票名称** (股票代码)
markdown_content += f"**{row['股票简称']}** ({row['股票代码']})\n"
# 打印日志调试
print("准备发送数据:\n", markdown_content)
# 发送消息
send_feishu_msg(markdown_content)
if __name__ == "__main__":
scheduler = BlockingScheduler(timezone="Asia/Shanghai")
scheduler.add_job(
job,
CronTrigger(day_of_week='mon-fri', hour=19, minute=19)
)
print("定时任务启动,等待执行...")
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()