I am using Kafka 2.3.1 (server) with Spring 2.2.2.14 and kafka-clients 2.0.1.
I have a topic with 10 partitions.
Producer
A single producer publishes messages to the Kafka cluster, distributing them across partitions by key, because message ordering matters.
acks = all

Consumer
Several instances of the same consumer (from 2 to 5, horizontally scaled in k8s), each with a single thread, pull these messages and process them.
Message processing time varies unpredictably.
To avoid frequent group rebalances, I moved message processing to a separate thread. While the processor is still working, the consumer keeps calling poll().
auto.offset.reset = earliest
enable.auto.commit = false
max.poll.interval.ms = 300000
max.poll.records = 1
AckMode.MANUAL_IMMEDIATE
syncCommits = true

Execution example
The 10 partitions were assigned to consumer-0 (0 to 4) and consumer-1 (5 to 9). New consumers were added while processing was in progress, and each addition caused a rebalance. I was unable to resume the previously paused partitions.
Message-1 was consumed twice. Because processing took longer than max.poll.interval.ms, the rebalance reassigned the partitions. By the time the commit was triggered, the offset's partition may have been assigned to another consumer, yet the commit took effect. The rebalance apparently resumes all paused partitions: the new consumer assigned to this partition was able to fetch the message again because it was no longer paused.
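The paused-state reset described above can be made visible in the logs with a rebalance listener. A hypothetical diagnostic sketch using the kafka-clients API (the class name and log messages are my own):

```groovy
// Sketch: a rebalance clears the consumer's paused set for reassigned
// partitions, which is what makes the duplicate fetch below possible.
class LoggingRebalanceListener implements ConsumerRebalanceListener {

    @Override
    void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOGGER.info("Revoked {}", partitions)
    }

    @Override
    void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Partitions arrive here un-paused, even if they were
        // paused before the rebalance
        LOGGER.info("Assigned {}", partitions)
    }
}
```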
consumer-1 [partition-5 offset-12] Received Message [message-1]
consumer-1 [partition-5 offset-12] Paused Partitions [partition-5, partition-8, partition-9, partition-6, partition-7]
consumer-2 Started
consumer-2 [partition-3 offset-80] Received Message [message-2]
consumer-2 [partition-3 offset-80] Paused Partitions [partition-2, partition-3, partition-0, partition-1]
consumer-3 Started
consumer-3 [partition-7 offset-43] Received Message [message-3]
consumer-3 [partition-7 offset-43] Paused Partitions [partition-6, partition-7]
consumer-4 Started
consumer-4 [partition-5 offset-12] Received Message [message-1]
consumer-4 [partition-5 offset-12] Paused Partitions [partition-4, partition-5]
consumer-1 [partition-5 offset-12] Executed Task
consumer-1 [partition-5 offset-12] Committed [message-1]
consumer-1 [partition-5 offset-12] Resumed Partitions []
consumer-2 [partition-3 offset-80] Executed Task
consumer-2 [partition-3 offset-80] Committed [message-2]
consumer-2 [partition-3 offset-80] Resumed Partitions []
consumer-3 [partition-7 offset-43] Executed Task
consumer-3 [partition-7 offset-43] Committed [message-3]
consumer-3 [partition-7 offset-43] Resumed Partitions []
consumer-4 [partition-5 offset-12] Executed Task
consumer-4 [partition-5 offset-12] Committed [message-1]
consumer-4 [partition-5 offset-12] Resumed Partitions [partition-4, partition-5]

Consumer implementation in Groovy
@KafkaListener(topics = '${topic.files}',
        containerFactory = "fileKafkaListenerContainerFactory")
void receive(@Payload FileEvent event,
             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
             @Header(KafkaHeaders.OFFSET) Long offset,
             Acknowledgment ack,
             Consumer consumer) {
    LOGGER.info("[partition-{} offset-{}] Received Message [{}]", partition, offset, event)
    try {
        LOGGER.debug("[partition-{} offset-{}] Paused Partitions [{}]", partition, offset, consumer.assignment())
        consumer.pause(consumer.assignment())
        ListenableFuture<Void> future = applicationTaskExecutor.submitListenable(callable(event))
        future.addCallback({ }, { ex -> throw ex })
        while (!future.isDone()) {
            Thread.sleep(500)
            consumer.poll(Duration.ofMillis(3_000))
        }
        future.get()
        ack.acknowledge()
        LOGGER.debug("[partition-{} offset-{}] Committed [{}]", partition, offset, event)
    } catch (Exception cause) {
        String message = String.format("Fail to consume partition=%s offset=%s %s", partition, offset, event)
        throw new RuntimeException(message, cause)
    } finally {
        LOGGER.debug("[partition-{} offset-{}] Resumed Partitions [{}]", partition, offset, consumer.paused())
        consumer.resume(consumer.paused())
    }
}

Callable<Void> callable(FileEvent event) {
    { ->
        indexService.index(event)
    }
}

When I add a new consumer during long-running processing, how do I pause/resume correctly, taking the rebalance that occurs into account?
Posted 2020-08-12 13:40:05
Do not call pause() / resume() on the consumer itself; instead, pause the listener container (and set max.poll.records to 1).
The container has logic to re-pause after a rebalance.
if (ListenerConsumer.this.consumerPaused) {
ListenerConsumer.this.consumer.pause(partitions);
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
+ "consumer paused again, so the initial poll() will never return any records");
}

If you really want to do all this yourself, set max.poll.records to 1 and use a rebalance listener to re-pause the consumer.
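A minimal sketch of the container-level approach, assuming the listener is given an id (here `fileListener`, my own name) so it can be looked up through the KafkaListenerEndpointRegistry:

```groovy
// Sketch: pause/resume at the container level instead of consumer.pause()/resume().
// Assumes @KafkaListener(id = "fileListener", ...) on the receive() method.
@Autowired
KafkaListenerEndpointRegistry registry

void pauseListener() {
    MessageListenerContainer container = registry.getListenerContainer("fileListener")
    container.pause()    // takes effect after the current poll() returns
}

void resumeListener() {
    registry.getListenerContainer("fileListener").resume()
}
```

Because the container remembers that it was paused (the `consumerPaused` flag quoted above), it re-applies the pause after a rebalance hands the partitions back, which is exactly what the manual consumer.pause() approach loses.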
https://stackoverflow.com/questions/63367165