首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在google-cloud-dataflow中使用文件模式匹配时如何获取文件名

在google-cloud-dataflow中使用文件模式匹配时如何获取文件名
EN

Stack Overflow用户
提问于 2015-05-01 16:13:45
回答 4查看 4.8K关注 0票数 5

有人知道在google-cloud-dataflow中使用文件模式匹配时如何获取文件名吗?

我是newbee来使用数据流。如何在使用文件模式匹配时获取文件名。

代码语言:javascript
复制
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*.txt"))

我想知道如何检测文件名,即kinglear.txt、Hamlet.txt等。

EN

回答 4

Stack Overflow用户

发布于 2019-01-29 13:05:59

基于最新SDK Java (sdk 2.9.0)更新:

FileIO TextIO阅读器不提供对文件名本身的访问权限,对于这些用例,我们需要使用Beams来匹配文件并访问存储在文件名中的信息。与TextIO不同,在FileIO读取的下游转换中,文件的读取需要由用户处理。FileIO读取的结果是一个PCollection。ReadableFile类包含作为元数据的文件名,它可以与文件内容一起使用。

FileIO确实有一个方便的方法readFullyAsUTF8String(),它可以将整个文件读取到一个String对象中,这将首先将整个文件读取到内存中。如果内存是一个问题,您可以使用FileSystems等实用程序类直接处理该文件。

来自:Document Link

代码语言:javascript
复制
PCollection<KV<String, String>> filesAndContents = p
     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
     // withCompression can be omitted - by default compression is detected from the filename.
     .apply(FileIO.readMatches().withCompression(GZIP))
     .apply(MapElements
         // uses imports from TypeDescriptors
         .into(KVs(strings(), strings()))
         .via((ReadableFile f) -> KV.of(
             f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));

Python (sdk 2.9.0):

对于2.9.0 For python,您需要从Dataflow管道外部收集URI列表,并将其作为参数提供给管道。例如,使用FileSystems通过Glob模式读取文件列表,然后将其传递给PCollection进行处理。

一旦文件参见PR https://github.com/apache/beam/pull/7791/可用,下面的代码也将是python的一个选项。

代码语言:javascript
复制
import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
  readable_files = (p 
                    | fileio.MatchFiles(‘hdfs://path/to/*.txt’)
                    | fileio.ReadMatches()
                    | beam.Reshuffle())
  files_and_contents = (readable_files 
                        | beam.Map(lambda x: (x.metadata.path, 
                                              x.read_utf8()))
票数 2
EN

Stack Overflow用户

发布于 2016-02-27 05:46:36

一种方法是构建一个List<PCollection>,其中每个条目对应一个输入文件,然后使用Flatten。例如,如果要将文件集合的每一行解析为Foo对象,可以执行以下操作:

代码语言:javascript
复制
public static class FooParserFn extends DoFn<String, Foo> {
  private String fileName;
  public FooParserFn(String fileName) {
    this.fileName = fileName;
  }

  @Override
  public void processElement(ProcessContext processContext) throws Exception {
    String line = processContext.element();
    // here you have access to both the line of text and the name of the file
    // from which it came.
  }
}

public static void main(String[] args) {
  ...
  List<String> inputFiles = ...;
  List<PCollection<Foo>> foosByFile =
          Lists.transform(inputFiles,
          new Function<String, PCollection<Foo>>() {
            @Override
            public PCollection<Foo> apply(String fileName) {
              return p.apply(TextIO.Read.from(fileName))
                      .apply(new ParDo().of(new FooParserFn(fileName)));
            }
          });

  PCollection<Foo> foos = PCollectionList.<Foo>empty(p).and(foosByFile).apply(Flatten.<Foo>pCollections());
  ...
}

这种方法的一个缺点是,如果您有100个输入文件,那么在Cloud Dataflow监控控制台中也会有100个节点。这使得我们很难知道发生了什么。我有兴趣听到Google Cloud Dataflow人员的意见,看看这种方法是否有效。

票数 1
EN

Stack Overflow用户

发布于 2016-06-02 10:02:30

在使用类似于@danvk的代码时,我还在数据流图上有100个输入文件= 100个节点。我切换到了一种类似的方法,它将所有读取合并到一个块中,您可以展开该块以深入到所读取的每个文件/目录。在我们的用例中,使用这种方法也比使用Lists.transform方法运行得更快。

代码语言:javascript
复制
GcsOptions gcsOptions = options.as(GcsOptions.class);
List<GcsPath> paths = gcsOptions.getGcsUtil().expand(GcsPath.fromUri(options.getInputFile()));
List<String>filesToProcess = paths.stream().map(item -> item.toString()).collect(Collectors.toList());

PCollectionList<SomeClass> pcl = PCollectionList.empty(p);
for(String fileName : filesToProcess) {
    pcl = pcl.and(
            p.apply("ReadAvroFile" + fileName, AvroIO.Read.named("ReadFromAvro")
                    .from(fileName)
                    .withSchema(SomeClass.class)
            )
            .apply(ParDo.of(new MyDoFn(fileName)))
    );
}

// flatten the PCollectionList, combining all the PCollections together
PCollection<SomeClass> flattenedPCollection = pcl.apply(Flatten.pCollections());
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29983621

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档