首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >芹菜时间统计-任务名称

芹菜时间统计-任务名称
EN

Stack Overflow用户
提问于 2016-05-30 22:22:35
回答 1查看 2K关注 0票数 6

我有一些相当繁忙的芹菜队列,但不确定哪些任务是有问题的。是否有一种方法可以聚合结果,以确定哪些任务需要很长时间?我有10-20名员工在2-4台服务器上。

使用redis作为代理,并将其作为结果后端。我注意到了Flower上的繁忙队列,但不知道如何将每个任务的时间统计数据聚合起来。

EN

回答 1

Stack Overflow用户

发布于 2016-06-04 08:35:47

方法1:

如果您在启动芹菜工人时启用了日志记录,则他们记录每个任务所需的时间。

代码语言:javascript
复制
$ celery worker -l info -A your_app --logfile celery.log

这将生成这样的日志

代码语言:javascript
复制
[2016-06-04 13:21:30,749: INFO/MainProcess] Task sig.add[a8b648eb-9674-44f0-90bd-71cfebe22f2f] succeeded in 0.00979363399983s: 3
[2016-06-04 13:21:30,973: INFO/MainProcess] Received task: sig.add[7fd422e6-8f48-4dd2-90de-e213afbedc38]
[2016-06-04 13:21:30,982: WARNING/Worker-2] called by small_task. LOL {'signal': <Signal: Signal>, 'result': 3, 'sender': <@task: sig.add of tasks:0x7fdf33146c50>}

您可以过滤具有succeeded in的行。使用、[:作为分隔符对这些行进行拆分,打印任务名称和每个行所占用的时间,然后对所有行进行排序。

代码语言:javascript
复制
$ grep ' succeeded in ' celery.log  | awk -F'[ :\[]' '{print $9, $13}' | sort 
awk: warning: escape sequence `\[' treated as plain `['
sig.add 0.00775764500031s
sig.add 0.00802627899975s
sig.foo 12.00813863099938s
sig.foo 15.00871706100043s
sig.foo 12.00979363399983s

如您所见,add非常快& foo非常慢。

方法2:

芹菜有task_prerun_handlertask_postrun_handler信号,在任务之前/之后运行。您可以连接函数,它将跟踪时间,然后在某个地方记录时间。

代码语言:javascript
复制
from time import time
from celery.signals import task_prerun, task_postrun


tasks = {}
task_avg_time = {}
Average = namedtuple('Average', 'cum_avg count')


@task_prerun.connect
def task_prerun_handler(signal, sender, task_id, task, args, kwargs):
    tasks[task_id] = time()


@task_postrun.connect
def task_postrun_handler(signal, sender, task_id, task, args, kwargs, retval, state):
    try:
        cost = time() - tasks.pop(task_id)
    except KeyError:
        cost = None

    if not cost:
        return

    try:
        cum_avg, count = task_avg_time[task.name]
        new_count = count + 1
        new_avg = ((cum_avg * count) + cost) / new_count
        task_avg_time[task.name] = Average(new_avg, new_count)
    except KeyError:
        task_avg_time[task.name] = Average(cost, 1)

    # write to redis: task_avg_time

参考资料:https://stackoverflow.com/a/31731622/2698552

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37534311

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档