首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当我为一个长期运行的处理添加一个新的使用者时,我如何正确地暂停/恢复考虑到发生的再平衡?

当我为一个长期运行的处理添加一个新的使用者时,我如何正确地暂停/恢复考虑到发生的再平衡?
EN

Stack Overflow用户
提问于 2020-08-11 22:22:03
回答 1查看 1.9K关注 0票数 0

我使用Kafka 2.3.1 (服务器)和spring 2.2.2.14以及kafka-clients 2.0.1。

我有一个有10个分区的主题。

生产者

一个生产者将消息发布到Kafka集群,根据密钥在分区之间分发消息,因为消息的顺序很重要。

代码语言:javascript
复制
acks = all

Consumer

同一个使用者的一些实例(从2到5,在k8s中进行水平缩放),每个实例只有一个线程,拉出这些消息并执行处理。

消息处理时间的变化是不可预测的。

为了避免频繁地对组进行再平衡,我将消息处理移至另一个线程。当处理器仍在工作时,消费者继续呼叫民意测验。

代码语言:javascript
复制
auto.offset.reset = earliest
enable.auto.commit = false
max.poll.interval.ms = 300000
max.poll.records = 1
AckMode.MANUAL_IMMEDIATE    
syncCommits = true

执行示例

10个分区分配给使用者-0 (0至4)和使用者-1 (5至9)。在处理过程中添加了新的使用者,每次添加都会导致重新平衡。我无法恢复以前暂停的分区。

信息-1被消耗了两次。由于处理时间比max.poll.interval.ms长,所以重新平衡重新排列分区。当提交被触发时,偏移分区可能与另一个使用者相关联。这个承诺生效了。重新平衡可能正在恢复所有暂停的分区。与此分区相关联的新使用者能够再次提取消息,因为它没有暂停。

代码语言:javascript
复制
consumer-1 [partition-5 offset-12] Received Message [message-1]
consumer-1 [partition-5 offset-12] Paused Partitions [partition-5, partition-8, partition-9, partition-6, partition-7]
consumer-2 Started
consumer-2 [partition-3 offset-80] Received Message [message-2]
consumer-2 [partition-3 offset-80] Paused Partitions [partition-2, partition-3, partition-0, partition-1]
consumer-3 Started
consumer-3 [partition-7 offset-43] Received Message [message-3]
consumer-3 [partition-7 offset-43] Paused Partitions [partition-6, partition-7]
consumer-4 Started
consumer-4 [partition-5 offset-12] Received Message [message-1]
consumer-4 [partition-5 offset-12] Paused Partitions [partition-4, partition-5]
consumer-1 [partition-5 offset-12] Executed Task 
consumer-1 [partition-5 offset-12] Committed [message-1]
consumer-1 [partition-5 offset-12] Resumed Partitions []
consumer-2 [partition-3 offset-80] Executed Task
consumer-2 [partition-3 offset-80] Committed [message-2]
consumer-2 [partition-3 offset-80] Resumed Partitions []
consumer-3 [partition-7 offset-43] Executed Task
consumer-3 [partition-7 offset-43] Committed [message-3]
consumer-3 [partition-7 offset-43] Resumed Partitions []
consumer-4 [partition-5 offset-12] Executed Task
consumer-4 [partition-5 offset-12] Committed [message-1]
consumer-4 [partition-5 offset-12] Resumed Partitions [partition-4, partition-5]

Groovy中的使用者实现

代码语言:javascript
复制
    @KafkaListener(topics = '${topic.files}', 
                   containerFactory = "fileKafkaListenerContainerFactory")
    void receive(@Payload FileEvent event,
                 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                 @Header(KafkaHeaders.OFFSET) Long offset,
                 Acknowledgment ack,
                 Consumer consumer) {
        LOGGER.info("[partition-{} offset-{}] Received Message [{}]", partition, offset, event)
        try {
            LOGGER.debug("[partition-{} offset-{}] Paused Partitions [{}]", partition, offset, consumer.assignment())
            consumer.pause(consumer.assignment())
            ListenableFuture<Void> future = applicationTaskExecutor.submitListenable(callable(event))
            future.addCallback({ }, { ex -> throw ex }  )
            while (!future.isDone()) {
                Thread.sleep(500)
                consumer.poll(Duration.ofMillis(3_000))
            }
            future.get()
            ack.acknowledge()
            LOGGER.debug("[partition-{} offset-{}] Committed [{}]", partition, offset, event)
        } catch (Exception cause) {
            String message = String.format("Fail to consume partition=%s offset=%s %s", partition, offset, event)
            throw new RuntimeException(message, cause)
        } finally {
            LOGGER.debug("[partition-{} offset-{}] Resumed Partitions [{}]", partition, offset, consumer.paused())
            consumer.resume(consumer.paused())
        }
    }
    
    Callable<Void> callable(FileEvent event) {
        { ->
            indexService.index(event)
        }
    }

当我为一个长期运行的处理添加一个新的使用者时,我如何正确地暂停/恢复考虑到发生的再平衡?

EN

回答 1

Stack Overflow用户

发布于 2020-08-12 13:40:05

不要对使用者本身调用pause() / resume();而是暂停侦听器容器(并将max.poll.records设置为1)。

容器具有在再平衡后重新暂停的逻辑。

代码语言:javascript
复制
if (ListenerConsumer.this.consumerPaused) {
    ListenerConsumer.this.consumer.pause(partitions);
    ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
            + "consumer paused again, so the initial poll() will never return any records");
}

如果您真的想自己做这一切,请将max.poll.records设置为1,并使用重新平衡侦听器重新暂停使用者。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63367165

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档