首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >芹菜链在完成组任务前执行子任务

芹菜链在完成组任务前执行子任务
EN

Stack Overflow用户
提问于 2014-09-04 00:57:12
回答 1查看 1.2K关注 0票数 2

我要做的是执行一组任务,并将组中每个任务的结果发送到另一个任务,然后将所有这些任务的结果发送到一个最终任务。

例如:

代码语言:javascript
复制
jobgroup = (
           group(tasks.task1.s(p) for p in params) |
           tasks.dmap.s(tasks.task2.s())
          )

这个部分对我来说没问题,但我遇到的问题是,如果我想把所有的结果都写进一个最后的任务中。

代码语言:javascript
复制
mychain = chain(jobgroup | tasks.task3.s())

我看到的是,task3()正在被调用,而来自task2的任务仍然处于挂起状态,方法是在进入状态时打印出它们(下面的片段是从task2函数中打印出来的:

代码语言:javascript
复制
@task()
def task3(input):
   for item in input.results:
      logger.info(item.status)

测井结果

代码语言:javascript
复制
[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING
[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING

那么,我如何设置它,以便在作业组中的所有任务完成后才调用task3?

EN

回答 1

Stack Overflow用户

发布于 2014-09-04 19:21:44

和弦

A chord consists of a header and a body. The header is a group of tasks that MUST COMPLETE before the callback is called. A chord is essentially a callback for a group of tasks.

示例:

代码语言:javascript
复制
>>> from celery import chord
>>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())

您可以使用chord来完成任务。

代码语言:javascript
复制
from celery import chord, group, chain

task_1 = group(tasks.task1.s(p) for p in params)
task_2 = group(tasks.dmap.s(tasks.task2.s())
task_3 = tasks.task3.s()

workflow = chord(chain([task_1, task_2]))(task_3)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/25655802

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档