首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring Selecting @TopicPartition在一个组中禁用消费

Spring Selecting @TopicPartition在一个组中禁用消费
EN

Stack Overflow用户
提问于 2020-03-04 23:04:58
回答 1查看 2.8K关注 0票数 2

我有两个消费者组-- firstsecond --使用SpringBootVersion2.1.8订阅my-topic主题。我有两个服务使用每个单独的消费群体。

使用最小设置,我定义了一个侦听器:

代码语言:javascript
复制
@KafkaListener(topics = "my-topic")
public void consume(@Payload String message) {
    logger.info(String.format("#### -> Consumed message -> %s", message));
}

使用kafka-consumer-groups的清单显示以下内容:

代码语言:javascript
复制
root@8d49c1b2c3bf:/# kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups

GROUP TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
first my-topic  2          0               0               0         consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9     consumer-2
first my-topic  0          0               0               0         consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9     consumer-2
first my-topic  1          0               0               0         consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9     consumer-2

GROUP TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
second my-topic  0          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2
second my-topic  1          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2
second my-topic  2          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2

我希望first组中的使用者只使用0和2个分区,所以我尝试通过侦听器实现它:

代码语言:javascript
复制
@KafkaListener(topicPartitions = {
    @TopicPartition(
        topic = "my-topic",
        partitions = { "0", "2" }
    )}
)
public void consume(@Payload String message) {
    logger.info(String.format("#### -> Consumed message -> %s", message));
}

为什么使用相同命令的清单不显示与上面相同的内容,除了first组的第1部分?此外,CONSUMER-ID值和进一步的值为空,@KafkaListenerfirst组接收没有消息( second组中的侦听器工作方式相同)。怎么修呢?

代码语言:javascript
复制
Consumer group 'first' has no active members.

GROUP TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
first my-topic  2          0               0               0               -               -               -
first my-topic  0          0               0               0               -               -               -

GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
second my-topic  0          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2
second my-topic  1          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2
second my-topic  2          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-03-05 02:04:49

手动分配所有分区,因为您要将使用者线程分配到特定的分区,所以Kafka将使用assign()方法,而不会使用组协调。每个消费者的行为都是独立的,即使它与另一个消费者共享一个groupId。

假设您希望始终读取所有分区的所有记录(例如,当使用压缩主题加载分布式缓存时),手动分配分区而不使用Kafka的组管理可能很有用。

但是,检查消费者位置的命令只会显示使用组协调的消费者的位置。

检查消费者地位

有时候看到你的消费者的立场是有用的。我们有一个工具,可以显示一个消费者组中所有消费者的位置,以及他们在日志结束后有多远是。若要在名为“我的组”的使用者组上运行此工具,请使用名为“我的主题”的主题。

KafkaConsumer

手动分区分配不使用组协调,因此使用者失败不会导致分配的分区被重新平衡。即使每个使用者与另一个使用者共享一个groupId,也是独立的。为了避免偏移提交冲突,通常应该确保groupId对于每个使用者实例都是唯一的。

侦听器不使用消息的最后一个原因是偏移量,默认情况下偏移量被设置为最新的,您需要指定偏移位置,特别是如下所示

代码语言:javascript
复制
@KafkaListener(id = "thing2", topicPartitions =
    { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
      @TopicPartition(topic = "topic2", partitions = "0",
         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
 public void listen(ConsumerRecord<?, ?> record) {
      ...
 }

有关偏移的更多信息

第一个构造函数接受一个TopicPartitionOffset参数数组,显式地指示容器使用哪些分区(使用使用者分配()方法),并使用一个可选的初始偏移量。在默认情况下,正值是绝对偏移量。在默认情况下,负值相对于分区内当前的最后偏移量。为TopicPartitionOffset提供了一个构造函数,该构造函数采用额外的布尔参数。如果这是正确的,则初始偏移量(正或负)相对于该消费者的当前位置。当容器启动时应用偏移量。

注意:,您可以在分区或partitionOffsets属性中指定每个分区,但不能同时指定两个分区。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60536000

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档