首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring @KafkaListener不消耗所有分区

Spring @KafkaListener不消耗所有分区
EN

Stack Overflow用户
提问于 2022-08-11 10:10:00
回答 1查看 349关注 0票数 0

我有一个带有9个分区的kafka主题设置。我的卡夫卡消费者注解如下

代码语言:javascript
复制
@KafkaListener(topics = "myTopic",
              groupId = "topic-group",
              autoStartup = "true") //if this is false listener doesn’t startup
public void consume(ConsumerRecord<?, ?> record) {

当我的应用程序启动时,我看到

代码语言:javascript
复制
11-08-2022 17:14:53.290 [33m[main][0;39m sub= gsk= [34mINFO [0;39m c.a.b.d.e.push.Application.logStarted - Started Application in 8.221 seconds (JVM running for 9.007)
11-08-2022 17:14:56.331 [33m[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1][0;39m sub= gsk= [34mINFO [0;39m o.s.k.l.KafkaMessageListenerContainer.info - topic-group: partitions assigned: [myTopic-2, myTopic-1, myTopic-4, myTopic-3, myTopic-0]

我对此相当陌生,我的理解是KafkaMessageListenerContainer现在已经订阅/轮询/侦听了5个分区。为什么?为什么它不查看所有的9个分区?

当我将concurrency添加到卡夫卡侦听器中时

代码语言:javascript
复制
@KafkaListener(topics = "myTopic",
               concurrency = "9",    
               groupId = "topic-group",
                autoStartup = "true")

我的启动日志是:

代码语言:javascript
复制
11-08-2022 18:41:20.586 [33m[main][0;39m sub= gsk= [34mINFO [0;39m c.a.b.d.e.push.Application.logStarted - Started Application in 7.161 seconds (JVM running for 7.87)
11-08-2022 18:41:23.066 [33m[org.springframework.kafka.KafkaListenerEndpointContainer#0-5-C-1][0;39m sub= gsk= [34mINFO [0;39m o.s.k.l.KafkaMessageListenerContainer.info - topic-group: partitions assigned: [myTopic-5]
11-08-2022 18:41:23.071 [33m[org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1][0;39m sub= gsk= [34mINFO [0;39m o.s.k.l.KafkaMessageListenerContainer.info - topic-group: partitions assigned: [myTopic-2]
11-08-2022 18:41:23.141 [33m[org.springframework.kafka.KafkaListenerEndpointContainer#0-7-C-1][0;39m sub= gsk= [34mINFO [0;39m o.s.k.l.KafkaMessageListenerContainer.info - topic-group: partitions assigned: [myTopic-7]
11-08-2022 18:41:23.141 [33m[org.springframework.kafka.KafkaListenerEndpointContainer#0-6-C-1][0;39m sub= gsk= [34mINFO [0;39m o.s.k.l.KafkaMessageListenerContainer.info - topic-group: partitions assigned: [myTopic-6]
11-08-2022 18:41:23.141 [33m[org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1][0;39m sub= gsk= [34mINFO [0;39m o.s.k.l.KafkaMessageListenerContainer.info - topic-group: partitions assigned: [myTopic-3]
11-08-2022 18:41:23.141 [33m[org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1][0;39m sub= gsk= [34mINFO [0;39m o.s.k.l.KafkaMessageListenerContainer.info - topic-group: partitions assigned: [myTopic-4]
11-08-2022 18:41:23.141 [33m[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1][0;39m sub= gsk= [34mINFO [0;39m o.s.k.l.KafkaMessageListenerContainer.info - topic-group: partitions assigned: [myTopic-1]
11-08-2022 18:41:23.146 [33m[org.springframework.kafka.KafkaListenerEndpointContainer#0-8-C-1][0;39m sub= gsk= [34mINFO [0;39m o.s.k.l.KafkaMessageListenerContainer.info - topic-group: partitions assigned: [myTopic-8]
11-08-2022 18:41:23.146 [33m[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1][0;39m sub= gsk= [34mINFO [0;39m o.s.k.l.KafkaMessageListenerContainer.info - topic-group: partitions assigned: [myTopic-0]

这启动了9个容器--我认为这不是正确的方法,我不想将自己绑定到9个分区。

如何使用所有9个分区?

P.S:将reactor-kafka标记为生产者是被动的,而且工作正常。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-08-16 12:51:54

分配的

分区: myTopic-2,myTopic-1,myTopic-4,myTopic-3,myTopic-0

最有可能的情况是,您有另一个实例正在运行,该实例分配给其他4个分区。

但是,当您使用并发= 9运行版本时,它没有运行。

如果不是这样的话,就提供一个MCRE,这样我们就可以知道出了什么问题。

PS:默认情况下,autoStartup是真的。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73318927

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档