我在与kafka streams主题通信时遇到错误。当我尝试从第一个主题到第二个主题时,它工作得很好。但是当从第二个主题发送到第三个主题时,它错误地说无法创建消费者绑定,30秒后重试,注册表消费者属性处出现空指针异常调试时我意识到,第三个主题的applicationID被作为空传递,尽管我已经明确指定了它
我的application.properties文件
## Topic 1
- spring.cloud.stream.bindings.input1.destination=topic-1
- spring.cloud.stream.bindings.input1.consumer.applicationId=processor-1
## Topic 2
- spring.cloud.stream.bindings.output2.bindings=<topic-2 name>
- spring.cloud.stream.bindings.input2.destination=<topic-2 name >
- spring.cloud.stream.bindings.input2.consumer.applicationId=processor-2
## Topic 3
- spring.cloud.stream.bindings.output3.destination=<topic-3 name>
- spring.cloud.stream.bindings.input3.destination=<topic-3 name>
- spring.cloud.stream.bindings.input3.consumer.applicationId=processor-31st listener
public classA {
@StreamListener
public void process(@INPUT(check.Const_A) KStream<String, StreamClass> stream)
stream.foreach((key,value)-> System.out.println(key + value)));
stream.to(check.Const_B);
}2nd listener
public classB {
@StreamListener
public void process(@INPUT(check.Const_C) KStream<String, StreamClass> stream)
stream.foreach((key,value)-> System.out.println(key + value)));
stream.to(check.Const_D);
}3rd listener
public classC {
@StreamListener
public void process(@INPUT(check.Const_E) KStream<String, StreamClass> stream)
stream.foreach((key,value)-> System.out.println(key + value)));
}Interface (to be added in @EnableBinding annotation)
public check {
String Const_A = "input1"
String Const_B="output2"
String CONST_C="input2"
String CONST_D="output3"
String CONST_E="input3"
@Input(Const_A)
KStream<String,StreamClass> inATopic();
@Output(Const_B)
MessageChannel outATopic();
@Input(Const_C)
KStream<String,StreamClass> inBTopic();
@Output(Const_D)
MessageChannel outBTopic();
@Output(Const_E)
KStream<String,StreamClass> outtCTopic();
}我甚至尝试用不同的名称更改每个进程的名称,但似乎都不起作用。有没有人能帮个忙。
发布于 2021-01-28 22:40:22
经过太多的调试后,我意识到我错过了第三个侦听器上的@Component注释,因此Spring boot并不是只接受它
https://stackoverflow.com/questions/65933740
复制相似问题