首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka连接手动偏移/提交管理

Kafka连接手动偏移/提交管理
EN

Stack Overflow用户
提问于 2021-10-13 21:51:04
回答 1查看 581关注 0票数 0

我试图处理的偏移,提交我自己在一个自定义卡夫卡连接连接器,我正在工作。

我试过在连接器配置中配置这个- "consumer.enable.auto.commit":"false“。

此外,我在继承自SinkTask类的类中重写了SinkTask方法,因此它返回一个空映射,因为根据文档,手动管理偏移量是必需的(引用https://kafka.apache.org/11/javadoc/org/apache/kafka/connect/sink/SinkTask.html#preCommit-java.util.Map-)。

我也尝试用一个空的映射调用刷新方法。

但是,通过以上所有的尝试和这些尝试的排列,这些消息仍然被提交并且没有被重新处理(为了明确起见,我的目标是如果消息没有提交,它将在下一个轮询间隔中再次被消耗)。

在下一次投票中,我还能做什么使所使用的消息不再被提交和使用呢?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-10-18 14:09:13

这里的问题是我对卡夫卡的作品缺乏了解。基本上,偏移进程是在内存中跟踪的,而刷新/预提交只是为了获取内存中的内容并写入磁盘。

这意味着,只有在下一次重新启动使用者时(当它读取需要从磁盘开始的偏移量时),才会使用旧的偏移量。

SinkTaskContext中还有另一种叫做“偏移”的方法,它接受主题分区和偏移量的映射,并在内存中为使用者设置它。

在每次轮询之前,KafkaConnect运行时正在内存中获取跟踪的偏移量,如果需要,则根据前面提到的偏移量方法存储的映射调用下划线使用者查找方法。

反过来,这将使使用者在“偏移”方法中读取偏移量。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69562744

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档