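I am trying to deploy a Flink job to a cluster based on the flink:1.4.1-hadoop27-scala_2.11-alpine image. The job uses a Kafka connector source (flink-connector-kafka-0.11), to which I am trying to assign timestamps and watermarks. My code is very similar to the Scala example in the Flink Kafka connector documentation, but with FlinkKafkaConsumer011: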
val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
This works fine when running locally from my IDE. However, in the cluster environment I get the following error:
java.lang.ClassNotFoundException: com.my.organization.CustomWatermarkEmitter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1863)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1746)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2037)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:521)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
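at java.lang.Thread.run(Thread.java:748)
I am building my job as a fat jar, and I have verified that it contains this class. Does the example from the documentation only work if the class is in the /opt/flink/lib/ folder?
That is how I worked around the problem. However, having to build this class separately and place it in /opt/flink/lib complicates my build process considerably, so I would like to know whether this is the intended solution or whether there are other ways around the problem.
For example, this section of the Flink documentation hints that some sources have to be given the UserCodeClassLoader manually? Including the provided Kafka source?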
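To illustrate why the class loader matters here, below is a minimal, Flink-free Java sketch. It is loosely modeled on the InstantiationUtil$ClassLoaderObjectInputStream.resolveClass frame in the stack trace above, but it is not Flink's actual code: ClassLoaderDeserializationSketch, MyAssigner and LoaderAwareInputStream are names made up for illustration. The same serialized object deserializes fine through a class loader that can see the user class, and fails with ClassNotFoundException through one that cannot (simulated here by an empty URLClassLoader without a parent, standing in for a loader that only knows the framework's own classes).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;

class ClassLoaderDeserializationSketch {

    /** Hypothetical stand-in for a user-defined watermark assigner shipped in the job jar. */
    public static class MyAssigner implements Serializable {
        private static final long serialVersionUID = 1L;
        public final long maxDelayMillis;

        public MyAssigner(long maxDelayMillis) {
            this.maxDelayMillis = maxDelayMillis;
        }
    }

    /** ObjectInputStream that resolves classes through an explicitly supplied class loader. */
    static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // Only classes visible to 'loader' can be resolved here.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    public static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new MyAssigner(5000L));

        // Case 1: a loader that can see the user class (plays the role of a user-code class loader).
        ClassLoader seesUserCode = MyAssigner.class.getClassLoader();
        MyAssigner restored = (MyAssigner) deserialize(data, seesUserCode);
        System.out.println("restored, maxDelayMillis=" + restored.maxDelayMillis);

        // Case 2: a loader without access to the user class (no URLs, no parent).
        try (URLClassLoader frameworkOnly = new URLClassLoader(new URL[0], null)) {
            deserialize(data, frameworkOnly);
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```

In Flink terms, the first loader plays the role of the user-code class loader that knows the job's fat jar, while the second resembles a loader that only sees what is in /opt/flink/lib. The sun.misc.Launcher$AppClassLoader frame in the stack trace suggests that the second situation is what happened on the cluster, which would also be consistent with the workaround of copying the class into /opt/flink/lib.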
As far as I can see in org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher, it does seem to use the "userCodeClassLoader" internally:
case PERIODIC_WATERMARKS: {
    for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
        KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
        AssignerWithPeriodicWatermarks<T> assignerInstance =
                watermarksPeriodic.deserializeValue(userCodeClassLoader);
        KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partitionState =
                new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                        partitionEntry.getKey(),
                        kafkaHandle,
                        assignerInstance);
        partitionState.setOffset(partitionEntry.getValue());
        partitionStates.add(partitionState);
    }
Edit:
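I have created a simple project in which the problem can be reproduced: https://github.com/lragnarsson/flink-kafka-classpath-problem
To reproduce it, you need docker and docker-compose.
Just run:
This will result in the exception java.lang.ClassNotFoundException: se.ragnarsson.lage.MyTimestampExtractor.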
Posted on 2018-03-06 22:20:35
I think you have stumbled upon a bug that was introduced in Flink 1.4.1: https://issues.apache.org/jira/browse/FLINK-8741
It will be fixed shortly in 1.4.2. You can try testing against 1.4.2.rc2: https://github.com/apache/flink/tree/release-1.4.2-rc2
https://stackoverflow.com/questions/49112172