目前,我有许多luigi任务一起排队,其中有一个简单的依赖链( a -> b -> c -> d)。首先执行d,最后执行a。a是被触发的任务。
除a之外,所有目标都返回一个luigi.LocalTarget()对象,并有一个泛型luigi.Parameter(),它是一个字符串(包含日期和时间)。在luigi中央服务器上运行(该服务器已启用历史记录)。
问题是,当我重新运行该任务a时,luigi检查历史记录,查看该特定任务以前是否运行过,如果它的状态为“已完成”,它就不会运行任务(在本例中为d),而且我不能这样做,更改字符串无助于此(添加了一个随机的微秒)。如何强制运行任务?
发布于 2016-01-06 17:21:07
首先,评论: Luigi的任务是幂等的。如果运行具有相同参数值的任务,无论运行多少次,它都必须始终返回相同的输出。所以不止一次运行它是没有意义的。这让Luigi强大起来:如果你有一项大任务,它需要花费大量的时间,并且在某个地方失败了,那么你必须从头开始运行它。如果将其拆分为较小的任务,运行它,它将失败,则只需运行管道中的其余任务即可。
运行任务时,Luigi将检查该任务的输出,以查看它们是否存在。如果没有,Luigi将检查它所依赖的任务的输出。如果它们存在,那么它将只运行当前任务并生成输出Target。如果依赖项输出不存在,那么它将运行该任务。
因此,如果要重新运行任务,必须删除其Target输出。如果要重新运行整个管道,则必须删除任务所依赖的所有任务的所有输出。
Luigi存储库中有一个正在就这一问题进行讨论。看看这句话,因为它将指向一些脚本,用于获取给定任务的输出目标并删除它们。
发布于 2019-02-14 18:15:09
我通常通过重写complete()来做到这一点。
class BaseTask(luigi.Task):
force = luigi.BoolParameter()
def complete(self):
outputs = luigi.task.flatten(self.output())
for output in outputs:
if self.force and output.exists():
output.remove()
return all(map(lambda output: output.exists(), outputs))
class MyTask(BaseTask):
def output(self):
return luigi.LocalTarget("path/to/done/file.txt")
def run(self):
with self.output().open('w') as out_file:
out_file.write('Complete')运行任务时,将按预期创建输出文件。在用force=True实例化类时,输出文件仍然存在,直到调用complete()为止。
task = MyTask()
task.run()
task.complete()
# True
new_task = MyTask(force=True)
new_task.output().exists()
# True
new_task.complete()
# False
new_task.output().exists()
# False发布于 2021-03-27 06:00:07
改进@cangers BaseTask,以便在无法删除目标时引发错误。
class BaseTask(luigi.Task):
force = luigi.BoolParameter(significant=False, default=False)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.force is True:
outputs = luigi.task.flatten(self.output())
for out in outputs:
if out.exists():
try:
out.remove()
except AttributeError:
raise NotImplementedErrorhttps://stackoverflow.com/questions/34613296
复制相似问题