I want my Flink application to deserialize data from Kafka into either a Flink stream of ConsumerRecord<byte[], byte[]> or a Flink stream of Tuple2<byte[], byte[]>, using Flink's org.apache.flink.api.java.tuple.Tuple2 class.
With either option, I cannot get a getProducedType implementation that compiles.
public class KafkaNOOPDeserialization implements KafkaDeserializationSchema<ConsumerRecord<byte[], byte[]>> {
    @Override
    public boolean isEndOfStream(ConsumerRecord<byte[], byte[]> nextElement) {
        return false;
    }

    @Override
    public ConsumerRecord<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return record;
    }

    @Override
    public TypeInformation<ConsumerRecord<byte[], byte[]>> getProducedType() {
        // This provides TypeInformation<ConsumerRecord>
        // It needs TypeInformation<ConsumerRecord<byte[], byte[]>>
        var typeInfo = TypeExtractor.getForClass(ConsumerRecord.class);
        // This hard cast won't compile
        return (TypeInformation<ConsumerRecord<byte[], byte[]>>) typeInfo;
    }
}

public class KafkaByteArrayTupleDeserializer implements KafkaDeserializationSchema<Tuple2<byte[], byte[]>> {
    @Override
    public boolean isEndOfStream(Tuple2<byte[], byte[]> nextElement) {
        return false;
    }

    @Override
    public Tuple2<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // The diamond operator avoids the raw Tuple2 type and its unchecked warning
        return new Tuple2<>(record.key(), record.value());
    }

    @Override
    public TypeInformation<Tuple2<byte[], byte[]>> getProducedType() {
        // This provides TupleTypeInfo<Tuple>
        // It needs TupleTypeInfo<Tuple2<byte[], byte[]>>
        var typeInfo = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(
                byte[].class, byte[].class);
        // Hard cast won't compile
        return (TypeInformation<Tuple2<byte[], byte[]>>) typeInfo;
    }
}

Posted on 2022-05-18 14:46:54
Flink provides the org.apache.flink.api.common.typeinfo.TypeHint class to help with this. (docs)
For example, in your second example, instead of:
    return (TypeInformation<Tuple2<byte[], byte[]>>) typeInfo;
you can write:
    return new TypeHint<Tuple2<byte[], byte[]>>(){}.getTypeInfo();
https://stackoverflow.com/questions/67524265
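As background on why the trailing `{}` matters: creating an anonymous subclass lets the full generic type be read back through reflection, and TypeHint appears to rely on this general "super type token" pattern. The sketch below reproduces the pattern with the plain JDK only. `TypeToken`, `Pair`, and `SuperTypeTokenDemo` are hypothetical names invented for this illustration; they are not Flink classes, and `Pair` merely stands in for Tuple2.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for Flink's Tuple2; only its type parameters matter here.
class Pair<A, B> {
}

// Hypothetical token class. It is abstract, so callers have to create an
// anonymous subclass: new TypeToken<Pair<byte[], byte[]>>() {}
abstract class TypeToken<T> {
    private final Type captured;

    protected TypeToken() {
        // The anonymous subclass records its generic superclass, TypeToken<...>,
        // in class metadata, so the type argument is still readable at runtime.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                    "TypeToken must be created as an anonymous subclass with a type argument");
        }
        this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getType() {
        return captured;
    }
}

class SuperTypeTokenDemo {
    // Captures Pair<byte[], byte[]> including both type arguments.
    static Type capture() {
        return new TypeToken<Pair<byte[], byte[]>>() {}.getType();
    }

    public static void main(String[] args) {
        ParameterizedType pair = (ParameterizedType) capture();
        System.out.println("raw type: " + pair.getRawType().getTypeName());
        for (Type argument : pair.getActualTypeArguments()) {
            System.out.println("type argument: " + argument.getTypeName());
        }
    }
}
```

By contrast, a class literal such as `Pair.class` or `ConsumerRecord.class` carries no type arguments, which is why the attempts in the question only produced raw type information. The same TypeHint form, `new TypeHint<ConsumerRecord<byte[], byte[]>>(){}.getTypeInfo()`, should presumably compile for the first example as well, although how efficiently Flink can serialize ConsumerRecord is a separate question that this sketch does not address.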