首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使动态Luigi任务的失败成为非关键任务

使动态Luigi任务的失败成为非关键任务
EN

Stack Overflow用户
提问于 2018-01-08 20:34:01
回答 2查看 454关注 0票数 0

我有一个luigi工作流程,它通过ftp下载一堆大文件,并将它们存放在s3上。

我有一个任务,它读取要下载的文件列表,然后创建一系列实际执行下载的任务

其思想是,此工作流的结果是一个包含已成功下载列表的单个文件,任何失败的下载都将在第二天的下一次运行时重试。

问题是,如果任何下载任务失败,则永远不会创建成功的下载列表。

这是因为动态创建的任务成为创建它们的主任务的需求,并根据它们的输出编译一个列表。

有没有一种方法可以使这些下载任务的失败变得无关紧要,以便将列表编译为减去失败任务的输出?

下面的示例代码,GetFiles是我们从命令行调用的任务。

代码语言:javascript
复制
class DownloadFileFromFtp(luigi.Task):
sourceUrl = luigi.Parameter()

def run(self):
    with self.output().open('w') as output:
        WriteFileFromFtp(sourceUrl, output)

def output(self):
    client = S3Client()
    return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)


@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):

def run(self):

    with self.input().open('r') as fileList:
        files = json.load(fileList)

        tasks = []
        taskOutputs = []

        for file in files:
            task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
            tasks.append(task)
            taskOutputs.append(task.output())

        yield tasks

        successfulDownloads = MakeSuccessfulOutputList(taskOutputs)

    with self.output().open('w') as out:
        json.dump(successfulDownloads, out)

def output(self):
    client = S3Client()
    return S3Target(path='successfulDownloads.json', client=client)
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-01-23 23:47:29

此答案可能不正确-请检查THE COMMENTS

我已经阅读了文档几次,我没有发现非关键故障之类的迹象。也就是说,通过在DownloadFileFromFtp中重写Task.complete方法,同时仍然能够在GetFiles.run中使用DownloadFileFromFtp.output,可以很容易地实现此行为。

通过使用return True覆盖,无论下载是否成功,任务DownloadFileFromFtp都将成功。

代码语言:javascript
复制
class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)

    def complete(self,):
        return True

但是请注意,您还可以在该complete方法中使用更复杂的逻辑-例如,只有当任务在运行时遇到特定的网络故障时才会失败。

票数 0
EN

Stack Overflow用户

发布于 2021-03-25 15:27:50

几年后,您一定已经找到了答案,但这里有一些可以帮助您的东西。

代码语言:javascript
复制
class DownloadFileFromFtp(luigi.Task):
      sourceUrl = luigi.Parameter()

      def run(self):
           with self.output().open('w') as output:
             WriteFileFromFtp(sourceUrl, output)
      
      def on_failure(self, exception):
          #If the task fails for any reason, 
          #then just indicate the task as completed.
          #From the docs, exception is a string, so you can easily.

          if "FileNotFound" in exception:
              return self.complete(ignore=True)
          return self.complete(ignore=False)

      def complete(self, ignore=False):
          return ignore

      def output(self):
          client = S3Client()
          return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48150406

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档