首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Simple Consumer读取Apache Kafka中未处理的消息

使用Simple Consumer读取Apache Kafka中未处理的消息
EN

Stack Overflow用户
提问于 2014-12-05 16:16:28
回答 1查看 3.2K关注 0票数 1

我已经厌倦了这个链接

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

使用SimpleConsumer来消费消息,但在使用它时,我发现了一些突然的行为,如下所示:

使用者正在使用来自特定分区的消息。但问题是,当我的使用者运行时,我使用生产者将消息推送到主题,它使用该分区中的消息。但是,如果我的消费者目前没有运行,并且我将一些消息推送到主题,并再次启动消费者,它不会消费生产者推送的消息,但它再次准备消费现在将被推送的消息。我使用LatestTime()而不是EarliestTime(),因为我只想使用未处理的消息。

例如

案例1

消费者正在运行:

生产者将M1、M2、M3消息推送到主题1的分区1

结果:消费者将消费所有三条消息。

案例2

使用者未在运行

producer现在将m4,m5 m6消息推送到主题1的分区1

现在调用消费者

结果:消费者不消费消息m4、m5、m6,但如果我检查偏移量,则将其设置为7。这意味着生产者在生成消息时将偏移量提前到7,因此消费者现在将从偏移量7开始消费消息

理想情况下,当消费者再次出现时,请帮助它读取来自m4的消息。

EN

回答 1

Stack Overflow用户

发布于 2014-12-05 17:15:56

你做错了。

首先,我不确定SimpleConsumer就是你要找的。它迫使你自己管理偏移量(例如,它根本不会将偏移量提交给Zookeeper,每次你再次启动SimpleConsumer时,它都会再次获取相同的消息)。SimpleConsumer不理解“已处理的消息”。它能做的就是从某个偏移量开始抓取,然后继续抓取,直到你说“停止”。

无论如何,如果您打算自己提交已处理的偏移量,则应该使用EarliestTime (auto.offset.reset=smallest配置项)。auto.offset.reset意味着如果你的消费者被初始化为错误的偏移量(如果我没记错的话,SimpleConsumer被初始化为-1偏移量,这显然是错误的),它将重置为smallest可用(EarliestTime)或largest可用(LatestTime)偏移量。

为了让它更清楚,这里有一个例子:

你的Case-1

您创建了一个使用者,并将其指向主题1分区1。由于它最初是用错误的偏移量初始化的,因此它将向代理请求一些适当的偏移量(这里是smallestlargest偏移量重置的地方)。如果您尚未生成任何消息,则smallestlargest偏移量都将为0,因此当您生成一些消息时,您的使用者将获取这些消息。

Case-2

您生成N条消息(比如7条)。然后启动SimpleConsumer。同样,它使用错误的偏移量进行初始化,并要求代理提供适当的偏移量。对于smallest reset offset,它将是0,而对于largest offset,它将是7。与您的示例一样,您使用LargestOffsets,您的消费者将使用偏移量7重新初始化,并从它开始消费。

一般来说,看看高级消费者,在大多数情况下,这就是你要找的。这是link

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/27311537

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档