首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >集群上的flink作业- java.lang.NoClassDefFoundError: KeyedDeserializationSchema

集群上的flink作业- java.lang.NoClassDefFoundError: KeyedDeserializationSchema
EN

Stack Overflow用户
提问于 2018-03-15 23:23:18
回答 1查看 1.1K关注 0票数 0

我正在尝试flink并在flink集群上部署作业时遇到异常(在kubernetes上)。

设置

Flink - 1.4.2

斯卡拉- 2.11.12

Java8SDK

集群上的Flink对接图像-flink:1.4.2-scala_2.11-高寒

SBT文件

代码语言:javascript
复制
ThisBuild / resolvers ++= Seq(
    "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
    Resolver.mavenLocal
)

name := "streamingTest1"
version := "0.1-SNAPSHOT"
organization := "com.example"
ThisBuild / scalaVersion := "2.11.12"
val flinkVersion = "1.4.2"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion % "provided"
)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies
  )

libraryDependencies += "com.microsoft.azure" % "azure-eventhubs" % "1.0.0"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3.1"

excludeDependencies ++= Seq(
  ExclusionRule("org.ow2.asm", "*")
)

assembly / mainClass := Some("com.example.Job")

// make run command include the provided dependencies
Compile / run  := Defaults.runTask(Compile / fullClasspath,
                                   Compile / run / mainClass,
                                   Compile / run / runner
                                  ).evaluated

// exclude Scala library from assembly
assembly / assemblyOption  := (assembly / assemblyOption).value.copy(includeScala = false)

我可以通过IDEA或通过sbt在我的机器上本地运行作业,我可以看到预期的输出,也可以将输出发送到我的接收器。

在部署在kubernetes集群上的flink服务器上运行时,我会看到以下异常。

误差

代码语言:javascript
复制
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
    ... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error: 
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
    at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
    ... 8 more
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema
    at com.example.Job$.main(Job.scala:126)
    at com.example.Job.main(Job.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    ... 11 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 20 more

更多的背景

我的项目有一个用于flink作业的scala类,它还在一个单独的目录中为一个与EventHub接口的连接器提供了java代码。我的主要scala代码不依赖于KeyedDeserializationSchema,但是连接器有。KeyedDeserializationSchema最有可能来自卡夫卡连接器依赖项,它包含在我的sbt文件中。在集群上运行此作业时,是否有任何理由无法使用kafka连接器?

如何调试在服务器上加载的kafka连接器的哪个版本?有没有其他方式的包装,可以迫使flink加载卡夫卡连接器?

EN

回答 1

Stack Overflow用户

发布于 2018-03-25 16:40:44

正如我很快意识到,在张贴这个问题后,修复是删除“提供”的卡夫卡连接器。

代码语言:javascript
复制
val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion
)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49310854

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档