在我的应用程序中,streamlistner使用了来自多个主题的kafka消息,但我想为一个主题添加功能性的消费者bean,这是否可能一些主题由streamlistner消费,一些主题与消费者bean在同一应用程序中使用?当我尝试的时候,只创建了哪些主题是streamlistner消费的,而不是消费者bean的?
application.yml
spring:
cloud:
function:
definition: printString
stream:
kafka:
binder:
brokers: localhost:9092
zkNodes: localhost:2181
autoCreateTopics: true
autoAddPartitions: true
bindings:
printString-in-0:
destination: function-input-topic
group: abc
consumer:
maxAttempts: 1
partitioned: true
concurrency: 2
xyz-input:
group: abc
destination: xyz-input-input
consumer:
maxAttempts: 1
partitioned: true
concurrency: 2为xyz-input主题创建了主题,但不是为function-input-topic创建了主题
消费者bean
import java.util.function.Consumer;
@Component
public class xyz {
@Bean
Consumer<String> printString() {
return System.out::print;
}
}kafkaConfig接口
public interface KafkaConfig {
@Input("xyz-input")
SubscribableChannel inbound();
}发布于 2021-03-22 21:50:32
不,我们在过去尝试过,但当两种编程模式冲突时,会出现一些微妙的问题。将任何StreamListener分解成函数方式(主要是删除代码)非常简单,应用程序实际上变得简单得多。
只要去掉所有的KafkaConfig接口,去掉@EnableBinding,你就应该没问题了。
https://stackoverflow.com/questions/66745581
复制相似问题