我正在使用luigi执行一系列任务,如下所示:
class Task1(luigi.Task):
stuff = luigi.Parameter()
def output(self):
return luigi.LocalTarget('test.json')
def run(self):
with self.output().open('w') as f:
f.write(stuff)
class Task2(luigi.Task):
stuff = luigi.Parameter()
def requires(self):
return Task1(stuff=self.stuff)
def output(self):
return luigi.LocalTarget('something-else.json')
def run(self):
with self.output().open('w') as f:
f.write(stuff)当我像这样启动整个工作流时,这完全可以正常工作:
luigi.build([Task2(stuff='stuff')])在使用luigi.build时,您还可以通过显式传递参数as per this example in the documentation来运行多个任务。
然而,在我的情况下,我也希望能够完全独立于Task2在工作流程中的参与来运行它的业务逻辑。这对于没有实现requires、as per this example的任务很有效。
我的问题是,如何将此方法既作为工作流的一部分运行,又独立运行?显然,我可以添加一个新的私有方法,比如_my_custom_run,它接受数据并返回结果,然后在run中使用这个方法,但它感觉像是应该被烘焙到框架中的东西,所以它让我觉得我误解了Luigi的最佳实践(仍在学习框架)。任何建议都很感谢,谢谢!
发布于 2018-11-02 09:30:02
听起来你想要dynamic requirements.使用这个例子中所示的模式,你可以读取一个配置或者传递一个带有任意数据的参数,然后根据配置中的字段只对你想要的任务进行yield。
# tasks.py
import luigi
import json
import time
class Parameterizer(luigi.Task):
params = luigi.Parameter() # Arbitrary JSON
def output(self):
return luigi.LocalTarget('./config.json')
def run(self):
with self.output().open('w') as f:
json.dump(params, f)
class Task1(luigi.Task):
stuff = luigi.Parameter()
def output(self):
return luigi.LocalTarget('{}'.format(self.stuff[:6]))
def run(self):
with self.output().open('w') as f:
f.write(self.stuff)
class Task2(luigi.Task):
stuff = luigi.Parameter()
params = luigi.Parameter()
def output(self):
return luigi.LocalTarget('{}'.format(self.stuff[6:]))
def run(self):
config = Parameterizer(params=self.params)
yield config
with config.output().open() as f:
parameters = json.load(f)
if parameters["runTask1"]:
yield Task1(stuff=self.stuff)
else:
pass
with self.output().open('w') as f:
f.write(self.stuff)
if __name__ == '__main__':
cf_json = '{"runTask1": True}'
print("Trying to run with Task1...")
luigi.build([Task2(stuff="Task 1Task 2", params='{"runTask1":true}')], local_scheduler=True)
time.sleep(10)
cf_json = '{"runTask1": False}'
print("Trying to run WITHOUT Task1...")
luigi.build([Task2(stuff="Task 1Did just task 2", params='{"runTask1":false}')], local_scheduler=True)(只需调用python tasks.py即可执行)
我们可以很容易地想象将多个参数映射到多个任务,或者在允许执行各种任务之前应用自定义测试。我们也可以重写这段代码,从luigi.Config中获取参数。
还要注意来自Task2的以下控制流
if parameters["runTask1"]:
yield Task1(stuff=self.stuff)
else:
pass在这里,我们可以运行另一个任务,或者动态调用任务,就像我们在luigi代码库的示例中看到的那样。例如:
if parameters["runTask1"]:
yield Task1(stuff=self.stuff)
else:
# self.stuff is not automatically parsed to int, so this list comp is valid
data_dependent_deps = [Task1(stuff=x) for x in self.stuff]
yield data_dependent_deps这可能比简单的run_standalone()方法更复杂,但我认为它最接近您在文档中所寻找的luigi模式。
来源:https://luigi.readthedocs.io/en/stable/tasks.html?highlight=dynamic#dynamic-dependencies
https://stackoverflow.com/questions/49987595
复制相似问题