首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用kafka-python消费kafka,seek()重置的本地偏移量会提交给kafka吗?

使用kafka-python消费kafka,seek()重置的本地偏移量会提交给kafka吗?
EN

Stack Overflow用户
提问于 2021-05-20 20:41:12
回答 1查看 35关注 0票数 0

使用kafka-python消费kafka,seek方法重置的本地偏移量会提交给kafka吗?我正在研究获取双中心机房Kafka集群的rpo索引的解决方案。使用kafka-python获取Kafka集群的最大时间戳,取两个机房的Kafka集群的最大时间戳之差。

使用seek()将偏移量重置为分区的最大偏移量-1,然后使用poll()获取最新的消息,但该消息无法在循环中获取,检查当前消费组的偏移量,发现堆叠的消息为0

代码语言:javascript
复制
#reset offset to (max_offset-1)
for tp,offset in offsets_dict.items():
            offset = offset - 1
            if (offset)<0:
                effective_partition = effective_partition-1
                continue
            consumer.seek(tp,offset)
            kafkaoffset = consumer.position(tp)
           
        if effective_partition==0:
            consumer.close()
            return max_timestamp

        try:
            Counter=0
            while(True):
                message = consumer.poll(max_records=1)
                if not message:
                    continue
                for partition, msgs in six.iteritems(message):
                    for msg in msgs:
                        max_timestamp = max(max_timestamp,int(msg.timestamp))
                        self.logger.debug(f"{max_timestamp}")
                Counter = Counter +1
                if Counter == effective_partition:
                    break
        except Exception as ex:
            raise ex
        finally:
            consumer.close()
            return max_timestamp
EN

回答 1

Stack Overflow用户

发布于 2021-05-21 06:15:39

如果您在查找后提交,则这将是组的新偏移量,是的。

enable_auto_commit缺省值为True,如果将其设置为False,则可以使用KafkaConsumer.commit(offsets)函数手动控制此行为

默认情况下,调用.close()也会执行提交,除非您真的只想在正常的线性使用者进程中查找和读取少数进程外的消息,否则您可能不需要执行该操作

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67620552

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档