我们有一个春季云消费者从一个卡夫卡主题阅读信息。下面是通道的接口
@Component
public interface CollectionStreams {
String INPUT_REPORT = "report-in";
String OUTPUT_REPORT = "report-out";
@Input(INPUT_REPORT)
SubscribableChannel inboundReport();
@Output(OUTPUT_REPORT)
MessageChannel outboundReportToJM();
}我们面临的问题是,当在消费者组“报告”中列出时,我们无法像预期的那样看到使用者ID、主机和客户ID。
[root@innolx131112 templates]# kubectl -n tmo-ccm exec kafka-test-client -- /usr/bin/kafka-consumer-groups --bootstrap-server kafka:9092 --describe -group report
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'report' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
report 3 2 3 1 - - -
report 1 4 4 0 - - -
report 2 1 1 0 - - -
report 4 2 2 0 - - -
report 0 2 2 0 - - -顺便说一下,我们正在运行我们的应用程序以及卡夫卡在库伯奈特。
由于这个问题,我们不能将我们的应用程序的多个POD作为所有的荚。
下面是通道的接口
@Component
public interface CollectionStreams {
String INPUT_REPORT = "report-in";
String OUTPUT_REPORT = "report-out";
@Input(INPUT_REPORT)
SubscribableChannel inboundReport();
@Output(OUTPUT_REPORT)
MessageChannel outboundReportToJM();
}并且我们定义了如下方法来从主题读取消息。
@StreamListener(CollectionStreams.INPUT_REPORT)
//public void handleMessage(@Payload MessageT message) {
public void handleMessage(@Payload MessageT message, @Headers MessageHeaders msg) {以下是配置yaml
**
cloud:
stream:
kafka:
binder:
brokers: kafka
autoCreateTopics: false
bindings:
report-in:
consumer:
autoCommitOffset: false
autoCommit: false
auto-offset-reset: earliest
autoCommitOnError: false
resetOffsets: false
autoRebalanceEnabled: false
ackEachRecord: false
bindings:
report-in:
destination: report
contentType: application/json
group: report
consumer:
concurrency: 5
partitioned: true
report-out:
destination: jobmanager
contentType: application/json
group: jobmanager
producer:
autoAddPartitions: true**
我们也有另一个消费者,我们没有为其设置任何卡夫卡相关的消费道具,令人惊讶的是,这些消费者正在正确注册自己。
配置:
cloud:
stream:
kafka:
binder:
autoCreateTopics: false
brokers: kafka
bindings:
parse-in:
destination: parser
contentType: application/json
group: parser
consumer:
concurrency: 3
partitioned: true
parse-out:
destination: jobmanager
contentType: application/json
group: jobmanager
producer:
partitionKeyExpression: headers['contentType']
autoAddPartitions: true并将命令输出描述如下
[root@innolx131112 shyama]# kubectl -n tmo-ccm exec kafka-test-client -- /usr/bin/kafka-consumer-groups --bootstrap-server kafka:9092 --describe -group parser
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
parser 0 14 14 0 consumer-3-cb99e45e-21b3-4efb-ac7b-4f642a9486be /192.168.0.26 consumer-3
parser 1 13 13 0 consumer-3-cb99e45e-21b3-4efb-ac7b-4f642a9486be /192.168.0.26 consumer-3
parser 2 15 15 0 consumer-4-bec1e4af-d771-47fe-83ad-3440f3a6d4bd /192.168.0.26 consumer-4
parser 3 15 15 0 consumer-4-bec1e4af-d771-47fe-83ad-3440f3a6d4bd /192.168.0.26 consumer-4
parser 4 12 12 0 consumer-5-b9ac7e36-58cf-40cb-b37d-a0fa092a0d56 /192.168.0.26 consumer-5是因为我们提供卡夫卡相关的道具给第一消费者(报告消费者),它不能注册?
发布于 2019-09-03 18:35:57
Consumer group 'report' has no active members.
这仅仅意味着当您输入命令时,应用程序没有运行。
该信息是短暂的,而不是保留当应用程序停止。
下一次可能会将分区分配给不同的实例。
编辑
每当我们使用相同的组实例化更多的消费者时,它们处理的是老消费者已经处理的旧的
仔细观察你的配置,这正是你想要的.
autoRebalanceEnabled: false..。这意味着Kafka将不使用组管理,分区将由Spring流分配。
autoCommitOffset: false这意味着Spring不会提交任何补偿,这是应用程序的责任。如果不提交偏移量,就会得到所观察到的行为。
https://stackoverflow.com/questions/57771206
复制相似问题