有人知道在google-cloud-dataflow中使用文件模式匹配时如何获取文件名吗?
我是newbee来使用数据流。如何在使用文件模式匹配时获取文件名。
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*.txt"))我想知道如何检测文件名,即kinglear.txt、Hamlet.txt等。
发布于 2019-01-29 13:05:59
基于最新SDK Java (sdk 2.9.0)更新:
FileIO TextIO阅读器不提供对文件名本身的访问权限,对于这些用例,我们需要使用Beams来匹配文件并访问存储在文件名中的信息。与TextIO不同,在FileIO读取的下游转换中,文件的读取需要由用户处理。FileIO读取的结果是一个PCollection。ReadableFile类包含作为元数据的文件名,它可以与文件内容一起使用。
FileIO确实有一个方便的方法readFullyAsUTF8String(),它可以将整个文件读取到一个String对象中,这将首先将整个文件读取到内存中。如果内存是一个问题,您可以使用FileSystems等实用程序类直接处理该文件。
PCollection<KV<String, String>> filesAndContents = p
.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
// withCompression can be omitted - by default compression is detected from the filename.
.apply(FileIO.readMatches().withCompression(GZIP))
.apply(MapElements
// uses imports from TypeDescriptors
.into(KVs(strings(), strings()))
.via((ReadableFile f) -> KV.of(
f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));Python (sdk 2.9.0):
对于2.9.0 For python,您需要从Dataflow管道外部收集URI列表,并将其作为参数提供给管道。例如,使用FileSystems通过Glob模式读取文件列表,然后将其传递给PCollection进行处理。
一旦文件参见PR https://github.com/apache/beam/pull/7791/可用,下面的代码也将是python的一个选项。
import apache_beam as beam
from apache_beam.io import fileio
with beam.Pipeline() as p:
readable_files = (p
| fileio.MatchFiles(‘hdfs://path/to/*.txt’)
| fileio.ReadMatches()
| beam.Reshuffle())
files_and_contents = (readable_files
| beam.Map(lambda x: (x.metadata.path,
x.read_utf8()))发布于 2016-02-27 05:46:36
一种方法是构建一个List<PCollection>,其中每个条目对应一个输入文件,然后使用Flatten。例如,如果要将文件集合的每一行解析为Foo对象,可以执行以下操作:
public static class FooParserFn extends DoFn<String, Foo> {
private String fileName;
public FooParserFn(String fileName) {
this.fileName = fileName;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
String line = processContext.element();
// here you have access to both the line of text and the name of the file
// from which it came.
}
}
public static void main(String[] args) {
...
List<String> inputFiles = ...;
List<PCollection<Foo>> foosByFile =
Lists.transform(inputFiles,
new Function<String, PCollection<Foo>>() {
@Override
public PCollection<Foo> apply(String fileName) {
return p.apply(TextIO.Read.from(fileName))
.apply(new ParDo().of(new FooParserFn(fileName)));
}
});
PCollection<Foo> foos = PCollectionList.<Foo>empty(p).and(foosByFile).apply(Flatten.<Foo>pCollections());
...
}这种方法的一个缺点是,如果您有100个输入文件,那么在Cloud Dataflow监控控制台中也会有100个节点。这使得我们很难知道发生了什么。我有兴趣听到Google Cloud Dataflow人员的意见,看看这种方法是否有效。
发布于 2016-06-02 10:02:30
在使用类似于@danvk的代码时,我还在数据流图上有100个输入文件= 100个节点。我切换到了一种类似的方法,它将所有读取合并到一个块中,您可以展开该块以深入到所读取的每个文件/目录。在我们的用例中,使用这种方法也比使用Lists.transform方法运行得更快。
GcsOptions gcsOptions = options.as(GcsOptions.class);
List<GcsPath> paths = gcsOptions.getGcsUtil().expand(GcsPath.fromUri(options.getInputFile()));
List<String>filesToProcess = paths.stream().map(item -> item.toString()).collect(Collectors.toList());
PCollectionList<SomeClass> pcl = PCollectionList.empty(p);
for(String fileName : filesToProcess) {
pcl = pcl.and(
p.apply("ReadAvroFile" + fileName, AvroIO.Read.named("ReadFromAvro")
.from(fileName)
.withSchema(SomeClass.class)
)
.apply(ParDo.of(new MyDoFn(fileName)))
);
}
// flatten the PCollectionList, combining all the PCollections together
PCollection<SomeClass> flattenedPCollection = pcl.apply(Flatten.pCollections());https://stackoverflow.com/questions/29983621
复制相似问题