我正在使用Linux中的Docker运行一个带有单个节点的Flink独立集群。我已经在Flink 1.10.0和JDK8的生产环境中运行了一段时间的前一个版本,我能够让S3在那里正常运行。现在,我正在尝试更新到一个较新的版本,使用本地S3实现在我的开发机器上运行Docker。不管我怎么尝试,这个错误总是弹出:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'.看起来S3方案没有被映射到适当的类。我确信Flink正在挑选合适的插件。我有以下依赖项:
val testDependencies = Seq(
"org.scalatest" %% "scalatest" % "3.2.0" % "test"
)
val miscDependencies = Seq(
"com.github.tototoshi" %% "scala-csv" % "1.3.6",
"org.lz4" % "lz4-java" % "1.5.1",
"org.json4s" %% "json4s-jackson" % "3.6.1",
"org.apache.hadoop" % "hadoop-common" % "3.2.1",
"redis.clients" % "jedis" % "2.9.0",
"com.googlecode.plist" % "dd-plist" % "1.21",
"com.couchbase.client" % "java-client" % "2.7.14",
"org.apache.parquet" % "parquet-avro" % "1.11.1",
)
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion % "provided",
"org.apache.flink" % "flink-metrics-dropwizard" % flinkVersion,
"org.apache.flink" % "flink-formats" % flinkVersion pomOnly(),
"org.apache.flink" % "flink-compress" % flinkVersion,
"org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion,
"org.apache.flink" %% "flink-clients" % flinkVersion,
"org.apache.flink" %% "flink-parquet" % flinkVersion
)我确认我是严格按照documentation来做的。
发布于 2020-10-06 18:23:43
经过一段时间的努力,我终于解决了这个问题。我将我的解决方案留在这里,以防有人遇到同样的问题。
一旦作业管理器和任务管理器启动,就会检测到插件类,例如S3文件系统工厂,但是,它们不会被加载。在我的设置中,类必须在作业启动后动态加载。你可以找到更多关于Flink如何加载它的类here的信息。
正如here所解释的,装入类的提示是在作业的jar内的META-INF/services中存在一个文件时给出的。要使S3插件正常工作,您需要具有以下文件:
META-INF/services/org.apache.flink.core.fs.FileSystemFactory对于Flink应该作为作业依赖项动态加载的每个类,它都包含一行。例如:
org.apache.flink.fs.s3hadoop.S3FileSystemFactory
org.apache.flink.fs.s3hadoop.S3AFileSystemFactory我正在使用sbt assembly为我的工作创建一个far JAR。在我的项目依赖项中,我将flink-s3-fs-hadoop作为提供的依赖项包括在内,这阻止了正确的服务文件被包括在内。一旦我删除了这个限定符,正确的服务就被创建了,一切都正常了。
https://stackoverflow.com/questions/64115627
复制相似问题