org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to ObjectNode

Stack Overflow user
Asked 2022-02-07 17:16:34
1 answer, 236 views, 0 followers, 2 votes

When I use the following code:

    KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
            .setProperties(kafkaProps)
            .setProperty("ssl.truststore.type",trustStoreType)
            .setProperty("ssl.truststore.password",trustStorePassword)
            .setProperty("ssl.truststore.location",trustStoreLocation)
            .setProperty("security.protocol",securityProtocol)
            .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
            .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
            .setGroupId(groupId)
            .setTopics(kafkaInputTopic)
            .setDeserializer(new JSONKeyValueDeserializationSchema(false))
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .build();

During the build, I get the following error:

    incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>

at this line:

                .setDeserializer(new JSONKeyValueDeserializationSchema(false))

Does anyone know what is wrong?

1 Answer

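Stack Overflow user

Answered 2022-02-10 07:49:28

The solution is to wrap the schema with KafkaRecordDeserializationSchema.of(...) before passing it to setDeserializer:
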
    KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
            .setProperties(kafkaProps)
            .setProperty("ssl.truststore.type",trustStoreType)
            .setProperty("ssl.truststore.password",trustStorePassword)
            .setProperty("ssl.truststore.location",trustStoreLocation)
            .setProperty("security.protocol",securityProtocol)
            .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
            .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
            .setGroupId(groupId)
            .setTopics(kafkaInputTopic)
            .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .build();
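
A note on why the wrapper helps, as far as the Flink API goes: setDeserializer on the KafkaSource builder expects a KafkaRecordDeserializationSchema, which emits records through a collector, whereas JSONKeyValueDeserializationSchema implements the older KafkaDeserializationSchema, which returns one record per call. The static factory KafkaRecordDeserializationSchema.of(...) adapts the second to the first. The sketch below reproduces that shape in plain Java with no Flink dependency. LegacySchema, RecordSchema, KeyValueSchema and readOne are hypothetical stand-ins invented for illustration, not Flink classes. Note also that fetchMetadata in the answer is a boolean the snippet does not define; the question passed false.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AdapterSketch {

    // Hypothetical stand-in for the older interface: one record returned per call.
    public interface LegacySchema<T> {
        T deserialize(byte[] key, byte[] value);
    }

    // Hypothetical stand-in for the newer interface: records are pushed to a collector.
    public interface RecordSchema<T> {
        void deserialize(byte[] key, byte[] value, Consumer<T> out);

        // Adapter factory, playing the role that of(...) plays in the answer.
        static <T> RecordSchema<T> of(LegacySchema<T> legacy) {
            return (key, value, out) -> out.accept(legacy.deserialize(key, value));
        }
    }

    // Hypothetical stand-in for a schema that only implements the older interface.
    public static class KeyValueSchema implements LegacySchema<String> {
        @Override
        public String deserialize(byte[] key, byte[] value) {
            return new String(key, StandardCharsets.UTF_8)
                    + "=" + new String(value, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for a builder method that accepts only the newer interface.
    public static <T> List<T> readOne(RecordSchema<T> schema, String key, String value) {
        List<T> collected = new ArrayList<>();
        schema.deserialize(
                key.getBytes(StandardCharsets.UTF_8),
                value.getBytes(StandardCharsets.UTF_8),
                collected::add);
        return collected;
    }

    public static void main(String[] args) {
        // readOne(new KeyValueSchema(), "k", "v");
        // The line above would not compile: KeyValueSchema is not a RecordSchema.

        // Wrapping with the adapter makes the types line up.
        List<String> records = readOne(RecordSchema.of(new KeyValueSchema()), "k", "v");
        System.out.println(records); // prints [k=v]
    }
}
```

The commented-out call fails for the same kind of reason as the original compile error: the two interfaces are unrelated types, so the older schema has to be adapted explicitly instead of being passed directly.
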
1 vote

Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/71022606
