首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在luigi中处理很多参数

在luigi中处理很多参数
EN

Stack Overflow用户
提问于 2016-10-12 10:39:48
回答 1查看 1.2K关注 0票数 2

在我的许多项目中,我使用罗吉作为管道工具。这让我想到了使用它来实现参数搜索。标准的luigi.file.LocalTarget对于处理参数有一种非常天真的方法,在文档的示例中也显示了这一点:

代码语言:javascript
复制
def output(self):
    return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval)

即,参数保存在文件名中。这使得检查某个参数组合是否已经计算很容易。一旦任务的参数变得更复杂,这就会变得很混乱。

下面是参数搜索的非常简单的想法:

代码语言:javascript
复制
import luigi
class Sum(luigi.Task):

    long_ = luigi.Parameter()
    list_ = luigi.Parameter()
    of = luigi.Parameter()
    parameters = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget("task{}_{}_{}_{}.txt".format(self.long_,
                                                              self.list_,
                                                              self.of,
                                                              self.parameters))

    def run(self):

        sum_ = self.long_ + self.list_ + self.of + self.parameters
        with self.output().open('w') as out_file:
            out_file.write(str(sum_))


class ParameterSearch(luigi.Task):

    def requires(self):

        list_of_parameter_combinations = [
            {
                "long_" : 1,
                "list_" : 2,
                "of" : 3,
                "parameters" : 4

            },{
                "long_" : 5,
                "list_" : 6,
                "of" : 7,
                "parameters" : 8
            }
        ]

        for pc in list_of_parameter_combinations:
            yield Sum(**pc)

当然,在这个例子中,所有四个参数都可以在文件名中编码,但是它不需要太多的幻想,这种方法可以到达边界。例如,考虑类似数组的参数。

我的后续想法是将参数和结果存储在某种信封对象中,然后将其保存为目标。然后,文件名可以是用于第一次模糊搜索的参数的某种哈希。

有信封类

代码语言:javascript
复制
class Envelope(object):

    @classmethod
    def hashify(cls, params):
        return hash(frozenset(params.items()))

    def __init__(self, result, **params):

        self.params = {}
        for k in params:
            self.params[k] = params.get(k)

    def hash(self):
        return Envelope.hashify(self.params)

然后是新的目标,它增强了LocalTarget,并能够检查信封中的所有参数是否匹配:

代码语言:javascript
复制
class EnvelopedTarget(luigi.file.LocalTarget):

    fs = luigi.file.LocalFileSystem()

    def __init__(self, params, path=None, format=None, is_tmp=False):
        self.path = path
        self.params = params

        if format is None:
            format = luigi.file.get_default_format()

        if not path:
            if not is_tmp:
                raise Exception('path or is_tmp must be set')
            path = os.path.join(tempfile.gettempdir(), 'luigi-tmp-%09d' % random.randint(0, 999999999))
        super(EnvelopedTarget, self).__init__(path)
        self.format = format
        self.is_tmp = is_tmp

    def exists(self):
        path = self.path
        if '*' in path or '?' in path or '[' in path or '{' in path:
            logger.warning("Using wildcards in path %s might lead to processing of an incomplete dataset; "
                           "override exists() to suppress the warning.", path)
        if self.fs.exists(path):
            with self.open() as fin:
                envelope = pickle.load(fin)

                try:
                    assert len(envelope.params) == len(self.params)
                    for param,paramval in self.params.items():
                        assert paramval == envelope.params.get(param)

                except(AssertionError):
                    return False
            return True
        else:
            return False

这里的问题是,使用这个目标增加了一些样板,最初luigi的目标是最小化。我设置了一个新的基础任务

代码语言:javascript
复制
class BaseTask(luigi.Task):

    def output(self, envelope):
        path = '{}{}.txt'.format(type(self).__name__, envelope.hash())
        params = envelope.params
        return EnvelopedTarget(params, path=path)

    def complete(self):

        envelope = Envelope(None, **self.param_kwargs)

        outputs = flatten(self.output(envelope))
        if len(outputs) == 0:
            warnings.warn(
                "Task %r without outputs has no custom complete() method" % self,
                stacklevel=2
            )
            return False

        return all(map(lambda output: output.exists(), outputs))

    def run(self):

        result, outparams = self.my_run()

        envelope = Envelope(result, **outparams)

        with self.output(envelope).open('w') as fout:
            pickle.dump(envelope, fout)

由此产生的EnvelopedSum任务将非常小:

代码语言:javascript
复制
class EnvelopedSum(BaseTask):

    long_ = luigi.Parameter()
    list_ = luigi.Parameter()
    of = luigi.Parameter()
    parameters = luigi.Parameter()

    def my_run(self):
        return sum(self.param_kwargs.values()), self.param_kwargs

这个任务可以在开始时以与Sum任务相同的方式运行。

注意:这个如何封装luigi任务-结果的示例实现远远不稳定,更多地说明了我所说的包络、结果和参数的含义。

,我的问题是:难道没有更简单的方法来处理luigi中的许多复杂参数吗?

Followup-question:有没有想过保存执行参数搜索的代码版本(和/或subtaks的包版本)的记录?

对于在哪里阅读这一主题的任何评论也将受到欢迎。

注:

您可能需要一些导入才能使其运行:

代码语言:javascript
复制
from luigi.task import flatten
import warnings
import pickle
EN

回答 1

Stack Overflow用户

发布于 2016-10-16 20:33:21

对于邮件列表中的此类建议,您可能会得到更好的响应。Luigi任务代码已经生成了参数的MD5散列,以生成您可以获取的唯一任务标识符。

luigi/task.py#L 128

代码语言:javascript
复制
# task_id is a concatenation of task family, the first values of the first 3 parameters
# sorted by parameter name and a md5hash of the family/parameters as a cananocalised json.
param_str = json.dumps(params, separators=(',', ':'), sort_keys=True)
param_hash = hashlib.md5(param_str.encode('utf-8')).hexdigest()
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39996544

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档