我试图在每次调用消费者时重置消费者offset,这样当我多次调用消费者时,它仍然可以读取生产者发送的记录。我设置了props.put("auto.offset.reset","earliest");并调用了consumer.seekToBeginning(consumer.assignment());,但是当我第二次调用消费者时,它将不会收到任何记录。我该如何解决这个问题呢?
public ConsumerRecords<String, byte[]> consumer(){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
//props.put("group.id", String.valueOf(System.currentTimeMillis()));
props.put("auto.offset.reset","earliest");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topiccc"));
ConsumerRecords<String, byte[]> records = consumer.poll(100);
consumer.seekToBeginning(consumer.assignment());
/* List<byte[]> videoContents = new ArrayList<byte[]>();
for (ConsumerRecord<String, byte[]> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
videoContents.add(record.value());
}*/
return records;
}
public String producer(@RequestParam("message") String message) {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Producer<String, byte[]> producer = new KafkaProducer<>(props);
Path path = Paths.get("C:/Programming Files/video-2012-07-05-02-29-27.mp4");
ProducerRecord<String, byte[]> record = null;
try {
record = new ProducerRecord<>("topiccc", "keyyyyy"
, Files.readAllBytes(path));
} catch (IOException e) {
e.printStackTrace();
}
producer.send(record);
producer.close();
//kafkaSender.send(record);
return "Message sent to the Kafka Topic java_in_use_topic Successfully";
}发布于 2021-01-06 00:13:23
从Kafka Java代码中,AUTO_OFFSET_RESET_CONFIG上的documentation表示:
当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时(例如,由于该数据已被删除),该如何处理:
最早:自动将偏移量重置为最早偏移量
最新:自动将偏移量重置为最新偏移量
none:如果找不到消费者所属组之前的偏移量,则向消费者抛出异常
其他:向消费者抛出异常。
这可以在GitHub:https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java中找到
我们可以从他们的评论中看到,只有当偏移量不在服务器上时才使用该设置。在问题中,偏移量是从服务器检索的,这就是为什么偏移量不是重置到开头,而是停留在最后一个偏移量,使其看起来没有更多的记录。
您需要显式地重置服务器端的偏移量,才能按照问题中的要求修复此问题。
这是另一个描述如何做到这一点的答案。https://stackoverflow.com/a/54492802/231860
这是允许我重置偏移量的代码片段。注意:如果调用subscribe方法,则不能调用seekToBeginning。只有当我自己使用assign方法分配分区时,我才能让它工作。真可惜。
// Create the consumer:
final Consumer<String, DataRecord> consumer = new KafkaConsumer<>(props);
// Get the partitions that exist for this topic:
List<PartitionInfo> partitions = consumer.partitionsFor(topic);
// Get the topic partition info for these partitions:
List<TopicPartition> topicPartitions = partitions.stream().map(info -> new TopicPartition(info.topic(), info.partition())).collect(Collectors.toList());
// Assign all the partitions to the topic so that we can seek to the beginning:
// NOTE: We can't use subscribe if we use assign, but we can't seek to the beginning if we use subscribe.
consumer.assign(topicPartitions);
// Make sure we seek to the beginning of the partitions:
consumer.seekToBeginning(topicPartitions);是的,实现一个看似简单的用例似乎极其复杂。这可能表明,整个kafka世界似乎只想读一次流。
发布于 2018-05-08 17:08:42
我通常会创建一个具有不同group.id的新消费者来再次读取记录。所以你可以这样做:
props.put("group.id", Instant.now().getEpochSecond());发布于 2018-05-09 17:13:26
有一个解决方法(不是生产解决方案)可以在每次消费时更改group.id配置值。在许多情况下,将auto.offset.reset设置为earliest是不够的。
https://stackoverflow.com/questions/50228504
复制相似问题