我刚接触Kafka,到目前为止,我对消费者的理解是,基本上有两种类型的实现。
1) The High level consumer/consumer group
高级抽象最重要的部分是当Kafka不关心处理偏移量时使用它,而简单的使用者对偏移量管理提供了更好的控制。让我困惑的是,如果我想在多线程环境中运行消费者,又想控制我使用的消费者组,这是否意味着我必须从存储在zookeeper中的最后一个偏移量开始读取?这是我唯一的选择吗。
发布于 2013-08-02 18:46:32
在大多数情况下,高级消费者API不允许您直接控制偏移量。
当第一次创建消费者组时,您可以使用auto.offset.reset属性告诉它是从kafka存储的最旧的消息开始还是从最新的消息开始。
您还可以通过将auto.commit.enable设置为false来控制高级使用者何时向zookeeper提交新的偏移量。
由于高级消费者将偏移量存储在zookeeper中,因此您的应用程序可以直接访问zookeeper并操作偏移量-但它将在高级消费者API之外。
你的问题有点令人困惑,但你可以在多线程环境中使用简单的消费者。这就是高级消费者要做的事情。
发布于 2016-08-14 14:10:48
在Apache Kafka 0.9和0.10中,使用者组管理完全由一个代理(用于协调)和一个主题(用于状态存储)在Kafka应用程序中处理。
当一个消费者组第一次订阅一个主题时,auto.offset.reset的设置决定了消费者从哪里开始消费消息(http://kafka.apache.org/documentation.html#newconsumerconfigs)
您可以注册一个ConsumerRebalanceListener,以便在为特定使用者分配了主题/分区时接收通知。
一旦消费者开始运行,您就可以使用seek、seekToBeginning和seekToEnd从特定的偏移量获取消息。seek会影响该使用者的下一个poll,并在下一次提交时存储(例如,commitSync、commitAsync或auto.commit.interval过期时,如果已启用)。
使用者javadoc提到了更具体的情况:http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
您可以将Kafka提供的组管理与通过seek(..)手动管理偏移量相结合。一旦分配了分区。
https://stackoverflow.com/questions/17979692
复制相似问题