首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Sping Boot服务按需使用kafka消息

Sping Boot服务按需使用kafka消息
EN

Stack Overflow用户
提问于 2020-08-20 22:23:41
回答 2查看 747关注 0票数 0

在需要Spring服务的地方,我要求客户端应用程序每30分钟调用一次,服务将返回

基于查询参数指定的最新消息的

  1. 数,例如,http://messages.com/getNewMessages?number=10在本例中应返回10条消息

基于查询参数中指定的数量和偏移量的

  1. 消息数,例如,在这种情况下,http://messages.com/getSpecificMessages?number=5&start=123应该返回5条开始偏移量123的消息。

我有一个简单的独立应用程序,它工作得很好。下面是我测试的内容,并希望在服务中引入它。

代码语言:javascript
复制
public static void main(String[] args) {
        // create kafka consumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args[0]);

        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        // subscribe to topic
        consumer.subscribe(Collections.singleton("test"));
        consumer.poll(0);

        //get to specific offset and get specified number of messages
        for (TopicPartition partition : consumer.assignment())
            consumer.seek(partition, args[1]);

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));

        System.out.println("Total Record Count ******* : " + records.count());      

        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Message: " + record.value());
            System.out.println("Message offset: " + record.offset());           
            System.out.println("Message: " + record.timestamp());
            Date date = new Date(record.timestamp());
            Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss.SSS");
            System.out.println("Message date: " + format.format(date));
        }
        consumer.commitSync(); 

由于我的消费者将在中随心所欲地思考如何实现这一目标。如果我在启动时插入了application.properties,那么我应该在哪里指定这些属性,但是如何在运行时控制MAX_POLL_RECORDS_CONFIG。任何帮助都很感激。

EN

回答 2

Stack Overflow用户

发布于 2020-08-21 03:33:10

MAX_POLL_RECORDS_CONFIG只会影响您的卡夫卡客户端将记录返回到您的春季服务,它将永远不会减少来自kafka-server的用户投票的字节。

看上面的图片,无论你的起始偏移量= 150还是190,卡夫卡服务器都会从(offset=110,offset=190)返回整个数据,kafka服务器甚至不知道有多少记录返回给消费者,他只知道字节大小=(220-110)。

所以我认为你可以自己控制这个记录号码,目前它是由kafka客户端jar控制的,它们都占用了你的jvm本地内存。

票数 0
EN

Stack Overflow用户

发布于 2020-08-23 15:29:52

您的问题的答案是here,使用代码示例的答案是this answer。这两部作品都是由优秀的加里·罗素( Gary )写的,他是“卡夫卡之春”的主要作者或主要人物之一。

TL;DR:

如果您想在运行时任意回滚分区,请让侦听器实现ConsumerSeekAware并获取对ConsumerSeekCallback.

的引用。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63513904

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档