根据Kafka的文件:
Kafka提供了这样的保证:主题分区只分配给组中的一个使用者。
但我在服役时观察到了不同的行为。以下是一些细节:
我用的是卡夫卡2.8和弹簧卡夫卡2.2.13。
最初,我有一个带有5个分区的Kafka主题topic.1,这个主题在我的服务中使用了Spring的@KafkaListener注释和并发的== 5的ConcurrentKafkaListenerContainerFactory。
后来,我开始使用相同的topic.2和相同的组ID,在相同的服务中使用带有3个分区的ConcurrentKafkaListenerContainerFactory。
有一段时间它也正常工作,但在再平衡过程中没有包含一个使用者线程,而且出于某些原因,继续处理以前分配的分区,即向该分区分配了一个新的使用者,而旧的使用者一直在处理相同的分区,因此相同的记录被处理了两次。在日志中,我看到来自该分区的记录在几天内在两个不同的使用者线程中被消耗和处理了两次。
服务重新启动后,再次正确地分配了使用者。
下面是我的代码,ConcurrentKafkaListenerContainerFactory bean创建:
@Configuration
public class Config {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> transactionalKafkaContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
KafkaTransactionManager kafkaTransactionManager,
ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, consumerFactory);
//If an exception is thrown, then we want to seek back for the whole batch
factory.setBatchErrorHandler(new FixedBackoffSeekToCurrentBatchErrorHandler());
//Enable transactional consumer for exactly-once transitivity
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
//Enable batch-processing
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
factory.setBatchListener(true);
factory.setConcurrency(5);
return factory;
}
@Bean
public KafkaTransactionManager<?, ?> kafkaTransactionManager(
@Qualifier("transactionalProducerFactory") ProducerFactory producerFactory) {
return new KafkaTransactionManager(producerFactory);
}
@Bean
public ProducerFactory<?, ?> transactionalProducerFactory() {
DefaultKafkaProducerFactory<?, ?> producerFactory = new DefaultKafkaProducerFactory<>(
kafkaProperties.buildProducerProperties()
);
producerFactory.setTransactionIdPrefix("tx-");
return producerFactory;
}
}@KafkaListener-s:
@Service
public class Processor {
@KafkaListener(topics = "topic.1",
groupId = "my-group",
containerFactory = "transactionalKafkaContainerFactory")
public void listener1(List<ConsumerRecord<String, MyObject>> records) {
// process records
}
// Listener for topic.2, added later
@KafkaListener(topics = "topic.2",
groupId = "my-group",
containerFactory = "transactionalKafkaContainerFactory")
public void listener2(List<ConsumerRecord<String, MyObjec>> records) {
// process records
}
}以下是一些日志,其中一个使用者(消费者-7)未被包括在再平衡过程中,并继续使用他的旧分区(主题1-3),而新的使用者(消费者-4)被分配到同一个分区:
03/30/2022 10:23:47.484 [Consumer clientId=consumer-8, groupId=my-group] Setting newly assigned partitions [topic.1-1, topic.1-0]
03/30/2022 10:23:47.484 [Consumer clientId=consumer-4, groupId=my-group] Setting newly assigned partitions [topic.1-3]
03/30/2022 10:23:47.484 [Consumer clientId=consumer-2, groupId=my-group] Setting newly assigned partitions [topic.2-0]
03/30/2022 10:23:47.484 [Consumer clientId=consumer-6, groupId=my-group] Setting newly assigned partitions [topic.1-4]
03/30/2022 10:23:47.483 [Consumer clientId=consumer-5, groupId=my-group] Setting newly assigned partitions [topic.2-2]
03/30/2022 10:23:47.483 [Consumer clientId=consumer-1, groupId=my-group] Setting newly assigned partitions [topic.1-2]
03/30/2022 10:23:47.483 [Consumer clientId=consumer-3, groupId=my-group] Setting newly assigned partitions [topic.2-1]
...
03/30/2022 10:53:55.728 [Consumer clientId=consumer-7, groupId=my-group] Discovered group coordinator ... (id: ... rack: null)
03/30/2022 10:53:55.627 [Consumer clientId=consumer-7, groupId=my-group] Group coordinator ... (id: ... rack: null) is unavailable or invalid, will attempt rediscovery
03/30/2022 10:53:55.627 [Consumer clientId=consumer-7, groupId=my-group] Discovered group coordinator ... (id: ... rack: null)
03/30/2022 10:53:55.507 [Consumer clientId=consumer-7, groupId=my-group] Group coordinator ... (id: ... rack: null) is unavailable or invalid, will attempt rediscovery
03/30/2022 10:53:55.507 [Consumer clientId=consumer-7, groupId=my-group] Discovered group coordinator ... (id: ... rack: null)处理同一记录两次的示例(排除特定域的详细信息):
03/30/2022 11:55:05.144 PM +0300 Processing payload -> EventId: 289f43b4-b07b-4a1f-b768-0453e0c42719, Topic: topic.1, Partition: 3, Offset: 10903844 org.springframework.kafka.KafkaListenerEndpointContainer#1-4-C-1
03/30/2022 11:55:05.143 PM +0300 Processing payload -> EventId: 289f43b4-b07b-4a1f-b768-0453e0c42719, Topic: topic.1, Partition: 3, Offset: 10903844 org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1注意:业务逻辑细节在这里并不重要,所以我使用了简单的名称:topic.1、Processor、my-group等。
我的问题是,如何解释这种行为?为什么消费者-7能够在重新平衡后使用旧分区的消息?添加具有相同组ID的新@KafkaListener会导致此问题吗?(至少当我只有一个@KafkaListener服务时,我没有看到这种行为)
发布于 2022-04-25 15:09:24
我在那些日志里没有看到任何证据证明你的假设。
不过,有一种可能是,消费者您应该确保可以在max.poll.records中处理max.poll.interval.ms,以避免出现这种情况。
无论如何,在这种情况下使用同一个组并不是一个好做法,因为对一个主题进行重新平衡会导致对另一个主题进行不必要的再平衡,这是不可取的。
您还应该升级到一个受支持的spring版本;2.2.x已经很久没有支持了。
https://spring.io/projects/spring-kafka#support
当前版本为2.8.5;2.7.x很快就不再支持OSS了。
https://stackoverflow.com/questions/72001600
复制相似问题