首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka listeners,不使用Strings?

Kafka listeners,不使用Strings?
EN

Stack Overflow用户
提问于 2019-01-29 03:23:39
回答 1查看 138关注 0票数 0

我正在遵循一个简单的指南文档来使用Kafka streams和spring boot (Spring guide)

我很清楚如何传入和传出消息,然后在中间我能够进行一些处理,替换@KafkaListenerkafkaTemplate.send()

所以我做了一个非常简单的基础类,如下所示:

代码语言:javascript
复制
@EnableBinding(Processor.class)
public static class UppercaseTransformer {

  @StreamListener
  @Input(Processor.INPUT)
  public void receive(String input) {
    System.out.println(input);
  }
}

然后(也许这是我的错误),我从一个控制器执行以下操作:

代码语言:javascript
复制
 template.send("my-topic","hello world");

我正在使用spring cloud streams,配置如下:

代码语言:javascript
复制
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-topic
          group: ${spring.application.name}
          consumer:
            concurrency: ${KAFKA_CONSUMER_CONCURRENCY:3}
        output:
          destination: my-topic
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: false
          required-acks: all
          transaction:
            transaction-id-prefix: ${spring.application.name}-
            producer:
              configuration:
                retries: 3
        bindings:
          input:
            consumer:
              configuration:
                isolation.level: read_committed
              enable-dlq: true
              dlq-name: some-name

我还在使用者和监听器中尝试了这一点

代码语言:javascript
复制
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

每次我尝试发送一条消息时,我都会收到这样的消息:

代码语言:javascript
复制
class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')

不知道哪里出了问题,为什么从普通的监听器到这个版本会有这么大的变化……想法?

EN

回答 1

Stack Overflow用户

发布于 2019-01-29 06:26:06

我刚刚从start.spring.io创建了一个应用程序,并选择了"Cloud Stream“和"Kafka”。生成项目并将其添加到主类中(使用与上面提供的配置相同的配置)。

代码语言:javascript
复制
@SpringBootApplication
@EnableBinding(Processor.class)
public class So54408906Application {

    public static void main(String[] args) {
        SpringApplication.run(So54408906Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    public void receive(String input) {
        System.out.println(input);
    }

}

然后运行kafka控制台生产者脚本。

代码语言:javascript
复制
kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

脚本中提供的文本将记录在应用程序的控制台上。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54408906

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档