我正在使用Spring-Kafka框架实现来自Kafka主题的消息消费。我正在尝试理解我为我的卡夫卡监听器创建的ConcurrentKafkaListenerContainerFactory的一些用法。@KafkaListener运行良好,但不出所料,在我的场景中,我有多个独立的侦听器,分别侦听多个主题。我想知道我是否可以在我所有的监听器中重用ConcurrentKafkaListenerContainerFactory,或者我必须为每个@KafkaListener创建一个containerFactory。有没有办法让所有@Kafkalisteners共享一个通用的containerFactory?
谢谢你
发布于 2020-04-26 22:24:24
是的,这就是问题的关键--它是一个监听容器的工厂;你通常只需要一个工厂来引导自动配置。
如果您需要侦听器的不同属性(例如,反序列化程序),最近的版本(从spring-kafka 2.2.4开始)允许您覆盖注释上的消费者属性。
要覆盖单个监听程序的其他属性,例如容器属性,请将监听程序容器定制器添加到工厂。
@Component
class ContainerFactoryCustomizer {
ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
factory.setContainerCustomizer(
container -> {
String groupId = container.getContainerProperties().getGroupId();
if (groupId.equals("foo")) {
container.getContainerProperties().set...
}
else {
container.getContainerProperties().set...
}
});
}正如您所看到的,您可以通过访问groupId()容器属性来判断我们在调用哪个容器时正在创建它。
如果你的监听器有非常不同的配置,你可能想使用两个工厂,但是这样你就失去了boot的自动配置功能(至少对于工厂是这样)。
https://stackoverflow.com/questions/61436049
复制相似问题