
Automatic instantiation in luigi?

Stack Overflow user
Asked 2016-06-03 07:11:50

In luigi.Task.run, we need to serialize objects to a file, a database, and so on:

import luigi
import pandas as pd

class MyTask(luigi.Task):
    param = luigi.Parameter()
    def requires(self):
        return AnotherTask(self.param)
    def output(self):
        return luigi.LocalTarget('out_{}'.format(self.param))
    def run(self):
        with self.input().open('r') as infile:
            # instantiate incoming data
            indata = pd.read_csv(infile, index_col=False, parse_dates=...)
        # my process
        with self.output().open('w') as outfile:
            # serialize outgoing data
            outdata.to_csv(outfile, index=False, ...)

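For convenience, though, I would like to skip the pd.read_csv(...) snippet, because every time I reuse a task I have to write the same instantiation step again.

Is there a way to instantiate automatically in luigi, like this?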

class AnotherTask(luigi.Task):
    param = luigi.Parameter()
    def requires(self):
        ...
    def output(self):
        ...
    def _instantiate(self):
        # load this task's own output back into memory
        with self.output().open('r') as outfile:
            outdata = pd.read_csv(outfile, index_col=False, parse_dates=...)
        return outdata

class MyTask(luigi.Task):
    param = luigi.Parameter()
    def requires(self):
        return AnotherTask(self.param)
    def output(self):
        return luigi.LocalTarget('out_{}'.format(self.param))
    def run(self):
        # automatic instantiation via AnotherTask._instantiate()
        indata = self.input()
        # my process
        outdata = indata.someprocess()
        with self.output().open('w') as outfile:
            # serialize outgoing data
            outdata.to_csv(outfile, index=False, ...)

1 Answer

Stack Overflow user

Accepted answer

Posted 2016-06-04 17:32:57

Self-answer:

import csv

import luigi
import six

def getinstances(struct):
    # Recursively replace each task in a nested structure with its loaded data.
    if isinstance(struct, luigi.Task):
        return struct.instantiate()
    elif isinstance(struct, dict):
        return {k: getinstances(v) for k, v in six.iteritems(struct)}
    else:
        # Remaining case: assume struct is iterable...
        try:
            s = list(struct)
        except TypeError:
            raise Exception('Cannot map %s to Task/dict/list' % str(struct))
        return [getinstances(r) for r in s]

class MyParentTask(luigi.Task):
    def requires(self):...
    def output(self):...
    def run(self):...
    def instantiate(self):
        with self.output().open() as outfile:
            reader = csv.reader(outfile)
            outdata = [row for row in reader]
        return outdata

class MyChildTask(luigi.Task):
    def requires(self):
        return MyParentTask()
    def output(self):...
    def run(self):
        indata = getinstances(self.requires())
        ...
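
To see what the recursive traversal does on a nested requires() structure, here is a minimal sketch that runs without luigi. FakeTask is a hypothetical stand-in, not part of luigi, and this version checks for an instantiate() method by duck typing instead of isinstance(struct, luigi.Task); both are assumptions made only so the example is self-contained.

```python
import csv
import io


class FakeTask:
    """Hypothetical stand-in for a luigi.Task that exposes instantiate()."""

    def __init__(self, csv_text):
        self.csv_text = csv_text

    def instantiate(self):
        # Parse the stored CSV text into a list of rows, like MyParentTask.
        return [row for row in csv.reader(io.StringIO(self.csv_text))]


def getinstances(struct):
    """Recursively replace every task in a nested structure with its data."""
    # Assumption: anything with an instantiate() method counts as a task.
    if hasattr(struct, "instantiate"):
        return struct.instantiate()
    if isinstance(struct, dict):
        return {k: getinstances(v) for k, v in struct.items()}
    try:
        items = list(struct)
    except TypeError:
        raise TypeError("Cannot map %r to Task/dict/list" % (struct,))
    return [getinstances(item) for item in items]


# A nested structure shaped like what requires() may return.
requirements = {
    "a": FakeTask("1,2\n3,4\n"),
    "b": [FakeTask("x,y\n"), FakeTask("z\n")],
}
loaded = getinstances(requirements)
print(loaded)
```

The result keeps the shape of the input (dict keys and list order are preserved), with each task replaced by whatever its instantiate() returns.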
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/37608121
