首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从链任务中生成芹菜群任务

从链任务中生成芹菜群任务
EN

Stack Overflow用户
提问于 2016-11-13 07:14:16
回答 1查看 1.4K关注 0票数 1

我试着用芹菜(V4.0)来完成以下任务,

代码语言:javascript
复制
task = group([tasks1.s(), task2.s()) | generate_job_requests.s() | execute_job.map() | aggregate_result.s()
result = task.get()

以上部分工作良好,直到generate_job_requests作为和弦。但是问题从execute_job开始,它从generate_job_requests获得作业列表,为此我需要创建并行任务,然后再创建所有作业的聚合结果。

我试图验证这种任务图是否可以用芹菜?是否有任何可能的替代工作流来解决这种依赖的问题?我在文件里遗漏的任何东西。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-11-20 05:01:29

我在中间任务创建者中使用了类似于map的功能,它的作用就像chord,

代码语言:javascript
复制
@shared_task(ignore_result=False)
def dmap(it, callback, end_task):
    callback = subtask(callback)
    grp = group(callback.clone([arg, ]) for arg in it)
    c = (grp | end_task)
    return c()

任务流程就这样减少了,

代码语言:javascript
复制
task = (group([tasks1.s(), task2.s()) | generate_job_requests.s() | dmap.s(
        execute_job.s(), aggregate_result.s())).apply_async()

为了得到任务的最终结果,我做了几次调整,

代码语言:javascript
复制
# As we get dmap task id here
dmap_task = celery_app.AsyncResult(task.id)
dmap_result = dmap_task.get()
# Get actual aggregate_result task id
aggr_res_task_id = dmap_result[0][0]
result = celery_app.AsyncResult(aggr_res_task_id)
# Here we receive actual output of overall task
result.get()

我提到了answer

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40571477

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档