首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >spring云流绑定器kafka无法创建带有ssl配置的多个kafka绑定程序。

Spring Cloud Stream Kafka 绑定器无法创建多个带有 SSL 配置的 Kafka binder。
EN

Stack Overflow用户
提问于 2020-07-31 03:13:01
回答 1查看 2.3K关注 0票数 1

我正试图通过具有jaas的SASL_SSL协议连接到一个kafka集群,如下所示:

代码语言:javascript
复制
# Spring Cloud Stream multi-binder setup: two Kafka binders, each carrying its
# own brokers, truststore and SCRAM (SASL) credentials inside its isolated
# `environment` block.
spring:
  cloud:
    stream:
      bindings:
        # Consumer binding routed to the first Kafka cluster.
        binding-1:
          binder: kafka-1-with-ssl
          destination: <destination-1>
          content-type: text/plain
          group: <group-id-1>
          consumer:
            header-mode: headers
        # Consumer binding routed to the second Kafka cluster.
        binding-2:
          binder: kafka-2-with-ssl
          destination: <destination-2>
          content-type: text/plain
          group: <group-id-2>
          consumer:
            header-mode: headers
            

      binders:
        # First binder: per-binder environment with its own brokers,
        # truststore and JAAS credentials.
        kafka-1-with-ssl:
          type: kafka
          # Not the default binder; bindings must reference it explicitly.
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder: 
                      brokers: <broker-hostnames-1>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-1>
                            password: <ts-password-1>
                            type: JKS
                      # SCRAM login module with this cluster's credentials.
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-1>
                          password: <password-1>

        # Second binder: same structure, different cluster and credentials.
        kafka-2-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder: 
                      brokers: <broker-hostnames-2>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-2>
                            password: <ts-password-2>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-2>
                          password: <password-2>
      # Global binder-level security settings shared by both binders
      # (protocol and SASL mechanism only; no credentials here).
      kafka:
        binder:
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256 

上面的配置与 Spring Cloud Stream 官方 Git 仓库中提供的示例配置是一致的。

该库的 Git 仓库中也有人提出过类似的问题,称该问题已在最新版本中修复,但实际情况似乎并非如此,出现了以下错误:

Spring Boot 版本:2.2.8,Spring Cloud Stream 依赖版本:Horsham.SR6。

代码语言:javascript
复制
Failed to create consumer binding; retrying in 30 seconds | org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:201)
    at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:407)
    at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:65)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createAdminClient(KafkaTopicProvisioner.java:246)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:216)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:183)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:79)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:402)
    ... 12 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: KrbException: Cannot locate default realm
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:160)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:382)
    ... 18 common frames omitted
Caused by: javax.security.auth.login.LoginException: KrbException: Cannot locate default realm
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:804)
    at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:61)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
    ... 22 common frames omitted
Caused by: sun.security.krb5.RealmException: KrbException: Cannot locate default realm
    at sun.security.krb5.Realm.getDefault(Realm.java:68)
    at sun.security.krb5.PrincipalName.<init>(PrincipalName.java:462)
    at sun.security.krb5.PrincipalName.<init>(PrincipalName.java:471)
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:706)
    ... 38 common frames omitted
Caused by: sun.security.krb5.KrbException: Cannot locate default realm
    at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
    at sun.security.krb5.Realm.getDefault(Realm.java:64)
    ... 41 common frames omitted 

这使我认为该库没有正确地读取配置,因为 jaas.loginModule 已被指定为 ScramLoginModule,但它却使用 Krb5LoginModule 进行身份验证。

但是,当按照下面的方式进行配置时(区别在于把 SSL 凭据放到了 binder 的 environment 之外),它显然只会连接到在全局 SSL 配置(binder 的 environment 之外)中指定的那个 binder,并在不输出任何错误日志的情况下静默地忽略其他 binder。

如果在全局 SSL 配置中指定的是 binder kafka-2-with-ssl 的密码凭据,那么该 binder 会被创建,并且订阅它的绑定会开始消费事件。但这只有在我们只需要创建单个 binder 时才有用。

代码语言:javascript
复制
# Second variant: same two binders, but cluster 2's SSL truststore and JAAS
# credentials are duplicated in the GLOBAL binder configuration (outside the
# per-binder environments). With this layout only the globally-configured
# cluster connects; the other binder is silently ignored.
spring:
  cloud:
    stream:
      bindings:
        # Consumer binding routed to the first Kafka cluster.
        binding-1:
          binder: kafka-1-with-ssl
          destination: <destination-1>
          content-type: text/plain
          group: <group-id-1>
          consumer:
            header-mode: headers
        # Consumer binding routed to the second Kafka cluster.
        binding-2:
          binder: kafka-2-with-ssl
          destination: <destination-2>
          content-type: text/plain
          group: <group-id-2>
          consumer:
            header-mode: headers

      binders:
        # First binder: per-binder environment with its own credentials.
        kafka-1-with-ssl:
          type: kafka
          # Not the default binder; bindings must reference it explicitly.
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-1>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-1>
                            password: <ts-password-1>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-1>
                          password: <password-1>

        # Second binder: same structure, different cluster and credentials.
        kafka-2-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-2>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-2>
                            password: <ts-password-2>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-2>
                          password: <password-2>
      # Global binder security: duplicates cluster 2's truststore and
      # JAAS credentials outside any per-binder environment.
      kafka:
        binder:
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256
            ssl:
              truststore:
                location: <location-2>
                password: <ts-password-2>
                type: JKS
          # FIX: 'options' was indented one space deeper than its sibling
          # 'loginModule', which makes this mapping invalid YAML; siblings
          # must share the same indentation column.
          jaas:
            loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
            options:
              username: <username-2>
              password: <password-2>

可以向您保证 SSL 凭据没有任何问题。单独测试时,这两个带 SSL 的 Kafka binder 任意一个都能成功创建。目标是使用 SASL_SSL 协议同时连接多个 Kafka binder。提前致谢。

EN

回答 1

Stack Overflow用户

发布于 2021-07-07 20:27:22

我认为您可以采用 KIP-85 中实现的方案来解决这个问题:不要使用 Stream 绑定器提供的 JAAS 配置,也不要设置 java.security.auth.login.config 系统属性,而是使用优先级高于这两种方式的 sasl.jaas.config 属性。通过使用 sasl.jaas.config,您可以绕过 JVM 使用全局静态安全上下文的限制——该限制会导致在第一个 JAAS 配置之后找到的任何后续 JAAS 配置都被忽略。

下面是一个样本应用,演示了如何将具有不同安全上下文的多个Kafka集群连接为多绑定应用程序。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63185259

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档