首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >同样群体下的卡夫卡消费者在重新平衡后使用相同的分区。

同样群体下的卡夫卡消费者在重新平衡后使用相同的分区。
EN

Stack Overflow用户
提问于 2022-04-25 14:59:52
回答 1查看 309关注 0票数 1

根据Kafka的文件:

Kafka提供了这样的保证:主题分区只分配给组中的一个使用者。

但我在服役时观察到了不同的行为。以下是一些细节:

我用的是卡夫卡2.8和弹簧卡夫卡2.2.13。

最初,我有一个带有5个分区的Kafka主题topic.1,这个主题在我的服务中使用了Spring的@KafkaListener注释和并发的== 5的ConcurrentKafkaListenerContainerFactory

后来,我开始使用相同的topic.2和相同的组ID,在相同的服务中使用带有3个分区的ConcurrentKafkaListenerContainerFactory

有一段时间它也正常工作,但在再平衡过程中没有包含一个使用者线程,而且出于某些原因,继续处理以前分配的分区,即向该分区分配了一个新的使用者,而旧的使用者一直在处理相同的分区,因此相同的记录被处理了两次。在日志中,我看到来自该分区的记录在几天内在两个不同的使用者线程中被消耗和处理了两次。

服务重新启动后,再次正确地分配了使用者。

下面是我的代码,ConcurrentKafkaListenerContainerFactory bean创建:

代码语言:javascript
复制
@Configuration
public class Config {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> transactionalKafkaContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            KafkaTransactionManager kafkaTransactionManager,
            ConsumerFactory<Object, Object> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);

        //If an exception is thrown, then we want to seek back for the whole batch
        factory.setBatchErrorHandler(new FixedBackoffSeekToCurrentBatchErrorHandler());

        //Enable transactional consumer for exactly-once transitivity 
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

        //Enable batch-processing
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        factory.setBatchListener(true);

        factory.setConcurrency(5);
        return factory;
    }

    @Bean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(
            @Qualifier("transactionalProducerFactory") ProducerFactory producerFactory) {
        return new KafkaTransactionManager(producerFactory);
    }

    @Bean
    public ProducerFactory<?, ?> transactionalProducerFactory() {
        DefaultKafkaProducerFactory<?, ?> producerFactory = new DefaultKafkaProducerFactory<>(
                kafkaProperties.buildProducerProperties()
        );
        producerFactory.setTransactionIdPrefix("tx-");
        return producerFactory;
    }

}

@KafkaListener-s:

代码语言:javascript
复制
@Service
public class Processor {

    @KafkaListener(topics = "topic.1",
            groupId = "my-group",
            containerFactory = "transactionalKafkaContainerFactory")
    public void listener1(List<ConsumerRecord<String, MyObject>> records) {
        // process records
    }

    // Listener for topic.2, added later 
    @KafkaListener(topics = "topic.2",
            groupId = "my-group",
            containerFactory = "transactionalKafkaContainerFactory")
    public void listener2(List<ConsumerRecord<String, MyObjec>> records) {
        // process records
    }

}

以下是一些日志,其中一个使用者(消费者-7)未被包括在再平衡过程中,并继续使用他的旧分区(主题1-3),而新的使用者(消费者-4)被分配到同一个分区:

代码语言:javascript
复制
03/30/2022 10:23:47.484 [Consumer clientId=consumer-8, groupId=my-group] Setting newly assigned partitions [topic.1-1, topic.1-0]
03/30/2022 10:23:47.484 [Consumer clientId=consumer-4, groupId=my-group] Setting newly assigned partitions [topic.1-3]  
03/30/2022 10:23:47.484 [Consumer clientId=consumer-2, groupId=my-group] Setting newly assigned partitions [topic.2-0]  
03/30/2022 10:23:47.484 [Consumer clientId=consumer-6, groupId=my-group] Setting newly assigned partitions [topic.1-4]  
03/30/2022 10:23:47.483 [Consumer clientId=consumer-5, groupId=my-group] Setting newly assigned partitions [topic.2-2]  
03/30/2022 10:23:47.483 [Consumer clientId=consumer-1, groupId=my-group] Setting newly assigned partitions [topic.1-2]  
03/30/2022 10:23:47.483 [Consumer clientId=consumer-3, groupId=my-group] Setting newly assigned partitions [topic.2-1]
...

03/30/2022 10:53:55.728 [Consumer clientId=consumer-7, groupId=my-group] Discovered group coordinator ... (id: ... rack: null)
03/30/2022 10:53:55.627 [Consumer clientId=consumer-7, groupId=my-group] Group coordinator ... (id: ... rack: null) is unavailable or invalid, will attempt rediscovery
03/30/2022 10:53:55.627 [Consumer clientId=consumer-7, groupId=my-group] Discovered group coordinator ... (id: ... rack: null)
03/30/2022 10:53:55.507 [Consumer clientId=consumer-7, groupId=my-group] Group coordinator ... (id: ... rack: null) is unavailable or invalid, will attempt rediscovery
03/30/2022 10:53:55.507 [Consumer clientId=consumer-7, groupId=my-group] Discovered group coordinator ... (id: ... rack: null)

处理同一记录两次的示例(排除特定域的详细信息):

代码语言:javascript
复制
03/30/2022 11:55:05.144 PM +0300    Processing payload -> EventId: 289f43b4-b07b-4a1f-b768-0453e0c42719, Topic: topic.1, Partition: 3, Offset: 10903844 org.springframework.kafka.KafkaListenerEndpointContainer#1-4-C-1
03/30/2022 11:55:05.143 PM +0300    Processing payload -> EventId: 289f43b4-b07b-4a1f-b768-0453e0c42719, Topic: topic.1, Partition: 3, Offset: 10903844 org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1

注意:业务逻辑细节在这里并不重要,所以我使用了简单的名称:topic.1Processormy-group等。

我的问题是,如何解释这种行为?为什么消费者-7能够在重新平衡后使用旧分区的消息?添加具有相同组ID的新@KafkaListener会导致此问题吗?(至少当我只有一个@KafkaListener服务时,我没有看到这种行为)

EN

回答 1

Stack Overflow用户

发布于 2022-04-25 15:09:24

我在那些日志里没有看到任何证据证明你的假设。

不过,有一种可能是,消费者您应该确保可以在max.poll.records中处理max.poll.interval.ms,以避免出现这种情况。

无论如何,在这种情况下使用同一个组并不是一个好做法,因为对一个主题进行重新平衡会导致对另一个主题进行不必要的再平衡,这是不可取的。

您还应该升级到一个受支持的spring版本;2.2.x已经很久没有支持了。

https://spring.io/projects/spring-kafka#support

当前版本为2.8.5;2.7.x很快就不再支持OSS了。

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72001600

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档