我有以下application.yml文件:
spring:
application:
name: test-app
cloud:
stream:
kafka:
binder:
consumerProperties:
value:
deserializer: org.apache.kafka.common.serialization.StringDeserializer
brokers: localhost:9092
autoCreateTopics: true
replicationFactor: 1
bindings:
listenCloudEvent-in-0:
destination: com.test.cloudevent
group: test-app-group
consumer:
configuration:
value:
deserializer: io.cloudevents.kafka.CloudEventDeserializer
listenString-in-0:
destination: com.test.string
group: test-app-group
function:
definition: listenCloudEvent;listenString据我所知,下面的属性应该覆盖默认的反序列化器,但它似乎不起作用。
consumer:
configuration:
value:
deserializer: io.cloudevents.kafka.CloudEventDeserializer对于com.test.couldevent主题,我得到了以下错误。
org.springframework.messaging.converter.MessageConversionException:无法读取JSON:无法构造io.cloudevents.CloudEvent实例(没有创建者,如默认构造函数,存在):抽象类型需要映射到具体类型、具有自定义反序列化程序或包含其他类型信息
如果我将默认反序列化器更改为事件云反序列化程序,则它可以工作,但其他侦听器则会刹车。
以下是我的侦听器功能:
@Bean
public Consumer<Flux<Message<CloudEvent>>> listenCloudEvent() {
return inboundMessage -> inboundMessage
.onErrorStop()
.doOnNext(message -> log.info("[{}] Message is received.", message.getPayload().getId()))
.subscribe();
}如果你能在这个问题上帮我,我真的很感激。我的主要目标是为不同的主题设置不同的反序列化器。但是,我无法成功地完成它,尽管它被指出可以覆盖spring文档中的默认反序列化器。
提前谢谢你!
发布于 2022-11-16 14:59:31
在使用者属性中缺少kafka.元素。configuration是一个卡夫卡特有的属性,您正在设置一个公共的消费者属性.
以下属性仅适用于卡夫卡消费者,必须以
spring.cloud.stream.kafka.bindings.<channelName>.consumer..作为前缀
注意...stream.kafka.bindings...,而不是...stream.bindings...。
https://stackoverflow.com/questions/74462328
复制相似问题