在我的许多项目中,我使用罗吉作为管道工具。这让我想到了使用它来实现参数搜索。标准的luigi.file.LocalTarget对于处理参数有一种非常天真的方法,在文档的示例中也显示了这一点:
def output(self):
return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval)即,参数保存在文件名中。这使得检查某个参数组合是否已经计算很容易。一旦任务的参数变得更复杂,这就会变得很混乱。
下面是参数搜索的非常简单的想法:
import luigi
class Sum(luigi.Task):
long_ = luigi.Parameter()
list_ = luigi.Parameter()
of = luigi.Parameter()
parameters = luigi.Parameter()
def output(self):
return luigi.LocalTarget("task{}_{}_{}_{}.txt".format(self.long_,
self.list_,
self.of,
self.parameters))
def run(self):
sum_ = self.long_ + self.list_ + self.of + self.parameters
with self.output().open('w') as out_file:
out_file.write(str(sum_))
class ParameterSearch(luigi.Task):
def requires(self):
list_of_parameter_combinations = [
{
"long_" : 1,
"list_" : 2,
"of" : 3,
"parameters" : 4
},{
"long_" : 5,
"list_" : 6,
"of" : 7,
"parameters" : 8
}
]
for pc in list_of_parameter_combinations:
yield Sum(**pc)当然,在这个例子中,所有四个参数都可以在文件名中编码,但是它不需要太多的幻想,这种方法可以到达边界。例如,考虑类似数组的参数。
我的后续想法是将参数和结果存储在某种信封对象中,然后将其保存为目标。然后,文件名可以是用于第一次模糊搜索的参数的某种哈希。
有信封类
class Envelope(object):
@classmethod
def hashify(cls, params):
return hash(frozenset(params.items()))
def __init__(self, result, **params):
self.params = {}
for k in params:
self.params[k] = params.get(k)
def hash(self):
return Envelope.hashify(self.params)然后是新的目标,它增强了LocalTarget,并能够检查信封中的所有参数是否匹配:
class EnvelopedTarget(luigi.file.LocalTarget):
fs = luigi.file.LocalFileSystem()
def __init__(self, params, path=None, format=None, is_tmp=False):
self.path = path
self.params = params
if format is None:
format = luigi.file.get_default_format()
if not path:
if not is_tmp:
raise Exception('path or is_tmp must be set')
path = os.path.join(tempfile.gettempdir(), 'luigi-tmp-%09d' % random.randint(0, 999999999))
super(EnvelopedTarget, self).__init__(path)
self.format = format
self.is_tmp = is_tmp
def exists(self):
path = self.path
if '*' in path or '?' in path or '[' in path or '{' in path:
logger.warning("Using wildcards in path %s might lead to processing of an incomplete dataset; "
"override exists() to suppress the warning.", path)
if self.fs.exists(path):
with self.open() as fin:
envelope = pickle.load(fin)
try:
assert len(envelope.params) == len(self.params)
for param,paramval in self.params.items():
assert paramval == envelope.params.get(param)
except(AssertionError):
return False
return True
else:
return False这里的问题是,使用这个目标增加了一些样板,最初luigi的目标是最小化。我设置了一个新的基础任务
class BaseTask(luigi.Task):
def output(self, envelope):
path = '{}{}.txt'.format(type(self).__name__, envelope.hash())
params = envelope.params
return EnvelopedTarget(params, path=path)
def complete(self):
envelope = Envelope(None, **self.param_kwargs)
outputs = flatten(self.output(envelope))
if len(outputs) == 0:
warnings.warn(
"Task %r without outputs has no custom complete() method" % self,
stacklevel=2
)
return False
return all(map(lambda output: output.exists(), outputs))
def run(self):
result, outparams = self.my_run()
envelope = Envelope(result, **outparams)
with self.output(envelope).open('w') as fout:
pickle.dump(envelope, fout)由此产生的EnvelopedSum任务将非常小:
class EnvelopedSum(BaseTask):
long_ = luigi.Parameter()
list_ = luigi.Parameter()
of = luigi.Parameter()
parameters = luigi.Parameter()
def my_run(self):
return sum(self.param_kwargs.values()), self.param_kwargs这个任务可以在开始时以与Sum任务相同的方式运行。
注意:这个如何封装luigi任务-结果的示例实现远远不稳定,更多地说明了我所说的包络、结果和参数的含义。
,我的问题是:难道没有更简单的方法来处理luigi中的许多复杂参数吗?
Followup-question:有没有想过保存执行参数搜索的代码版本(和/或subtaks的包版本)的记录?
对于在哪里阅读这一主题的任何评论也将受到欢迎。
注:
您可能需要一些导入才能使其运行:
from luigi.task import flatten
import warnings
import pickle发布于 2016-10-16 20:33:21
对于邮件列表中的此类建议,您可能会得到更好的响应。Luigi任务代码已经生成了参数的MD5散列,以生成您可以获取的唯一任务标识符。
# task_id is a concatenation of task family, the first values of the first 3 parameters
# sorted by parameter name and a md5hash of the family/parameters as a cananocalised json.
param_str = json.dumps(params, separators=(',', ':'), sort_keys=True)
param_hash = hashlib.md5(param_str.encode('utf-8')).hexdigest()https://stackoverflow.com/questions/39996544
复制相似问题