我正在编写一个从kafka消费的flink应用程序
FlinkKafkaConsumer<MyPojo> consumer = new FlinkKafkaConsumer(TOPIC, new MyPojoDes(), prop)
StreamExecutionEnvironment env = new StreamExecutionEnvironment ();
env.addSource(consumer).print();在较高的层次上,这个应用程序消耗来自kafka主题的消息并打印出来。但是,当我运行它时,我会出现异常。
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)引起的
:java.lang.NullPointerException: null
我试着调试到代码中,发现OperatorChain类中有一个序列化器变量,这是空的,不确定原因。
发布于 2020-12-09 11:57:06
我在反序列化器中调试时发现,我返回的是null,而不是actuall类型信息,因此OperatorChain中的序列化变量为null。
public TypeInformation<MyPojo> getProducerdType()
{
//return null
return TypeInformationOf(MyPojo.class);
}https://stackoverflow.com/questions/65202983
复制相似问题