首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Ruffus流水线中运行函数前需要制作一组文件

在Ruffus流水线中运行函数前需要制作一组文件
EN

Stack Overflow用户
提问于 2010-03-18 05:11:21
回答 1查看 269关注 0票数 1

我正在使用ruffus编写一个管道。我有一个函数,它被并行调用了很多次,它创建了几个文件。我想创建一个函数"combineFiles()“,在创建完所有这些文件之后调用它。因为它们是在集群上并行运行的,所以它们不会全部一起完成。我写了一个函数'getFilenames()‘来返回需要创建的一组文件名,但是如何让combineFiles()等待它们出现呢?

我尝试了以下几种方法:

代码语言:javascript
复制
@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
  # I should only be called if every file in the list 'filenames' exists

我也尝试过这个装饰器:

代码语言:javascript
复制
@merge(getFilenames)

但这也不起作用。在生成getFilenames提供的文件之前,仍然会错误地调用combineFiles。如何才能使combineFiles以这些文件存在为条件?

谢谢。

EN

回答 1

Stack Overflow用户

发布于 2010-03-26 21:03:06

我是Ruffus的开发者。我不确定我是否完全理解你想要做什么,但下面是:

为了运行流水线的下一阶段,等待需要不同时间的作业才能完成,这正是Ruffus所做的,所以希望这是直截了当的。

第一个问题是,您是否知道正在预先创建哪些文件,即在管道运行之前?让我们先假设你这样做了。

代码语言:javascript
复制
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

让我们编写一个伪函数,它在每次调用时创建一个文件。在Ruffus中,任何输入和输出文件名都分别包含在前两个参数中。我们没有输入文件名,所以我们的函数调用应该如下所示:

代码语言:javascript
复制
create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")

create_file的定义如下所示:

代码语言:javascript
复制
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    open(output_file_name, "w").write("dummy file")

这些文件中的每一个都将在3个单独的create_file调用中创建。如果你愿意的话,这些都可以并行运行。

代码语言:javascript
复制
pipeline_run([create_file], multiprocess = 5)

现在来组合这些文件。"@Merge“装饰器确实是为此而设置的。我们只需要将它链接到前一个函数:

代码语言:javascript
复制
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())

这将仅当三次调用create_file()中的所有文件都准备就绪时才调用merge_file。

整个代码如下:

代码语言:javascript
复制
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

from random import randint
from time import sleep

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # simulate create file process of indeterminate complexity
    sleep(randint(1,5))
    open(output_file_name, "w").write("dummy file")

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())


pipeline_run([merge_file], multiprocess = 5)

这就是结果:

代码语言:javascript
复制
>>> pipeline_run([merge_file], multiprocess = 5)

    Job = [None -> two.file] completed
    Job = [None -> three.file] completed
    Job = [None -> one.file] completed
Completed Task = create_file
    Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/2465953

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档