首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从Kafka主题获取多条消息

从Kafka主题获取多条消息
EN

Stack Overflow用户
提问于 2022-06-16 00:22:19
回答 2查看 1.3K关注 0票数 0

我的用例就像从生产者端--它将把一行数据(约100字节)作为一条消息发送给kafka主题,从消费者端--我想一次使用5条消息,并将其交给我的消费者逻辑。

代码语言:javascript
复制
@KafkaListener(id = "listener-batch", topics = "test", containerFactory = "concurrentKafkaListenerContainerFactory")
public void receive(@Payload List<String> messages,
                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

    System.out.println("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
    System.out.println("Starting the process to recieve batch messages :: " + messages);
    for (int i = 0; i < messages.size(); i++) {
        System.out.println("received message= "+ messages.get(i) +" with partition-offset= " + partitions.get(i) + "-" + offsets.get(i));
    }
    System.out.println("all the batch messages are consumed");
}

我做了一个示例,它总是收到一条消息并在控制台中打印。请建议我的任何配置改变,以实现这一目标。

请查找下面的源代码。

代码语言:javascript
复制
@EnableKafka
@Configuration
public class KafkaConfig {

@Bean
public ConsumerFactory<String, String> consumerFactory(){
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}
}

使用以下命令启动生成器

./kafka-生产者-perf-测试-num-记录500 -主题测试-吞吐量10 -有效载荷文件test.csv -生产者-道具bootstrap.servers=localhost:9092 bootstrap.servers=localhost:9092

test.csv文件内容

代码语言:javascript
复制
Batch-1 message
Batch-2 message
Batch-3 message
Batch-4 message
Batch-5 message
Batch-6 message
Batch-7 message
Batch-8 message
Batch-9 message
Batch-10 message
Batch-11 message

输出如下所示。

代码语言:javascript
复制
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to recieve batch messages :: [Batch-3 message]
received message= Batch-3 message with partition-offset= 0-839501
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to recieve batch messages :: [Batch-7 message]
received message= Batch-7 message with partition-offset= 0-839502
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to recieve batch messages :: [Batch-3 message]
received message= Batch-3 message with partition-offset= 0-839503
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to recieve batch messages :: [Batch-1 message]
received message= Batch-1 message with partition-offset= 0-839504
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

提前谢谢。

EN

回答 2

Stack Overflow用户

发布于 2022-06-16 01:22:55

您应该配置一个批处理侦听器,然后可以设置max.poll.records属性来指定批处理大小。

请注意,将此值设置得太低可能会降低总体性能,因为您需要对代理进行更多的轮询才能获取相同数量的记录。

票数 0
EN

Stack Overflow用户

发布于 2022-06-16 03:19:35

这里所提供的要求是很高的。如果您能够从业务逻辑实现的角度告诉我们您的实际需求,那就太好了。您的低级别编码和其他配置参数可以根据您的需求进行微调。

为了给您一个建议,如果您想一个接一个地解决消息(5),那么您可以一次通过max.poll.records =5轮询这5条记录,然后遍历使用者记录。这相当简单。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72639154

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档