首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka消费者组消费者故障转移检测

Kafka消费者组消费者故障转移检测
EN

Stack Overflow用户
提问于 2021-11-18 14:04:09
回答 1查看 30关注 0票数 0

我有一个kafka主题,只有一个分区。

在任何时候,都可以有多个kafka客户端。所有客户端都是使用相同的消费者组订阅的。因此,在任何给定的时间点,只有1个客户端会接收消息。假设从t0到t10 consumer1收到消息,但一段时间后它停止收到消息,consumer2被选举为新的领导者(可能是因为consumer1中的GC暂停)。在我的consumer1中,我想要检测这种故障转移何时发生,以便它可以刷新其本地状态。

是否可以使用kafka客户端代码?

EN

回答 1

Stack Overflow用户

发布于 2021-11-18 14:57:47

可以使用ConsumerRebalanceListener接口中提供onPartitionsRevoked回调方法。

来自description

用户可以实现的回调方法,用于提供对自定义存储的偏移量提交的处理。当使用者必须放弃某些分区时,将在重新平衡操作期间调用此方法。当消费者被关闭或取消订阅时,也可以调用它。建议在此回调中将偏移量提交给Kafka或自定义偏移量存储,以防止重复数据。

示例实现:

代码语言:javascript
复制
private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
        // Add your changes here to flush
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
    }
}

示例:

代码语言:javascript
复制
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class TestConsumer {
    
    KafkaConsumer<String, String> kafkaConsumer;
    
    public static void main(String[] args) {
        TestConsumer consumer = new TestConsumer();
        consumer.pollMessages();
    }

    public TestConsumer() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-example-consumer");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        
        kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("input-topic"), new ConsumerPartitionAssignmentListener());
    }
    
    public void pollMessages() {
        while(true) {
            System.out.println("Polling");
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(5000));
            System.out.println(records.count());
        }
    }
    
    private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
            // Add your changes here to flush
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
        }
    }
}

输出:

代码语言:javascript
复制
Poll
Partitions assignment listener: [input-topic-0]
0
Poll
0
Poll
Partitions revoke listener: [input-topic-0]
Partitions assignment listener: [input-topic-0]
0
Poll
0
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70021270

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档