使用kafka-python消费kafka,seek方法重置的本地偏移量会提交给kafka吗?我正在研究获取双中心机房Kafka集群的rpo索引的解决方案。使用kafka-python获取Kafka集群的最大时间戳,取两个机房的Kafka集群的最大时间戳之差。
使用seek()将偏移量重置为分区的最大偏移量-1,然后使用poll()获取最新的消息,但该消息无法在循环中获取,检查当前消费组的偏移量,发现堆叠的消息为0
#reset offset to (max_offset-1)
for tp,offset in offsets_dict.items():
offset = offset - 1
if (offset)<0:
effective_partition = effective_partition-1
continue
consumer.seek(tp,offset)
kafkaoffset = consumer.position(tp)
if effective_partition==0:
consumer.close()
return max_timestamp
try:
Counter=0
while(True):
message = consumer.poll(max_records=1)
if not message:
continue
for partition, msgs in six.iteritems(message):
for msg in msgs:
max_timestamp = max(max_timestamp,int(msg.timestamp))
self.logger.debug(f"{max_timestamp}")
Counter = Counter +1
if Counter == effective_partition:
break
except Exception as ex:
raise ex
finally:
consumer.close()
return max_timestamp发布于 2021-05-21 06:15:39
如果您在查找后提交,则这将是组的新偏移量,是的。
enable_auto_commit缺省值为True,如果将其设置为False,则可以使用KafkaConsumer.commit(offsets)函数手动控制此行为
默认情况下,调用.close()也会执行提交,除非您真的只想在正常的线性使用者进程中查找和读取少数进程外的消息,否则您可能不需要执行该操作
https://stackoverflow.com/questions/67620552
复制相似问题