在需要Spring服务的地方,我要求客户端应用程序每30分钟调用一次,服务将返回
基于查询参数指定的最新消息的
基于查询参数中指定的数量和偏移量的
我有一个简单的独立应用程序,它工作得很好。下面是我测试的内容,并希望在服务中引入它。
public static void main(String[] args) {
// create kafka consumer
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args[0]);
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
// subscribe to topic
consumer.subscribe(Collections.singleton("test"));
consumer.poll(0);
//get to specific offset and get specified number of messages
for (TopicPartition partition : consumer.assignment())
consumer.seek(partition, args[1]);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
System.out.println("Total Record Count ******* : " + records.count());
for (ConsumerRecord<String, String> record : records) {
System.out.println("Message: " + record.value());
System.out.println("Message offset: " + record.offset());
System.out.println("Message: " + record.timestamp());
Date date = new Date(record.timestamp());
Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss.SSS");
System.out.println("Message date: " + format.format(date));
}
consumer.commitSync(); 由于我的消费者将在中随心所欲地思考如何实现这一目标。如果我在启动时插入了application.properties,那么我应该在哪里指定这些属性,但是如何在运行时控制MAX_POLL_RECORDS_CONFIG。任何帮助都很感激。
发布于 2020-08-21 03:33:10
MAX_POLL_RECORDS_CONFIG只会影响您的卡夫卡客户端将记录返回到您的春季服务,它将永远不会减少来自kafka-server的用户投票的字节。

看上面的图片,无论你的起始偏移量= 150还是190,卡夫卡服务器都会从(offset=110,offset=190)返回整个数据,kafka服务器甚至不知道有多少记录返回给消费者,他只知道字节大小=(220-110)。
所以我认为你可以自己控制这个记录号码,目前它是由kafka客户端jar控制的,它们都占用了你的jvm本地内存。
发布于 2020-08-23 15:29:52
您的问题的答案是here,使用代码示例的答案是this answer。这两部作品都是由优秀的加里·罗素( Gary )写的,他是“卡夫卡之春”的主要作者或主要人物之一。
TL;DR:
如果您想在运行时任意回滚分区,请让侦听器实现ConsumerSeekAware并获取对ConsumerSeekCallback.
的引用。
https://stackoverflow.com/questions/63513904
复制相似问题