当我使用以下代码时:
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type",trustStoreType)
.setProperty("ssl.truststore.password",trustStorePassword)
.setProperty("ssl.truststore.location",trustStoreLocation)
.setProperty("security.protocol",securityProtocol)
.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
.setDeserializer(new JSONKeyValueDeserializationSchema(false))
.setStartingOffsets
(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();在构建incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>期间,我得到以下错误
.setDeserializer(new JSONKeyValueDeserializationSchema(false))有人知道出了什么问题吗?
发布于 2022-02-10 07:49:28
解决办法是:
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type",trustStoreType)
.setProperty("ssl.truststore.password",trustStorePassword)
.setProperty("ssl.truststore.location",trustStoreLocation)
.setProperty("security.protocol",securityProtocol)
.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();https://stackoverflow.com/questions/71022606
复制相似问题