首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Luigi -覆盖任务需要/输入

Luigi -覆盖任务需要/输入
EN

Stack Overflow用户
提问于 2018-04-24 02:22:52
回答 1查看 2K关注 0票数 5

我正在使用luigi执行一系列任务,如下所示:

代码语言:javascript
复制
class Task1(luigi.Task):
    stuff = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('test.json')

    def run(self):
        with self.output().open('w') as f:
            f.write(stuff)


class Task2(luigi.Task):
    stuff = luigi.Parameter()

    def requires(self):
        return Task1(stuff=self.stuff)

    def output(self):
        return luigi.LocalTarget('something-else.json')

    def run(self):
        with self.output().open('w') as f:
            f.write(stuff)

当我像这样启动整个工作流时,这完全可以正常工作:

代码语言:javascript
复制
luigi.build([Task2(stuff='stuff')])

在使用luigi.build时,您还可以通过显式传递参数as per this example in the documentation来运行多个任务。

然而,在我的情况下,我也希望能够完全独立于Task2在工作流程中的参与来运行它的业务逻辑。这对于没有实现requiresas per this example的任务很有效。

我的问题是,如何将此方法既作为工作流的一部分运行,又独立运行?显然,我可以添加一个新的私有方法,比如_my_custom_run,它接受数据并返回结果,然后在run中使用这个方法,但它感觉像是应该被烘焙到框架中的东西,所以它让我觉得我误解了Luigi的最佳实践(仍在学习框架)。任何建议都很感谢,谢谢!

EN

回答 1

Stack Overflow用户

发布于 2018-11-02 09:30:02

听起来你想要dynamic requirements.使用这个例子中所示的模式,你可以读取一个配置或者传递一个带有任意数据的参数,然后根据配置中的字段只对你想要的任务进行yield

代码语言:javascript
复制
# tasks.py
import luigi
import json
import time


class Parameterizer(luigi.Task):
    params = luigi.Parameter() # Arbitrary JSON

    def output(self):
        return luigi.LocalTarget('./config.json')

    def run(self):
        with self.output().open('w') as f:
            json.dump(params, f)

class Task1(luigi.Task):
    stuff = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('{}'.format(self.stuff[:6]))

    def run(self):
        with self.output().open('w') as f:
            f.write(self.stuff)


class Task2(luigi.Task):
    stuff = luigi.Parameter()
    params = luigi.Parameter()


    def output(self):
        return luigi.LocalTarget('{}'.format(self.stuff[6:]))

    def run(self):

        config = Parameterizer(params=self.params)
        yield config

        with config.output().open() as f:
            parameters = json.load(f)

        if parameters["runTask1"]:
            yield Task1(stuff=self.stuff)
        else:
            pass
        with self.output().open('w') as f:
            f.write(self.stuff)

if __name__ == '__main__':
    cf_json = '{"runTask1": True}'

    print("Trying to run with Task1...")
    luigi.build([Task2(stuff="Task 1Task 2", params='{"runTask1":true}')], local_scheduler=True)

    time.sleep(10)

    cf_json = '{"runTask1": False}'

    print("Trying to run WITHOUT Task1...")
    luigi.build([Task2(stuff="Task 1Did just task 2", params='{"runTask1":false}')], local_scheduler=True)

(只需调用python tasks.py即可执行)

我们可以很容易地想象将多个参数映射到多个任务,或者在允许执行各种任务之前应用自定义测试。我们也可以重写这段代码,从luigi.Config中获取参数。

还要注意来自Task2的以下控制流

代码语言:javascript
复制
    if parameters["runTask1"]:
        yield Task1(stuff=self.stuff)
    else:
        pass

在这里,我们可以运行另一个任务,或者动态调用任务,就像我们在luigi代码库的示例中看到的那样。例如:

代码语言:javascript
复制
    if parameters["runTask1"]:
        yield Task1(stuff=self.stuff)
    else:
        # self.stuff is not automatically parsed to int, so this list comp is valid
        data_dependent_deps = [Task1(stuff=x) for x in self.stuff] 
        yield data_dependent_deps

这可能比简单的run_standalone()方法更复杂,但我认为它最接近您在文档中所寻找的luigi模式。

来源:https://luigi.readthedocs.io/en/stable/tasks.html?highlight=dynamic#dynamic-dependencies

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49987595

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档