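I am trying to implement a Kafka consumer and a Kafka producer in the same Spring Boot application using Spring Cloud Stream binders. Run separately, each works; run together, only the Kafka producer connects to the Kafka cluster, while the Kafka consumer fails to log in to it. I think the problem lies in the multiple/different JAAS configurations for the Kafka producer and consumer. Please find my application.yml file below.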
spring:
  cloud:
    stream:
      bindings:
        input:
          group: consumer-tt1
          useNativeEncoding: true
          destination: consumer-topic
          content-type: application/json
          binder: consumer
        output:
          destination: producer-topic
          useNativeEncoding: true
          content-type: application/*+avro
          binder: producer
      binders:
        consumer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      autoAddPartitions: false
                      consumer-properties:
                        key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                        value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                      brokers: xxxxx.tt.com:9092
                      jaas:
                        loginModule: com.sun.security.auth.module.Krb5LoginModule
                        controlFlag: required
                        options:
                          useKeyTab: true
                          storeKey: true
                          keyTab: \src\main\keytab\XXXXXCON.keytab
                          principal: XXXXXCON@tt.com
                          doNotPrompt: true
                          refreshKrb5Config: true
                      configuration:
                        application: XXXXXCON
                        sasl:
                          kerberos:
                            realm: tt.com
                            kdc: tt.com
                            service:
                              name: kafka
                          jaas:
                            loginModule: com.sun.security.auth.module.Krb5LoginModule
                            controlFlag: required
                            config:
                              useKeyTab: true
                              storeKey: true
                              keyTab: \src\main\keytab\XXXXXCON.keytab
                              principal: XXXXXCON@tt.com
                              doNotPrompt: true
                              refreshKrb5Config: true
                        security:
                          protocol: SASL_SSL
                        ssl:
                          truststore:
                            location: \src\main\keytab\truststore.jks
                            password: 123456789
                            type: JKS
        producer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      autoAddPartitions: false
                      producer-properties:
                        key.serializer: org.apache.kafka.common.serialization.StringSerializer
                        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                        schema.registry.url: http:/kafka-schema:8484
                      brokers: xxxxx.tt.com:9092
                      jaas:
                        loginModule: com.sun.security.auth.module.Krb5LoginModule
                        controlFlag: required
                        options:
                          useKeyTab: true
                          storeKey: true
                          keyTab: \src\main\keytab\XXXXXPRO.keytab
                          principal: XXXXXPRO@tt.com
                          doNotPrompt: true
                          refreshKrb5Config: true
                      configuration:
                        application:
                          id: XXXXXPRO
                        sasl:
                          kerberos:
                            realm: tt.com
                            kdc: tt.com
                            service:
                              name: kafka
                          jaas:
                            loginModule: com.sun.security.auth.module.Krb5LoginModule
                            controlFlag: required
                            config:
                              useKeyTab: true
                              storeKey: true
                              keyTab: \src\main\keytab\XXXXXPRO.keytab
                              principal: XXXXXPRO@tt.com
                              doNotPrompt: true
                              refreshKrb5Config: true
                        security:
                          protocol: SASL_SSL
                        ssl:
                          truststore:
                            location: \src\main\keytab\truststore.jks
                            password: 123456789
                            type: JKS
      schema-registry-client:
        endpoint: http:/kafka-schema:8484

If I run the same application.yml with @EnableBinding(Source.class) or @EnableBinding(Sink.class) on the main class, it connects to the Kafka cluster successfully as a Kafka producer or a Kafka consumer, respectively. But when I run the same application.yml with @EnableBinding(Processor.class), I get an error in the Kafka consumer (the Kafka producer works fine and connects to the Kafka cluster). The problem occurs only in the Kafka consumer: it is not authorized to access the topic.
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [consumer-topic]
2020-03-24 19:45:07.794 WARN 19000 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : No partitions have been retrieved for the topic (consumer-topic). This will affect the health check.
2020-03-24 19:45:07.794 WARN 19000 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : The number of expected partitions was: 1, but 0 has been found instead.There will be 1 idle consumers
2020-03-24 19:45:07.796 ERROR 19000 --- [ main] o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer:
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:435) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:97) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:142) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:144) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:122) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindInputs(BindableProxyFactory.java:254) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:1.8.0_162]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:744) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:391) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1204) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at com.rbc.ess.ESSEventTransform.EssEventTransformApplication.main(EssEventTransformApplication.java:21) ~[classes/:na]
Caused by: java.lang.IllegalArgumentException: A list of partitions must be provided
at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:446) ~[spring-cloud-stream-binder-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:133) ~[spring-cloud-stream-binder-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:382) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
... 24 common frames omitted

Please check and advise how to pass multiple JAAS configurations in a Spring Cloud Stream binder application.
Posted on 2020-03-25 23:31:22
Your JAAS configuration is incorrect; it should be
jaas:
  config: com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="\src\main\keytab\XXXXXCON.keytab" principal="XXXXXCON@tt.com" doNotPrompt=true refreshKrb5Config=true;
See the documentation.
I can't explain why it works when you have only one binder.
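As a rough sketch of where that string would sit with two binders, the fragment below places a single-line `sasl.jaas.config` value under each binder's own `configuration` map, so that the consumer and producer clients each receive their own keytab and principal. The binder names, broker address, keytab paths and principals are copied from the question. The producer value is an assumption that mirrors the consumer one with the XXXXXPRO keytab and principal. The binder-level `jaas:` blocks are left out here only to keep the sketch short; the answer does not say whether they should stay.

```yaml
spring:
  cloud:
    stream:
      binders:
        consumer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: xxxxx.tt.com:9092
                      configuration:
                        security:
                          protocol: SASL_SSL
                        sasl:
                          kerberos:
                            service:
                              name: kafka
                          jaas:
                            # One string per client, in the form given in the answer
                            config: com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="\src\main\keytab\XXXXXCON.keytab" principal="XXXXXCON@tt.com" doNotPrompt=true refreshKrb5Config=true;
        producer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: xxxxx.tt.com:9092
                      configuration:
                        security:
                          protocol: SASL_SSL
                        sasl:
                          kerberos:
                            service:
                              name: kafka
                          jaas:
                            # Assumed producer counterpart: same form, producer keytab and principal
                            config: com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="\src\main\keytab\XXXXXPRO.keytab" principal="XXXXXPRO@tt.com" doNotPrompt=true refreshKrb5Config=true;
```

Kafka clients read `sasl.jaas.config` as an ordinary per-client property, which is why it can differ between the two binders; the truststore settings from the question would stay under the same `configuration` maps.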
Also, your useNativeEncoding is incorrect; it should be
...
input:
  consumer:
    useNativeDecoding: true
...
output:
  producer:
    useNativeEncoding: true

https://stackoverflow.com/questions/60841127