我正在使用luigi.build方法尝试luigi的多处理能力。但是我在执行的时候遇到了一些库错误。
for next in self._add(item,is_complete):文件请求行604,_add self._validate_dependency(d)文件异常行622,_validate_dependency raise Exception('requires() in is_complete objects')
这是我试图实现目标的一段代码。
import luigi
class TaskOne(luigi.Task):
custid= luigi.Parameter()
def requires(self):
pass
def output(self):
return luigi.LocalTarget("logs/"+str(self.custid)+"_success")
def run(self):
with self.output().open('w') as f:
f.write("%s\n" % '')
class TaskTwo(luigi.Task):
def requires(self):
customersList = ['A','B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
yield luigi.build([TaskOne(custid=cust_id) for cust_id in customersList], workers=2)
def output(self):
return luigi.LocalTarget("logs/overall_success.txt")
def run(self):
with self.output().open('w') as f:
f.write("%s\n" % "success")
if __name__ == '__main__':
luigi.run()
========================================================================
发布于 2017-08-12 05:52:34
为什么你认为你需要内置requires?
class TaskTwo(luigi.Task):
def requires(self):
customersList = ['A','B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
return [TaskOne(custid=cust_id) for cust_id in customersList]如果需要多个工作进程,可以在启动管道时在命令行中指定。
luigi --module your_module TaskTwo --workers 2
发布于 2019-03-29 23:29:27
requires()必须返回luigi.Task对象或luigi.Task对象列表。但是,luigi.build()不返回任何内容。您不需要调用luigi.build来显式地运行任务,因为Luigi自己处理运行需求。https://luigi.readthedocs.io/en/stable/tasks.html中概述的示例任务显示了它应该如何工作的基本范式。
此外,您应该从TaskOne中省略requires()。如果它没有依赖项,那么就不需要定义它。
https://stackoverflow.com/questions/45633370
复制相似问题