首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用aiogram创建后台进程

使用aiogram创建后台进程
EN

Stack Overflow用户
提问于 2021-05-21 21:17:47
回答 1查看 555关注 0票数 1

我正尝试在我正在开发的使用aiogram的电报机器人中发送加密货币的价格警报。我遇到的问题是,我不确定如何将函数作为后台、非阻塞线程启动,然后继续启动调度程序。我知道如何使用标准的同步电报机器人来做到这一点,但我不知道我应该如何使用aiogram。我读到我可以使用dp.loop.create_task here,但这抛出了一个错误Nonetype has no attribute create_task。下面是我尝试用来执行这些线程的代码:

代码语言:javascript
复制
print('Starting watchlist process, this needs to run as a non blocking daemon...')
dp.loop.create_task(wl.start_process())
print('Starting broadcaster, this needs to run as a non blocking daemon ... ')
dp.loop.create_task(broadcaster())
print('Starting the bot ...')
executor.start_polling(dp, skip_updates=True)

我只需要在后台运行wl.start_processbroadcaster函数。我该如何做到这一点?

这是start_process

代码语言:javascript
复制
async def start_process(self):
    """
    Start the watchlist process.
    :return:
    """
    threading.Thread(target=self.do_schedule).start()
    await self.loop_check_watchlist()

这是broadcaster

代码语言:javascript
复制
async def broadcaster():

count = 0
while True:
    uid_alerts = que.__next__()
    if uid_alerts:
        for i in uid_alerts:
            uid = i[0]
            alert = i[1]
            try:
                if await send_message(uid, alert):
                    count += 1
                await asyncio.sleep(.05)  # 20 messages per second (Limit: 30 messages per second)
            finally:
                log.info(f"{count} messages successful sent.")
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-05-24 00:39:35

我为你创建了以下工作示例,它演示了如何在机器人启动时创建后台任务,以及如何在用户命令的帮助下创建后台任务。

代码语言:javascript
复制
from aiogram import Dispatcher, executor, Bot
from aiogram import types
import asyncio
import logging

TOKEN = "your token!"


async def background_on_start() -> None:
    """background task which is created when bot starts"""
    while True:
        await asyncio.sleep(5)
        print("Hello World!")


async def background_on_action() -> None:
    """background task which is created when user asked"""
    for _ in range(20):
        await asyncio.sleep(3)
        print("Action!")


async def background_task_creator(message: types.Message) -> None:
    """Creates background tasks"""
    asyncio.create_task(background_on_action())
    await message.reply("Another one background task create")


async def on_bot_start_up(dispatcher: Dispatcher) -> None:
    """List of actions which should be done before bot start"""
    asyncio.create_task(background_on_start())  # creates background task


def create_bot_factory() -> None:
    """Creates and starts the bot"""
    dp = Dispatcher(Bot(token=TOKEN))
    # bot endpoints block:
    dp.register_message_handler(
        background_task_creator,
        commands=['start']
    )
    # start bot
    executor.start_polling(dp, skip_updates=True, on_startup=on_bot_start_up)


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    create_bot_factory()
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67637631

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档