安装程序:有4个分区的一个主题(测试主题)
场景1:当使用一个使用者组和一个使用者时,系统需要60秒来完成100条消息的处理。
场景2:当使用一个包含4个使用者的使用者组时,系统需要大约15秒来处理相同的100条消息。
场景3:当使用4个具有相同组id和每个组一个使用者的使用者组时,系统需要60秒的时间来处理。
我的假设是在场景3中,处理所有消息所需的时间大约为15秒,但事实并非如此。我用日志验证了分区是分布的,所有分区都收到25条消息。有谁能帮我理解一下,我在这里做错了什么,增加消费者群体是不可能的?这种行为正常吗(我不这么认为)
注意:
消费者和生产者设置是默认的,消息传递是"exactly-once".
Psuedo代码:
Observable<KafkaConsumerRecord<String, String>> observable = consumer.toObservable();
consumer.subscribe(topics);
observable
.flatMapCompletable(record -> producer.rxBeginTransaction()
.andThen(producer.rxSend(producerRecord))
.flatMapCompletable(recordMetadata -> Completable.fromAction(() -> {
Map<TopicPartition, OffsetAndMetadata> consumerOffsetsMap = new HashMap<>();
consumerOffsetsMap.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));
producer.getDelegate().unwrap().sendOffsetsToTransaction(consumerOffsetsMap, "grp1");
}))
.andThen(producer.rxCommitTransaction())
.andThen(consumer.rxCommit())
).subscribe(() -> {
System.out.println("Processed successfully");
});发布于 2022-01-24 14:52:20
一个组与多个独立组具有(大约)相同的处理时间。
有了一个完美平衡的主题,以及即时消费者组的再平衡,一个具有分布式消费者的组将占用一定的时间,正如您所发现的-
(time of one consumer reading all partitions / min(number of partitions, number of group members)在您的例子中,60 / min(4,4) = 15
场景3:当4个消费者组具有相同的组id时
这根本不可能。我猜你指的是不同的组号。这将是场景3与场景1占用相同时间的唯一原因,因为您要重复场景1四次。运行4个组所需的时间相同,这意味着它具有一定的规模(水平)。减少的时间将意味着组内的垂直缩放,您在场景2中看到了这一点。
在Kubernetes上运行的
应用程序。只需更改副本以增加实例的数量
这本身并不能创造出独特的消费者群体。您需要在每个应用程序中注入不同的group.id值来创建新的消费群体。
https://stackoverflow.com/questions/70809726
复制相似问题