首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka -重置分区偏移量不起作用

Kafka -重置分区偏移量不起作用
EN

Stack Overflow用户
提问于 2020-06-19 15:29:32
回答 1查看 3.5K关注 0票数 0

我有一个主题“橙子”,其中有10个分区,1个消费者组中的2个消费者。我使用的是Spring Kafka。

由于某些原因,我需要不时地重新读取数据,我需要重置偏移量。我的侦听器实现了ConsumerSeekAware,在onPartitionsAssigned()中我简单地调用了callback#seekToBeginning。这很好用,因为我在日志中看到来自Kafka客户端API (2.3.1)的消息:

正在重置分区oranges-X to offset 0的偏移量。所有分区都会发生这种情况。

然而,实际上只有最后一个分区被重置(9),如果我幸运的话,有时第二个分区(1)也会被重置。所有其他的都不会被重置。

真正让我头疼的是:如果我在要重置的分区列表中省略了分区9,所有其他分区都会正常重置,并且一切都按预期工作。

代码非常简单:

代码语言:javascript
复制
class ... implements ConsumerSeekAware {
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
...
        callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());

}
...

日志:

代码语言:javascript
复制
19 Jun 09:56:49.442] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-9 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-8 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-1 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-0 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-3 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-2 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-5 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-4 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-7 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-6 to offset 0.
EN

回答 1

Stack Overflow用户

发布于 2020-06-24 05:21:10

我不能复制你的问题。

下面是我的Spring Boot测试应用程序:

代码语言:javascript
复制
@SpringBootApplication
public class So62465345Application extends AbstractConsumerSeekAware {


    private static final Logger LOG = LoggerFactory.getLogger(So62465345Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So62465345Application.class, args);
    }

    @KafkaListener(id = "so62465345", topics = "so62465345")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62465345").partitions(10).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0, 9).forEach(i -> template.send("so62465345", i, null,
                System.currentTimeMillis() + ":foo:" + i));
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        try {
            Thread.sleep(5000);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        LOG.info("Seeking on assignment");
        callback.seekToBeginning(assignments.keySet());
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        LOG.info("Seeking on idle");
        callback.seekToBeginning(assignments.keySet());
    }

}
代码语言:javascript
复制
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=30000
spring.kafka.listener.poll-timeout=2000

我已经在onIdleContainer中设置了一个断点,并且使用kafka-console-consumer,我看到在下一个poll()之前,偏移量实际上不会被重置。

当我们执行查找时,Seeking to EARLIEST offset of partition so62465345-1会出现,但Resetting offset for partition so62465345-0 to offset 0直到我们再次调用poll()时才会出现(然后偏移量实际上是重置的)。

因此,我确实看到在当前轮询中没有发生查找,当前轮询返回0条记录,但下一次轮询从头开始。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62465345

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档