首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >弹簧云卡夫卡消费者特性不起作用

弹簧云卡夫卡消费者特性不起作用
EN

Stack Overflow用户
提问于 2022-11-16 14:37:00
回答 1查看 35关注 0票数 0

我有以下application.yml文件:

代码语言:javascript
复制
spring:
  application:
    name: test-app
  cloud:
    stream:
      kafka:
        binder:
          consumerProperties:
            value:
              deserializer: org.apache.kafka.common.serialization.StringDeserializer
          brokers: localhost:9092
          autoCreateTopics: true
          replicationFactor: 1
      bindings:
        listenCloudEvent-in-0:
          destination: com.test.cloudevent
          group: test-app-group
          consumer:
            configuration:
              value:
                deserializer: io.cloudevents.kafka.CloudEventDeserializer
        listenString-in-0:
          destination: com.test.string
          group:  test-app-group
    function:
      definition: listenCloudEvent;listenString

据我所知,下面的属性应该覆盖默认的反序列化器,但它似乎不起作用。

代码语言:javascript
复制
  consumer:
        configuration:
          value:
            deserializer: io.cloudevents.kafka.CloudEventDeserializer

对于com.test.couldevent主题,我得到了以下错误。

org.springframework.messaging.converter.MessageConversionException:无法读取JSON:无法构造io.cloudevents.CloudEvent实例(没有创建者,如默认构造函数,存在):抽象类型需要映射到具体类型、具有自定义反序列化程序或包含其他类型信息

如果我将默认反序列化器更改为事件云反序列化程序,则它可以工作,但其他侦听器则会刹车。

以下是我的侦听器功能:

代码语言:javascript
复制
@Bean
public Consumer<Flux<Message<CloudEvent>>> listenCloudEvent() {
    return inboundMessage -> inboundMessage
            .onErrorStop()
            .doOnNext(message -> log.info("[{}] Message is received.", message.getPayload().getId()))
            .subscribe();
}

如果你能在这个问题上帮我,我真的很感激。我的主要目标是为不同的主题设置不同的反序列化器。但是,我无法成功地完成它,尽管它被指出可以覆盖spring文档中的默认反序列化器。

提前谢谢你!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-11-16 14:59:31

在使用者属性中缺少kafka.元素。configuration是一个卡夫卡特有的属性,您正在设置一个公共的消费者属性.

请参阅https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-consumer-properties

以下属性仅适用于卡夫卡消费者,必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer..作为前缀

注意...stream.kafka.bindings...,而不是...stream.bindings...

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74462328

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档