我有一个主题“橙子”,其中有10个分区,1个消费者组中的2个消费者。我使用的是Spring Kafka。
由于某些原因,我需要不时地重新读取数据,我需要重置偏移量。我的侦听器实现了ConsumerSeekAware,在onPartitionsAssigned()中我简单地调用了callback#seekToBeginning。这很好用,因为我在日志中看到来自Kafka客户端API (2.3.1)的消息:
正在重置分区oranges-X to offset 0的偏移量。所有分区都会发生这种情况。
然而,实际上只有最后一个分区被重置(9),如果我幸运的话,有时第二个分区(1)也会被重置。所有其他的都不会被重置。
真正让我头疼的是:如果我在要重置的分区列表中省略了分区9,所有其他分区都会正常重置,并且一切都按预期工作。
代码非常简单:
class ... implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
...
callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());
}
...日志:
19 Jun 09:56:49.442] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-9 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-8 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-1 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-0 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-3 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-2 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-5 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-4 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-7 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-6 to offset 0.发布于 2020-06-24 05:21:10
我不能复制你的问题。
下面是我的Spring Boot测试应用程序:
@SpringBootApplication
public class So62465345Application extends AbstractConsumerSeekAware {
private static final Logger LOG = LoggerFactory.getLogger(So62465345Application.class);
public static void main(String[] args) {
SpringApplication.run(So62465345Application.class, args);
}
@KafkaListener(id = "so62465345", topics = "so62465345")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so62465345").partitions(10).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> IntStream.range(0, 9).forEach(i -> template.send("so62465345", i, null,
System.currentTimeMillis() + ":foo:" + i));
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
try {
Thread.sleep(5000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
LOG.info("Seeking on assignment");
callback.seekToBeginning(assignments.keySet());
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
LOG.info("Seeking on idle");
callback.seekToBeginning(assignments.keySet());
}
}spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=30000
spring.kafka.listener.poll-timeout=2000我已经在onIdleContainer中设置了一个断点,并且使用kafka-console-consumer,我看到在下一个poll()之前,偏移量实际上不会被重置。
当我们执行查找时,Seeking to EARLIEST offset of partition so62465345-1会出现,但Resetting offset for partition so62465345-0 to offset 0直到我们再次调用poll()时才会出现(然后偏移量实际上是重置的)。
因此,我确实看到在当前轮询中没有发生查找,当前轮询返回0条记录,但下一次轮询从头开始。
https://stackoverflow.com/questions/62465345
复制相似问题