首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring @StreamListener消费者未在消费者组中注册使用者ID、主机和客户端ID

Spring @StreamListener消费者未在消费者组中注册使用者ID、主机和客户端ID
EN

Stack Overflow用户
提问于 2019-09-03 11:51:56
回答 1查看 497关注 0票数 0

我们有一个春季云消费者从一个卡夫卡主题阅读信息。下面是通道的接口

代码语言:javascript
复制
    @Component
    public interface CollectionStreams {


       String INPUT_REPORT = "report-in";
       String OUTPUT_REPORT = "report-out";

       @Input(INPUT_REPORT)
       SubscribableChannel inboundReport();

       @Output(OUTPUT_REPORT)
       MessageChannel outboundReportToJM();
}

我们面临的问题是,当在消费者组“报告”中列出时,我们无法像预期的那样看到使用者ID、主机和客户ID。

代码语言:javascript
复制
[root@innolx131112 templates]# kubectl -n tmo-ccm exec kafka-test-client -- /usr/bin/kafka-consumer-groups --bootstrap-server kafka:9092 --describe -group report
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'report' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
report          3          2               3               1               -               -               -
report          1          4               4               0               -               -               -
report          2          1               1               0               -               -               -
report          4          2               2               0               -               -               -
report          0          2               2               0               -               -               -

顺便说一下,我们正在运行我们的应用程序以及卡夫卡在库伯奈特。

由于这个问题,我们不能将我们的应用程序的多个POD作为所有的荚。

下面是通道的接口

代码语言:javascript
复制
@Component
public interface CollectionStreams {


   String INPUT_REPORT = "report-in";
   String OUTPUT_REPORT = "report-out";

   @Input(INPUT_REPORT)
   SubscribableChannel inboundReport();

   @Output(OUTPUT_REPORT)
   MessageChannel outboundReportToJM();
}

并且我们定义了如下方法来从主题读取消息。

代码语言:javascript
复制
@StreamListener(CollectionStreams.INPUT_REPORT)
    //public void handleMessage(@Payload MessageT message) {
    public void handleMessage(@Payload MessageT message, @Headers MessageHeaders msg) {

以下是配置yaml

**

代码语言:javascript
复制
cloud:
        stream:
          kafka:
            binder:
              brokers: kafka
              autoCreateTopics: false
            bindings:
              report-in:
                consumer:
                  autoCommitOffset: false
                  autoCommit: false
                  auto-offset-reset: earliest
                  autoCommitOnError: false
                  resetOffsets: false
                  autoRebalanceEnabled: false
                  ackEachRecord: false
          bindings:
            report-in:
              destination: report
              contentType: application/json
              group: report
              consumer:
                concurrency: 5
                partitioned: true
            report-out:
              destination: jobmanager
              contentType: application/json
              group: jobmanager
              producer:
                autoAddPartitions: true

**

我们也有另一个消费者,我们没有为其设置任何卡夫卡相关的消费道具,令人惊讶的是,这些消费者正在正确注册自己。

配置:

代码语言:javascript
复制
cloud:
    stream:
      kafka:
        binder:
          autoCreateTopics: false
          brokers: kafka
      bindings:
        parse-in:
          destination: parser
          contentType: application/json
          group: parser
          consumer:
            concurrency: 3
            partitioned: true
        parse-out:
          destination: jobmanager
          contentType: application/json
          group: jobmanager
          producer:
            partitionKeyExpression: headers['contentType']
            autoAddPartitions: true

并将命令输出描述如下

代码语言:javascript
复制
[root@innolx131112 shyama]# kubectl -n tmo-ccm exec kafka-test-client -- /usr/bin/kafka-consumer-groups --bootstrap-server kafka:9092 --describe -group parser
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
parser          0          14              14              0               consumer-3-cb99e45e-21b3-4efb-ac7b-4f642a9486be /192.168.0.26   consumer-3
parser          1          13              13              0               consumer-3-cb99e45e-21b3-4efb-ac7b-4f642a9486be /192.168.0.26   consumer-3
parser          2          15              15              0               consumer-4-bec1e4af-d771-47fe-83ad-3440f3a6d4bd /192.168.0.26   consumer-4
parser          3          15              15              0               consumer-4-bec1e4af-d771-47fe-83ad-3440f3a6d4bd /192.168.0.26   consumer-4
parser          4          12              12              0               consumer-5-b9ac7e36-58cf-40cb-b37d-a0fa092a0d56 /192.168.0.26   consumer-5

是因为我们提供卡夫卡相关的道具给第一消费者(报告消费者),它不能注册?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-09-03 18:35:57

Consumer group 'report' has no active members.

这仅仅意味着当您输入命令时,应用程序没有运行。

该信息是短暂的,而不是保留当应用程序停止。

下一次可能会将分区分配给不同的实例。

编辑

每当我们使用相同的组实例化更多的消费者时,它们处理的是老消费者已经处理的旧的

仔细观察你的配置,这正是你想要的.

代码语言:javascript
复制
autoRebalanceEnabled: false

..。这意味着Kafka将不使用组管理,分区将由Spring流分配。

代码语言:javascript
复制
autoCommitOffset: false

这意味着Spring不会提交任何补偿,这是应用程序的责任。如果不提交偏移量,就会得到所观察到的行为。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57771206

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档