使用 Scala 2.12 运行 Flink 1.9.0,并尝试使用 flink-connector-kafka 将数据发布到 Kafka,在本地调试时一切正常。一旦我将作业提交到集群,运行时就会抛出以下 java.lang.LinkageError,导致作业无法运行:
java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/util/ChildFirstClassLoader) previously initiated loading for a different type with name "org/apache/kafka/clients/producer/ProducerRecord"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1202)
at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:370)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

当使用 -verbose:class 查看被加载的类时,我看到这个类被加载了多次:
taskmanager [Loaded org.apache.kafka.clients.producer.ProducerRecord from file:/tmp/blobStore-8cf95113-e767-4073-9b1b-e579d46c0283/job_f0c3db8b84dd38e83f92ecf1bc61b698/blob_p-c327eb8f4333a638b2b7049049368f23254aeb9c-03045e6d6a9c8f3c7dacdded8cb97d6e]
taskmanager [Loaded org.apache.kafka.clients.producer.ProducerRecord from file:/tmp/blobStore-8cf95113-e767-4073-9b1b-e579d46c0283/job_f0c3db8b84dd38e83f92ecf1bc61b698/blob_p-c327eb8f4333a638b2b7049049368f23254aeb9c-03045e6d6a9c8f3c7dacdded8cb97d6e]

这个类是从我提交给 Flink 的同一个 Uber-JAR 中加载的。而且,不存在多个会传递引入 ProducerRecord 的依赖项,我的 JAR 是该依赖项的唯一提供者。
build.sbt
lazy val flinkVersion = "1.9.0"
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-table-planner" % flinkVersion,
"org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
"org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion,
"org.apache.flink" %% "flink-container" % flinkVersion,
"org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" % "flink-json" % flinkVersion % "provided",
"org.apache.flink" % "flink-avro" % flinkVersion % "provided",
"org.apache.flink" %% "flink-parquet" % flinkVersion % "provided",
"org.apache.flink" %% "flink-runtime-web" % flinkVersion % "provided",
"org.apache.flink" %% "flink-cep" % flinkVersion
)

发布于 2020-08-24 10:46:16
由于未知的原因,将classloader.resolve-order属性设置为parent-first (如Apache邮件列表中所述)可以解决此问题。我仍然对它的工作原理感到困惑,因为加载这个依赖项的不同版本的子类加载器和父类加载器之间不应该存在依赖冲突(因为它不是随我正在使用的flink-dist一起提供的)。
在 Flink 文档的"调试类加载"(Debugging Classloading)一节中,有一段讨论了类加载器之间的父子层级关系。
在涉及动态类加载的设置中(插件组件、会话集群中的 Flink 作业),通常存在由两个 ClassLoader 构成的层级结构:(1) Java 的应用程序类加载器,它包含类路径中的所有类;(2) 动态的插件/用户代码类加载器,用于从插件或用户代码 jar 中加载类。动态 ClassLoader 以应用程序类加载器作为其父加载器。默认情况下,Flink 会反转类加载顺序,即先在动态类加载器中查找类,只有当该类不属于动态加载的代码时,才去父加载器(应用程序类加载器)中查找。反转类加载的好处是,插件和作业可以使用与 Flink 核心不同的库版本,这在同一库的不同版本互不兼容时非常有用。该机制有助于避免常见的依赖冲突错误,例如 IllegalAccessError 或 NoSuchMethodError:代码的不同部分各自拥有该类的独立副本(Flink 核心或其某个依赖项可以使用与用户代码或插件代码不同的副本)。在大多数情况下,这种机制运行良好,用户无需进行额外配置。
我仍然不理解为什么 ProducerRecord 会被加载不止一次,也不理解异常消息中的"不同类型"(a different type)指的是什么(对 -verbose:class 的输出进行过滤后,只找到了 ProducerRecord 的一条加载路径)。
https://stackoverflow.com/questions/63559514
复制相似问题