首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Luigi全局变量

Luigi全局变量
EN

Stack Overflow用户
提问于 2018-07-03 18:39:18
回答 2查看 1.1K关注 0票数 3

我想在Luigi中将一些目标路径设置为全局变量。

原因是我使用的目标路径是基于给定的数值天气预报(NWP)的最后一次运行,并且需要一些时间才能获得该值。一旦我检查了哪一个是最后一次运行,我创建了一个路径,我将在其中放置几个目标文件(具有相同的父文件夹)。

我目前正在重复一个类似的调用来获取几个任务的父路径的值,将此路径设置为全局变量会更有效。我曾尝试在luigi类调用的一个函数(get_target_path)中定义全局变量,但当我返回Luigi管道时,该全局变量似乎无法持久存在。

这就是我的代码的样子:

代码语言:javascript
复制
class GetNWP(luigi.Task):
    """
    Download the NWP data.
    """
    product_id = luigi.Parameter()
    date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
    run_hr = luigi.Parameter(default='latest')

    def requires(self):
        return None
    def output(self):
        path = get_target_path(self.product_id, self.date, self.run_hr,
                               type='getNWP')
        return luigi.LocalTarget(path)
    def run(self):
        download_nwp_data(self.product_id, self.date, self.run_hr)


class GetNWP_GFS(luigi.Task):
    """
    GFS data.
    """
    product_id = luigi.Parameter()
    date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
    run_hr = luigi.Parameter(default='latest')

    def requires(self):
        return None
    def output(self):
        path = get_target_path(self.product_id_PV, self.date, self.run_hr,
                               type='getNWP_GFS')
        return luigi.LocalTarget(path)
    def run(self):
        download_nwp_data(self.product_id, self.date, self.run_hr,
                          type='getNWP_GFS')


class Predict(luigi.Task):
    """
    Create forecast.
    """
    product_id = luigi.Parameter(default=None)
    date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
    run_hr = luigi.Parameter(default='latest')
    horizon = luigi.Parameter(default='DA')

    def requires(self):
        return [
                GetNWP_GFS(self.product_id, self.date, self.run_hr),
                GetNWP(self.product_id, self.date, self.run_hr)
                ]
    def output(self):
        path = get_target_path(self.product_id, self.date, self.run_hr,
                               type='predict', horizon=self.horizon)
        return luigi.LocalTarget(path)
    def run(self):
        get_forecast(self.product_id, self.date, self.run_hr)

函数get_target_path基于输入参数定义目标路径。我希望这个函数可以设置可以从Luigi访问的全局变量。例如,如下所示(仅用于getNWP任务的代码):

代码语言:javascript
复制
def get_target_path(product_id, date, run_hr, type=None, horizon='DA'):
        """
        Obtain target path.
        """
        if type == 'getNWP_GFS':
            if 'path_nwp_gfs' in globals():
                return path_nwp_gfs
            else:
                ...
        elif type == 'getNWP':
            if 'path_nwp_model' in globals():
                return path_nwp_model
            else:
                filename = f'{nwp_model}_{date}_{run_hr}_{horizon}.{ext}'
                path = Path(db_dflt['app_data']['nwp_folder'])
                create_directory(path)
                global path_nwp_model
                path_nwp_model = Path(path) / filename
        elif type == 'predict':
            if 'path_predict' in globals():
                return path_predict
            else:
                ...

当我回到Luigi时,这个函数中定义的全局变量并不存在。

任何关于如何解决这个问题的想法都将不胜感激!

EN

回答 2

Stack Overflow用户

发布于 2018-07-16 21:34:52

由于似乎没有内置的方法来存储Luigi的目标路径,我最终决定创建一个类来保存与Luigi的目标/路径相关的所有信息。当调用需要知道哪些是目标路径的外部函数时,这个类在Luigi的任务中使用。

该类在主luigy脚本中导入,并在定义任务之前实例化:

代码语言:javascript
复制
from .utils import Targets
paths = Targets()

