首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >没有为方案hdfs - org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)找到文件系统

没有为方案hdfs - org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)找到文件系统
EN

Stack Overflow用户
提问于 2019-03-13 08:01:50
回答 2查看 1.2K关注 0票数 2

我使用的是ClouderaEnterprise6.1.0版本的,在用SparkRunner读取或编写HDFS上的任何文件时,使用apache 2.11SDKS来解决这个问题。但是,有了火花,它就开始运作了。

这个问题是在将Cloudera版本从5.14.0升级到6.1.0后出现的,在以前的版本中,它可以很好地处理下面的代码。

代码语言:javascript
复制
import java.io.File;
import java.io.IOException;
import java.sql.ResultSet;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;

public class Test {

    public static void main(String[] args) {

        //HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(HadoopFileSystemOptions.class);
        SparkConf sparkConf = new SparkConf()
                    .setMaster("yarn")
                    .set("spark.submit.deployMode", "client")
                    .set("spark.driver.memory", "4g")
                    .set("spark.executor.cores", "5")
                .set("spark.executor.instances", "30")
                .set("spark.executor.memory","8g");

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.setLogLevel("ERROR");

        SparkContextOptions options = PipelineOptionsFactory.create().as(SparkContextOptions.class);
        options.setRunner(SparkRunner.class);
        options.setUsesProvidedSparkContext(true);
        options.setProvidedSparkContext(jsc);

        Pipeline p = Pipeline.create(options);
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://host1:8020");
         UserGroupInformation.setConfiguration(conf);
        try {
            UserGroupInformation.loginUserFromKeytab("test@EIO.COM", "/opt/app/kerbfiles/test.keytab");
            if(UserGroupInformation.isLoginKeytabBased()){
                UserGroupInformation.getLoginUser().reloginFromKeytab();
            }else if(UserGroupInformation.isLoginTicketBased()){
                UserGroupInformation.getLoginUser().reloginFromTicketCache();
            }
        }catch (IOException e1) {
            e1.printStackTrace();
        }
        System.out.println("*******************************8");
        p.apply("ReadLines", TextIO.read().from("hdfs://host1:8020/hdfsdata/input/Reg_Employee.txt"))

        .apply("WriteCounts", TextIO.write().to("hdfs://host1:8020/tmp/test"));
        p.run().waitUntilFinish();
    }

以下是例外情况:

代码语言:javascript
复制
Exception in thread "main" java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:219)
    at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:700)
    at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:1027)
    at Test.main(Test.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

这件事需要帮助。

谢谢

尼基尔

EN

回答 2

Stack Overflow用户

发布于 2019-10-03 14:17:16

可能有几个原因:

  1. 服务文件在类路径上被错误地合并或错误的服务文件。您对类路径的一些依赖在org.apache.hadoop.hdfs.DistributedFileSystem中不包含META-INF/services/org.apache.hadoop.fs.FileSystem。因此,如果首先加载它,则无法找到类,该类将启动HDFS。例如,hadoop-common.jar在服务文件中没有org.apache.hadoop.hdfs.DistributedFileSystem,但是hadoop-hdfs.jar正确地拥有它。 该做什么-如果错误的服务文件来自fatjar,合并服务文件。例如在gradle中
代码语言:javascript
复制
shadowJar {
   mergeServiceFiles() 
}

或者将运行程序的版本与预构建的hadoop一起使用,则应该正确合并。

  1. 设置export HADOOP_CONF_DIR="/etc/hadoop/conf"。或者hdfs吐露的路径。确保运行spark/flink的用户可以看到这个环境变量。
  2. 你没有添加依赖 "org.apache.beam:beam-sdks-java-io-hadoop-file-system:${beamVersion}" 所以beam不知道这个方案。
票数 1
EN

Stack Overflow用户

发布于 2019-12-02 21:01:04

添加HadoopFileSystemOptions解决了我的问题:

代码语言:javascript
复制
    val conf = new Configuration()
    conf.set("fs.default.name", "hdfs://hadoop-namenode-5:9000")
    conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
    val options=PipelineOptionsFactory.fromArgs(args).as(classOf[HadoopFileSystemOptions])

代码来自:紫强流梁算例

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55136949

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档