首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >芹菜-转换成字典的任务签名,不能调用chord -在chord的标题组中使用任务链

芹菜-转换成字典的任务签名,不能调用chord -在chord的标题组中使用任务链
EN

Stack Overflow用户
提问于 2020-03-23 22:36:00
回答 1查看 1K关注 0票数 0

遵循celery chord with group of chains with exception in chain上的答案

我得到了一个错误,芹菜似乎将任务签名链转换为字典,导致了下面的错误。

芹菜- 4.4红葡萄酒- 3.3.11

代码-来自@bigzbig

代码语言:javascript
复制
@celery_app.task
def task_one():
    return 'OKIDOKI'

@celery_app.task
def task_two(str):
    return f'{str} YOUPI'

@celery_app.task
def task_three(str):
    return f'{str} MAKAPAKA'

@celery_app.task
def task_exception(str):
    raise KeyError(f'{str} Ups')

@celery_app.task(ignore_result=True)
def task_wrapper(*args, **kwargs):
    if 'job' in kwargs:
        kwargs['job'].apply()

@celery_app.task(ignore_result=True)
def callback_task(*args, **kwargs):
    return (args, kwargs, 'Yeah')

def test():
    chains = []

    tasks = [
        task_one.s(),
        task_two.s(),
        task_exception.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))

    tasks = [
        task_one.s(),
        task_two.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))

    chord(chains, callback_task.s()).apply_async()

打印kwargs的工作

代码语言:javascript
复制
celeryworker2_1  | [2020-03-23 22:31:01,646: WARNING/ForkPoolWorker-1] {'task': 'celery.chain', 'args': [], 'kwargs': {'tasks': [{'task': 'portfolio.tasks.task_one', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_two', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_exception', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_three', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False, 'chord_size': None}]}, 'options': {}, 'subtask_type': 'chain', 'immutable': False, 'chord_size': None}
celeryworker_1   | [2020-03-23 22:31:01,650: WARNING/ForkPoolWorker-1] {'task': 'celery.chain', 'args': [], 'kwargs': {'tasks': [{'task': 'portfolio.tasks.task_one', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_two', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_three', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False, 'chord_size': None}]}, 'options': {}, 'subtask_type': 'chain', 'immutable': False, 'chord_size': None}

误差

代码语言:javascript
复制
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 650, in __protected_call__
return self.run(*args, **kwargs)
File "/app/portfolio/tasks.py", line 241, in task_wrapper
kwargs['job'].apply()
AttributeError: 'dict' object has no attribute 'apply'
EN

回答 1

Stack Overflow用户

发布于 2020-08-20 15:58:15

您正在尝试将签名传递给另一个任务。所以芹菜把它变成了切块。你可以从dict建立签名。

"task_always_eager=True“被设置为在同一进程下运行,而不是在不同的芹菜任务下运行,因为执行链本身是一个不同的任务。这样,您将保持链接或link_error是任何给定的。

代码语言:javascript
复制
from celery.canvas import Signature
callback = Signature(kwargs['job'])
callback.delay(task_always_eager=True)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60822478

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档