My use case: on the producer side, each row of data (about 100 bytes) is sent to a Kafka topic as one message; on the consumer side, I want to consume 5 messages at a time and hand them to my consumer logic.
@KafkaListener(id = "listener-batch", topics = "test", containerFactory = "concurrentKafkaListenerContainerFactory")
public void receive(@Payload List<String> messages,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
System.out.println("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
System.out.println("Starting the process to receive batch messages :: " + messages);
for (int i = 0; i < messages.size(); i++) {
System.out.println("received message= "+ messages.get(i) +" with partition-offset= " + partitions.get(i) + "-" + offsets.get(i));
}
System.out.println("all the batch messages are consumed");
}I built a sample, but it always receives a single message and prints it to the console. Please suggest any configuration changes needed to achieve this.
Please find the source code below.
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
}Start the producer using the following command:
./kafka-producer-perf-test --num-records 500 --topic test --throughput 10 --payload-file test.csv --producer-props bootstrap.servers=localhost:9092
Contents of test.csv:
Batch-1 message
Batch-2 message
Batch-3 message
Batch-4 message
Batch-5 message
Batch-6 message
Batch-7 message
Batch-8 message
Batch-9 message
Batch-10 message
Batch-11 message
The output looks like this:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to receive batch messages :: [Batch-3 message]
received message= Batch-3 message with partition-offset= 0-839501
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to receive batch messages :: [Batch-7 message]
received message= Batch-7 message with partition-offset= 0-839502
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to receive batch messages :: [Batch-3 message]
received message= Batch-3 message with partition-offset= 0-839503
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Starting the process to receive batch messages :: [Batch-1 message]
received message= Batch-1 message with partition-offset= 0-839504
all the batch messages are consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Thanks in advance.
Posted on 2022-06-16 01:22:55
You should configure a batch listener, and then you can set the max.poll.records property to specify the batch size.
Note that setting this value too low may reduce overall performance, since you will need more round trips to the broker to fetch the same number of records.
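One thing worth knowing is that max.poll.records is only an upper bound: poll() returns whatever has already been fetched, which at a low throughput (10 msgs/sec in the test above) is often a single record. A sketch of the consumer factory that also raises fetch.min.bytes and fetch.max.wait.ms, so the broker waits for more data before answering a fetch. The concrete values (500 bytes, 1000 ms) are illustrative assumptions based on the ~100-byte messages in the question, not tuned recommendations:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class BatchTunedConsumerConfig {

    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Upper bound on records returned by a single poll()
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        // Ask the broker to accumulate ~500 bytes (about 5 of these
        // ~100-byte messages) before answering a fetch...
        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500");
        // ...but wait at most 1 second, so a partial batch is still delivered
        config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000");
        return new DefaultKafkaConsumerFactory<>(config);
    }
}
```

With this in place the listener should usually receive batches of up to 5, though smaller batches remain possible when the wait timeout expires first.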
Posted on 2022-06-16 03:19:35
The requirement as stated here is quite broad. It would be good if you could tell us your actual requirement from the business-logic point of view; your low-level code and the other configuration parameters can then be tuned to fit it.
As one suggestion: if you want to process the messages (5 of them) one by one, you can poll those 5 records at a time with max.poll.records=5 and then iterate over the consumer records. That is fairly straightforward.
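The poll-and-iterate approach above can be sketched with the plain Kafka client (no Spring). The broker address, group id, and topic name match the question; the rest (class name, poll timeout) is assumed for illustration, and the loop would need a running broker to actually produce output:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollFiveDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // At most 5 records per poll()
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test"));
            while (true) {
                // Returns up to max.poll.records records per call
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Hand each record to the consumer logic one by one
                    System.out.printf("received message= %s with partition-offset= %d-%d%n",
                            record.value(), record.partition(), record.offset());
                }
            }
        }
    }
}
```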
https://stackoverflow.com/questions/72639154