我正在遵循一个简单的指南文档来使用Kafka streams和spring boot (Spring guide)
我很清楚如何传入和传出消息,然后在中间我能够进行一些处理,替换@KafkaListener和kafkaTemplate.send()
所以我做了一个非常简单的基础类,如下所示:
@EnableBinding(Processor.class)
public static class UppercaseTransformer {
@StreamListener
@Input(Processor.INPUT)
public void receive(String input) {
System.out.println(input);
}
}然后(也许这是我的错误),我从一个控制器执行以下操作:
template.send("my-topic","hello world");我正在使用spring cloud streams,配置如下:
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: ${spring.application.name}
consumer:
concurrency: ${KAFKA_CONSUMER_CONCURRENCY:3}
output:
destination: my-topic
kafka:
binder:
brokers: localhost:9092
auto-create-topics: false
required-acks: all
transaction:
transaction-id-prefix: ${spring.application.name}-
producer:
configuration:
retries: 3
bindings:
input:
consumer:
configuration:
isolation.level: read_committed
enable-dlq: true
dlq-name: some-name我还在使用者和监听器中尝试了这一点
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer每次我尝试发送一条消息时,我都会收到这样的消息:
class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')不知道哪里出了问题,为什么从普通的监听器到这个版本会有这么大的变化……想法?
发布于 2019-01-29 06:26:06
我刚刚从start.spring.io创建了一个应用程序,并选择了"Cloud Stream“和"Kafka”。生成项目并将其添加到主类中(使用与上面提供的配置相同的配置)。
@SpringBootApplication
@EnableBinding(Processor.class)
public class So54408906Application {
public static void main(String[] args) {
SpringApplication.run(So54408906Application.class, args);
}
@StreamListener(Processor.INPUT)
public void receive(String input) {
System.out.println(input);
}
}然后运行kafka控制台生产者脚本。
kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic脚本中提供的文本将记录在应用程序的控制台上。
https://stackoverflow.com/questions/54408906
复制相似问题