首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink TypeInfo Java泛型

Flink TypeInfo Java泛型
EN

Stack Overflow用户
提问于 2021-05-13 18:22:33
回答 1查看 279关注 0票数 1

我希望我的Flink应用程序将数据从ConsumerRecord<byte[], byte[]>的Flink流反序列化到Tuple2<byte[], byte[]>的Flink流,或者使用Flink org.apache.flink.api.java.tuple.Tuple2类来反序列化org.apache.flink.api.java.tuple.Tuple2的Flink流。

使用这两个选项,我无法获得编译的getProducedType实现。

代码语言:javascript
复制
public class KafkaNOOPDeserialization implements KafkaDeserializationSchema<ConsumerRecord<byte[], byte[]>> {
    @Override
    public boolean isEndOfStream(ConsumerRecord<byte[], byte[]> nextElement) {
        return false;
    }

    @Override
    public ConsumerRecord<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return record;
    }

    @Override
    public TypeInformation<ConsumerRecord<byte[], byte[]>> getProducedType() {
        // This provides TypeInformation<ConsumerRecord>
        // It needs TypeInformation<ConsumerRecord<byte[], byte[]>>
        var typeInfo = TypeExtractor.getForClass(ConsumerRecord.class);
        // This hard cast won't compile
        return (TypeInformation<ConsumerRecord<byte[], byte[]>>) typeInfo;
    }
}
代码语言:javascript
复制
public class KafkaByteArrayTupleDeserializer implements KafkaDeserializationSchema<Tuple2<byte[], byte[]>> {
    @Override
    public boolean isEndOfStream(Tuple2<byte[], byte[]> nextElement) {
        return false;
    }

    @Override
    public Tuple2<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new Tuple2(record.key(), record.value());
    }

    @Override
    public TypeInformation<Tuple2<byte[], byte[]>> getProducedType() {
        // This provides TupleTypeInfo<Tuple>
        // It needs TupleTypeInfo<Tuple2<byte[], byte[]>>
        var typeInfo = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(
                byte[].class, byte[].class);
        // Hard cast won't compile
        return (TypeInformation<Tuple2<byte[], byte[]>>) typeInfo;
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-05-18 14:46:54

Flink提供了帮助实现此功能的org.apache.flink.api.common.typeinfo.TypeHint类。(文档)

例如,在第二个示例中,而不是:

代码语言:javascript
复制
return (TypeInformation<Tuple2<byte[], byte[]>>) typeInfo;

你可以写:

代码语言:javascript
复制
return new TypeHint<Tuple2<byte[], byte[]>>(){}.getTypeInfo();
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67524265

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档