首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >任务选项停止任务在芹菜chord中运行

任务选项停止任务在芹菜chord中运行
EN

Stack Overflow用户
提问于 2017-07-27 04:55:42
回答 1查看 603关注 0票数 0

我正在尝试将现有的芹菜群组调用转换为chord,以防止死锁。前面的代码有重试次数和过期时间。我设法让chord在没有这些设置的情况下工作,但当我尝试应用这些设置时,我看不到正在运行的任务。我在文档中没有看到任何关于将相同的设置作为一个整体应用于和弦的内容。我运行的是celery版本3.1.6。

前面的代码:

代码语言:javascript
复制
jobs = group([reset_device.s(topoid, dev_list[i], 
              waittime_list[i], skipflag) for i in range(len(dev_list))]
              ).apply_async(expires=waittime, retry=True, retry_policy={
                                                    'max_retries': 3,
                                                    'interval_start': 0.5,
                                                    'interval_step': 0.2,
                                                    'interval_max': 0.2})
results = jobs.join_native(timeout=waittime + 600, propagate=True)

工作弦(不带设置):

代码语言:javascript
复制
jobs = chord([reset_device.s(topoid, dev_list[i], 
              waittime_list[i], skipflag) for i in range(len(dev_list))])(callback)

非工作和弦#1:

代码语言:javascript
复制
jobs = chord([reset_device.s(topoid, dev_list[i], waittime_list[i],
             skipflag).set(expires=datetime.now() + timedelta(seconds=waittime)).set(retry=True).set(retry_policy=retry_policy)
              for i in range(len(dev_list))])(callback)

非工作和弦#2

代码语言:javascript
复制
jobs = chord([reset_device.subtask(args=(topoid, dev_list[i], waittime_list[i],skipflag), 
              expires=datetime.now()+timedelta(seconds=waittime), retry=True, retry_policy=retry_policy) 
             for i in range(len(dev_list))])(callback)

在#1和#2两种情况下,chord中的任务似乎都没有运行。如何为chord中调用的每个任务应用过期时间和重试?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-08-08 10:03:37

我弄清楚了,这是一个混合的问题。

第一个问题是,expires字段不接受chord(可能还有组和链)内的整数,只接受datetime对象,尽管文档没有做任何区分。这已经在更高的版本中修复了,我使用3.1.25进行了测试,并能够验证修复。

第二个问题是,celery 3.1.6没有记录和弦中的错误(我认为组和链也是如此)。这也已经修复了,我在3.1.25中进行了测试,并能够看到失败。

第三个问题与错误消息有关:

代码语言:javascript
复制
[2017-08-07 18:39:56,043: ERROR/Worker-5] Chord '98246849-0d5d-4be2-85e3-3fc08e90011d' raised: TaskRevokedError(u'expired',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/builtins.py", line 90, in unlock_chord
ret = j(timeout=3.0, propagate=propagate)
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 691, in join_native
raise value
TaskRevokedError: expired

这是因为时区不正确。我使用了datetime.now()而不是datetime.utcnow(),它解决了这个问题,并在3.1.6中工作。

或者,我可以设置celery配置CELERY_ENABLE_UTC = False,它在默认情况下设置为True。这让我感到困惑,因为我们将配置CELERY_TIMEZONE设置为本地时间。expires字段与datetime对象一起使用时,将根据CELERY_ENABLE_UTC设置的值使用本地时间或UTC值。我建议保持两个配置设置相同。

足够有趣的是,创建了回调函数并进行轮询,以查看chord是否已完成,尽管chord任务从未执行过,并且它将永远留在那里。我相信这个may have been fixed in celery 4.1

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45337260

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档