我有一个抽象类,它的抽象方法创建了一个SourceFunction,因此派生类可以返回简单或复杂的源(例如KafkaConsumers等)。ChangeMe是通过编译AvroSchema创建的一个简单的自动生成的类。
public SourceFunction<ChangeMe> createSourceFunction(ParameterTool params) {
FromElementsFunction<ChangeMe> dataSource = null;
List<ChangeMe> changeMeList = Arrays.asList(
ChangeMe.newBuilder().setSomeField("Some field 1").build(),
ChangeMe.newBuilder().setSomeField("Some field 2").build(),
ChangeMe.newBuilder().setSomeField("Some field 3").build()
);
try {
dataSource = new FromElementsFunction<>(new AvroSerializer<>(ChangeMe.class), changeMeList);
}
catch (IOException ex){
}
return dataSource;
}在我的Flink工作中,我基本上是这样的:
SourceFunction<ChangeMe> source = createSourceFunction(params);
DataStream<T> sourceDataStream = streamExecutionEnvironment.addSource(source);
DataStream<ChangeMe> changeMeEventsStream = this.getSourceDataStream(); // gets sourceDataStream above
changeMeEventsStream.print();当我运行作业时,我收到以下关于调用print()的错误:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
……
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class org.apache.flink.streaming.api.functions.source.FromElementsFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).我使用的是Eclipse编译器,所以我以为会包含类型信息(尽管我认为这只适用于lambdas,上面没有)。我需要做什么才能让它正确运行?
https://stackoverflow.com/questions/44724251
复制相似问题