因此,我有一个任务,它创建一个工作目录,并在那里完成它的所有工作。任务从服务器A调用,并在worker servers上执行。
一旦任务是done/canceled.,我需要确保删除工作目录。
我添加了一个任务撤销处理程序,它看起来如下所示:
@task
def my_task(value):
task_id = current_task.request.id
work_dir = os.path.join(BASE_WORK_DIR, task_id)
os.makedirs(work_dir)
try:
# Do work...
finally:
shutil.rmtree(work_dir)
@task_revoked.connect(sender=my_task)
def my_task_revoked_handler(*args, **kwargs):
# FIXME: delete work_dir
print args
# ()
print kwargs
# {'terminated': True, 'signal': <Signal: Signal>, 'expired': False, 'sender': <@task: myapp.core.tasks.my_task>, 'signum': '15'}我的问题是,当服务器A取消任务时,我无法在撤销的处理程序中对工作目录进行清理,因为它没有task_id。
有从这个特定的信号处理程序获得任务id的方法吗?--一些其他信号有它们,我已经查看了发出它们的源,并且由于某种原因,这个信号没有提供task_id。
提供的sender任务包含一个trace_task函数:{'__trace__': <function trace_task at 0x3ee8230>},但我不知道如何使用它,因为该函数本身需要一个task_id。
任何其他想法都是受欢迎的。
发布于 2013-11-14 12:51:10
https://stackoverflow.com/questions/15633073
复制相似问题