class GetNWP(luigi.Task):
    """Download NWP data required to prepare the prediction."""

    product_id = luigi.Parameter()
    date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
    run_hr = luigi.Parameter(default='latest')

    def requires(self):
        return GetProductInfo(self.product_id)
    def output(self):
        path = paths.getpath_nwp(self.product_id, self.date, self.run_hr)
        path_gfs = paths.getpath_nwp_GFS(self.product_id, self.date, self.run_hr)
        return [luigi.LocalTarget(path),
                luigi.LocalTarget(path_gfs)]
    def run(self):
        download_nwp_data(self.product_id, date=self.date, run_hr=self.run_hr,
                          paths=paths, nwp_model=paths.nwp_model)
        download_nwp_data(self.product_id, date=self.date, run_hr=self.run_hr,
                          paths=paths, nwp_model=paths.gfs_model)   

class Predict(luigi.Task):
    """Create forecast based on the product information and NWP data."""

    product_id = luigi.Parameter()
    date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
    run_hr = luigi.Parameter(default='latest')

    def requires(self):
        return GetNWP(self.product_id, self.date, self.run_hr)
    def output(self):
        path = paths.getpath_predict(self.product_id, self.date, self.run_hr)
        path_gfs = paths.getpath_predict_GFS(self.product_id, self.date,
                                             self.run_hr)
        return [luigi.LocalTarget(path),
                luigi.LocalTarget(path_gfs)]
    def run(self):
        get_forecast(product_id=self.product_id, date=self.date,
                     run_hr=self.run_hr, paths=paths, nwp_model=paths.nwp_model)
        get_forecast(product_id=self.product_id, date=self.date,
                     run_hr=self.run_hr, paths=paths, nwp_model=paths.gfs_model)

其中目标类具有以下结构:

代码语言:javascript
复制
class Targets:
    """Store Luigi's target paths."""

    def __init__(self):
        """Initialize paths and variables."""
        self.path1 = None
        self.path2 = None
        self.path3 = None

    def update_object(self, product_id, date=None, run_hr=None):
        """Update object based on inputs."""
        if self.prod_id is None:
            self.prod_id = product_id
        if self.path_1 is None:
            self.get_path_1(product_id)
        if self.path_2 is None:
            self.get_path_2(product_id)
        if self.path_3 is None:
            self.get_path_3(product_id)

    def get_path_1(self, product_id, ...)
        """Generate a path 1 for a luigi Task."""
        ... define self.path_1...

    def get_path_2(self, product_id, ...)
        """Generate a path 2 for a luigi Task."""
        ... define self.path_2...

    def get_path_3(self, product_id, ...)
        """Generate a path 3 for a luigi Task."""
        ... define self.path_3...

主要思想是只设置一次目标路径,并在每个Luigi任务中使用它们作为输入参数。这使您可以:

如果目标路径因新的NWP或可用而发生更改,

  • 可更快地执行任务,并且
  • 可避免出现错误。
票数 1
EN

Stack Overflow用户

发布于 2019-10-17 06:15:20

如果愿意,您可以使用mixins,但请记住,luigi任务可以继承实例方法和参数。

代码语言:javascript
复制
import os
import luigi
LUIGI_BASE_PATH='/path/to/luigi/dir'

class BaseTask(luigi.Task)
    product_id = luigi.Parameter()
    date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
    run_hr = luigi.Parameter(default='latest')

    def get_path_dynamic(self):
        return os.path.join(LUIGI_BASE_PATH, 
                            self.__class__.__name__, 
                            self.product_id,
                            ...)

    def output(self):
        return luigi.LocalTarget(self.get_path_dynamic())


class Predict(BaseTask):
    def run(self):
        ...

附加的好处是您不需要重新定义相同的参数,并且子任务的名称(PredictGetNWP)将被插入到输出路径中。我不确定path1path2等属性与gpath_nwp()和类似函数的关系,因为它们的定义没有包含在示例中,但是您可以使用@property装饰器来定义getter和setter来模拟相同的功能。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51152485

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档