首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache Kafka消费者组和简单消费者

Apache Kafka消费者组和简单消费者
EN

Stack Overflow用户
提问于 2013-08-01 03:54:41
回答 2查看 5.8K关注 0票数 7

我刚接触Kafka,到目前为止,我对消费者的理解是,基本上有两种类型的实现。

1) The High level consumer/consumer group

2) Simple Consumer

高级抽象最重要的部分是当Kafka不关心处理偏移量时使用它,而简单的使用者对偏移量管理提供了更好的控制。让我困惑的是,如果我想在多线程环境中运行消费者,又想控制我使用的消费者组,这是否意味着我必须从存储在zookeeper中的最后一个偏移量开始读取?这是我唯一的选择吗。

EN

回答 2

Stack Overflow用户

发布于 2013-08-02 18:46:32

在大多数情况下,高级消费者API不允许您直接控制偏移量。

当第一次创建消费者组时,您可以使用auto.offset.reset属性告诉它是从kafka存储的最旧的消息开始还是从最新的消息开始。

您还可以通过将auto.commit.enable设置为false来控制高级使用者何时向zookeeper提交新的偏移量。

由于高级消费者将偏移量存储在zookeeper中,因此您的应用程序可以直接访问zookeeper并操作偏移量-但它将在高级消费者API之外。

你的问题有点令人困惑,但你可以在多线程环境中使用简单的消费者。这就是高级消费者要做的事情。

票数 8
EN

Stack Overflow用户

发布于 2016-08-14 14:10:48

在Apache Kafka 0.9和0.10中,使用者组管理完全由一个代理(用于协调)和一个主题(用于状态存储)在Kafka应用程序中处理。

当一个消费者组第一次订阅一个主题时,auto.offset.reset的设置决定了消费者从哪里开始消费消息(http://kafka.apache.org/documentation.html#newconsumerconfigs)

您可以注册一个ConsumerRebalanceListener,以便在为特定使用者分配了主题/分区时接收通知。

一旦消费者开始运行,您就可以使用seekseekToBeginningseekToEnd从特定的偏移量获取消息。seek会影响该使用者的下一个poll,并在下一次提交时存储(例如,commitSynccommitAsync或auto.commit.interval过期时,如果已启用)。

使用者javadoc提到了更具体的情况:http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

您可以将Kafka提供的组管理与通过seek(..)手动管理偏移量相结合。一旦分配了分区。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/17979692

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档