首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何从beam写入HDFS?

如何从beam写入HDFS?
EN

Stack Overflow用户
提问于 2018-11-03 03:36:30
回答 1查看 1.2K关注 0票数 1

我正在尝试编写一个光束管道,它使用SparkRunner运行,从本地文件读取,并写入HDFS。

下面是一个最小的例子:

Options类-

代码语言:javascript
复制
package com.mycompany.beam.hdfsIOIssue;

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

public interface WritingToHDFSOptions extends PipelineOptions, SparkPipelineOptions, HadoopFileSystemOptions {

  @Validation.Required
  @Description("Path of the local file to read from")
  String getInputFile();
  void setInputFile(String value);

  @Validation.Required
  @Description("Path of the HDFS to write to")
  String getOutputFile();
  void setOutputFile(String value);

}

光束主类-

代码语言:javascript
复制
package com.mycompany.beam.hdfsIOIssue;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class WritingToHDFS {

  public static void main(String[] args) {
    PipelineOptionsFactory.register(WritingToHDFSOptions.class);

    WritingToHDFSOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WritingToHDFSOptions.class);

    Pipeline p = Pipeline.create(options);

    buildPipeline(p, options);

    p.run();
  }

  static void buildPipeline(Pipeline p, WritingToHDFSOptions options) {
    PCollection<String> input = p.apply("ReadLines", TextIO.read().from(options.getInputFile()));

    ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(options.getOutputFile());
    TextIO.Write write = TextIO.write().to(resource);
    input.apply("WriteLines", write);
  }
}

像这样运行它:

代码语言:javascript
复制
spark-submit test --master yarn --deploy-mode cluster --class com.mycompany.beam.hdfsIOIssue.WritingToHDFS my-project-bundled-0.1-SNAPSHOT.jar --runner=SparkRunner --inputFile=testInput --outputFile=hdfs://testOutput

我期望发生的事情:它读取本地testInput文件中的行,并将它们写到我的hdfs主目录中一个名为testOutput的新文件中。

实际发生的情况:据我所知,什么都没有。Spark表示作业已成功完成,我在日志中看到了Beam步骤,但没有将名为testOutput的文件或目录写入hdfs或我的本地目录。也许它是在spark executor节点上本地编写的,但我无法访问这些节点进行检查。

我猜要么是我错误地使用了TextIO接口,要么是我需要做更多的工作来配置文件系统,而不仅仅是将它添加到我的PipelineOptions接口。但我找不到解释如何做到这一点的文档。

EN

回答 1

Stack Overflow用户

发布于 2018-11-06 03:01:21

我认为你的选择应该如下所示:

代码语言:javascript
复制
--inputFile=hdfs:///testInput --outputFile=hdfs:///testOutput

您可能还想等到管道完成后才能看到结果:

代码语言:javascript
复制
p.run().waitUntilFinish();

您可以找到一个简单完整的HDFS写入(Avro文件) here的工作示例

请注意(BEAM-2277),它可能也适用于您正在运行的梁的版本(它将抛出错误)。您可以使用以下命令来解决此问题:

代码语言:javascript
复制
TextIO.Write write = TextIO.write().to(resource)
  .withTempDirectory(FileSystems.matchNewResource("hdfs:///tmp/beam-test", true));

如果你把你的项目打包到一个公共的GitHub库中,我会对它进行测试,并帮助你开始运行。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53124859

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档