Following the answer to "celery chord with group of chains with exception in chain",
I get an error: Celery seems to convert the chain of task signatures into a dict, which causes the error below.
Celery 4.4, Redis 3.3.11

Code, from @bigzbig:
# celery_app is the project's Celery application instance
from celery import chain, chord

@celery_app.task
def task_one():
    return 'OKIDOKI'

@celery_app.task
def task_two(str):
    return f'{str} YOUPI'

@celery_app.task
def task_three(str):
    return f'{str} MAKAPAKA'

@celery_app.task
def task_exception(str):
    raise KeyError(f'{str} Ups')

@celery_app.task(ignore_result=True)
def task_wrapper(*args, **kwargs):
    if 'job' in kwargs:
        kwargs['job'].apply()

@celery_app.task(ignore_result=True)
def callback_task(*args, **kwargs):
    return (args, kwargs, 'Yeah')

def test():
    chains = []
    tasks = [
        task_one.s(),
        task_two.s(),
        task_exception.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))
    tasks = [
        task_one.s(),
        task_two.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))
    chord(chains, callback_task.s()).apply_async()

Worker output when printing kwargs:
celeryworker2_1 | [2020-03-23 22:31:01,646: WARNING/ForkPoolWorker-1] {'task': 'celery.chain', 'args': [], 'kwargs': {'tasks': [{'task': 'portfolio.tasks.task_one', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_two', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_exception', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_three', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False, 'chord_size': None}]}, 'options': {}, 'subtask_type': 'chain', 'immutable': False, 'chord_size': None}
celeryworker_1 | [2020-03-23 22:31:01,650: WARNING/ForkPoolWorker-1] {'task': 'celery.chain', 'args': [], 'kwargs': {'tasks': [{'task': 'portfolio.tasks.task_one', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_two', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_three', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False, 'chord_size': None}]}, 'options': {}, 'subtask_type': 'chain', 'immutable': False, 'chord_size': None}

Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 650, in __protected_call__
    return self.run(*args, **kwargs)
  File "/app/portfolio/tasks.py", line 241, in task_wrapper
    kwargs['job'].apply()
AttributeError: 'dict' object has no attribute 'apply'

Posted on 2020-08-20 15:58:15
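You are passing a signature as an argument to another task, so Celery serializes it and the task receives a plain dict. You can rebuild the signature from that dict.
"task_always_eager=True" is set so that the chain runs in the same process rather than as a separate Celery task, since executing the chain is itself a separate task. That way any link or link_error that was given is preserved.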
from celery.canvas import Signature
callback = Signature(kwargs['job'])
callback.delay(task_always_eager=True)

https://stackoverflow.com/questions/60822478
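To illustrate why the task receives a dict in the first place, here is a minimal sketch that does not use Celery at all. Celery's Signature is a dict subclass, and task arguments are serialized (JSON by default) before they reach the worker. MiniSignature and send_to_worker below are hypothetical stand-ins invented for this example, not Celery APIs; they only mimic that round trip and the rebuild step.

```python
import json


class MiniSignature(dict):
    """Hypothetical stand-in for a task signature: a dict with behaviour."""

    def apply(self):
        # Pretend to run the task and report which one was run.
        return f"ran {self['task']}"


def send_to_worker(kwargs):
    """Mimic a broker hop: kwargs are encoded to JSON and decoded again."""
    return json.loads(json.dumps(kwargs))


sent = {"job": MiniSignature(task="task_one")}
received = send_to_worker(sent)

# The dict subclass comes back as a plain dict, so .apply() is gone.
print(type(received["job"]).__name__)     # dict
print(hasattr(received["job"], "apply"))  # False

# Rebuilding the object from the received dict restores the method.
job = MiniSignature(received["job"])
print(job.apply())  # ran task_one
```

In real Celery code the rebuild step corresponds to constructing a signature from the received dict, as in the snippet above; celery.signature(d) and Signature.from_dict(d) are alternatives that should also restore the specific canvas type (such as chain) recorded under 'subtask_type'.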