首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当没有背景对象时,如何使用Starlette的背景任务?

当没有背景对象时,如何使用Starlette的背景任务?
EN

Stack Overflow用户
提问于 2020-05-26 21:26:06
回答 2查看 3.1K关注 0票数 2

我希望现在不要再吃芹菜。在Starlette的文档中,他们给出了添加后台任务的两种方法:

经石墨烯:https://www.starlette.io/graphql/

代码语言:javascript
复制
class Query(graphene.ObjectType):
    user_agent = graphene.String()

    def resolve_user_agent(self, info):
        """
        Return the User-Agent of the incoming request.
        """
        user_agent = request.headers.get("User-Agent", "<unknown>")
        background = info.context["background"]
        background.add_task(log_user_agent, user_agent=user_agent)
        return user_agent

通过JSON响应:https://www.starlette.io/background/

代码语言:javascript
复制
async def signup(request):
    data = await request.json()
    username = data['username']
    email = data['email']
    task = BackgroundTask(send_welcome_email, to_address=email)
    message = {'status': 'Signup successful'}
    return JSONResponse(message, background=task)

有没有人知道有什么方法可以在Starlette和Ariadne的背景中添加任务呢?我无法在我的解析器中返回一个JSONResponse,并且我没有访问info.context“背景”的权限。我唯一附在上下文中的是我的请求对象。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-05-12 12:52:54

上面标记为解决方案的方法对我不起作用,因为当您使用子类BackgroundTask的中间件时,BaseHTTPMiddleware不能正常工作,请参见这里:

https://github.com/encode/starlette/issues/919

在我的例子中,任务基本上不是在后台运行的,而是等待完成的,我也没有使用Ariadne,但是这应该可以让您在后台执行任务。

编辑:这对我有用。

代码语言:javascript
复制
executor = ProcessPoolExecutor()

main.executor.submit(
bg_process_funcs,
export_file_format,
export_headers,
data,
alert_type,
floor_subtitle,
date_subtitle,
pref_datetime,
pref_timezone,
export_file_name,
export_limit,)

executor.shutdown()
logger.info("Process Pool Shutdown")
票数 1
EN

Stack Overflow用户

发布于 2020-06-02 16:43:35

解决了!

星体中间件:

代码语言:javascript
复制
class BackgroundTaskMiddleware(BaseHTTPMiddleware):
    async def dispatch(
            self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        request.state.background = None
        response = await call_next(request)
        if request.state.background:
            response.background = request.state.background
        return response

Ariadne Resolver:

代码语言:javascript
复制
@query.field("getUser")
@check_authentication
async def resolve_get_user(user, obj, info):
    task = BackgroundTasks()
    task.add_task(test_func)
    task.add_task(testing_func_two, "I work now")
    request = info.context["request"]
    request.state.background = task
    return True


async def test_func():
    await asyncio.sleep(10)
    print("once!!")


async def testing_func_two(message: str):
    print(message)

函数仍然同步执行,但因为它们是后台任务,所以我不太担心。

这里有更多的讨论。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62031695

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档