我试着用芹菜(V4.0)来完成以下任务,
task = group([tasks1.s(), task2.s()) | generate_job_requests.s() | execute_job.map() | aggregate_result.s()
result = task.get()以上部分工作良好,直到generate_job_requests作为和弦。但是问题从execute_job开始,它从generate_job_requests获得作业列表,为此我需要创建并行任务,然后再创建所有作业的聚合结果。
我试图验证这种任务图是否可以用芹菜?是否有任何可能的替代工作流来解决这种依赖的问题?我在文件里遗漏的任何东西。
发布于 2016-11-20 05:01:29
我在中间任务创建者中使用了类似于map的功能,它的作用就像chord,
@shared_task(ignore_result=False)
def dmap(it, callback, end_task):
callback = subtask(callback)
grp = group(callback.clone([arg, ]) for arg in it)
c = (grp | end_task)
return c()任务流程就这样减少了,
task = (group([tasks1.s(), task2.s()) | generate_job_requests.s() | dmap.s(
execute_job.s(), aggregate_result.s())).apply_async()为了得到任务的最终结果,我做了几次调整,
# As we get dmap task id here
dmap_task = celery_app.AsyncResult(task.id)
dmap_result = dmap_task.get()
# Get actual aggregate_result task id
aggr_res_task_id = dmap_result[0][0]
result = celery_app.AsyncResult(aggr_res_task_id)
# Here we receive actual output of overall task
result.get()我提到了answer
https://stackoverflow.com/questions/40571477
复制相似问题