首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >是否可以创建一个与的多绑定绑定- Kafka-Streams从集群A流到集群B?

是否可以创建一个与的多绑定绑定- Kafka-Streams从集群A流到集群B?
EN

Stack Overflow用户
提问于 2022-08-10 13:44:15
回答 1查看 474关注 0票数 0

我想用创建一个Kafka流应用程序,它集成了两个不同的Kafka集群/设置。我尝试使用多绑定器配置来实现它,如文档中提到的那样,并且类似于下面的示例:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples

给定这样一个简单的函数:

代码语言:javascript
复制
    @Bean
public Function<KStream<String, AnalyticsEvent>, KStream<String, UpdateEvent>> analyticsEventProcessor() {
    return input -> input
            .filter(new AnalyticsPredicate())
            .map(new AnalyticsToUpdateEventMapper());
}

在配置中,我试图将这些绑定绑定到不同的绑定程序。

代码语言:javascript
复制
spring.cloud:
  stream:
    bindings:
      analyticsEventProcessor-in-0:
        destination: analytics-events
        binder: cluster1-kstream
      analyticsEventProcessor-out-0:
        destination: update-events
        binder: cluster2-kstream

binders:
  cluster1-kstream:
    type: kstream
    environment:
      spring:
        cloud:
          stream:
            kafka:
              binder:
                brokers: <url cluster1>:9093
                configuration:
                  security.protocol: SSL
                  schema.registry.url: <schema-registry-url-cluster1>
                  schema.registry.ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                  schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                  schema.registry.ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                  schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                  ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                  ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                  ssl.truststore.type: JKS
                  ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                  ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                  ssl.keystore.type: JKS
                  ssl.enabled.protocols: TLSv1.2
              streams:
                binder:
                  brokers: <url cluster1>:9093
                  configuration:
                    security.protocol: SSL
                    schema.registry.url: <schema-registry-url-cluster1>
                    schema.registry.ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                    schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                    schema.registry.ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                    schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD} 
                    ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                    ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                    ssl.truststore.type: JKS
                    ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                    ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                    ssl.keystore.type: JKS
                    ssl.enabled.protocols: TLSv1.2
  cluster2-kstream:
    type: kstream
    environment:
      spring:
        cloud:
          stream:
            kafka:
              binder:
                brokers: <url cluster2>:9093
                configuration:
                  security.protocol: SSL
                  schema.registry.url: <schema-registry-url-cluster2>
                  schema.registry.ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                  schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                  schema.registry.ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                  schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                  ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                  ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                  ssl.truststore.type: JKS
                  ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                  ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                  ssl.keystore.type: JKS
                  ssl.enabled.protocols: TLSv1.2
              streams:
                binder:
                  brokers: <url cluster2>:9093
                  configuration:
                    security.protocol: SSL
                    schema.registry.url: <schema-registry-url-cluster2>
                    schema.registry.ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                    schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                    schema.registry.ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                    schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD} 
                    ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                    ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                    ssl.truststore.type: JKS
                    ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                    ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                    ssl.keystore.type: JKS
                    ssl.enabled.protocols: TLSv1.2

我首先尝试在一个运行良好的集群中完全运行应用程序。当我运行这个程序时,我总是会得到一个错误:

代码语言:javascript
复制
2022-08-10 15:28:42.892  WARN 1 --- [-StreamThread-2] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=<clientid>-StreamThread-2-consumer, groupId=<group-id>] Error while fetching metadata with correlation id 2 : {analytics-events=TOPIC_AUTHORIZATION_FAILED}
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] org.apache.kafka.clients.Metadata        : [Consumer clientId=<client-id>, groupId=<group-id>] Topic authorization failed for topics [analytics-events]
2022-08-10 15:28:42.893  INFO 1 --- [-StreamThread-2] org.apache.kafka.clients.Metadata        : [Consumer clientId=<client-id>, groupId=<group-id>] Cluster ID: <cluster-id>
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] c.s.a.a.e.UncaughtExceptionHandler       : org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] org.apache.kafka.streams.KafkaStreams    : stream-client [<client-id>] Replacing thread in the streams uncaught exception handler

org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642) ~[kafka-streams-3.1.1.jar!/:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576) ~[kafka-streams-3.1.1.jar!/:na]
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]

我验证了卡夫卡客户证明他们应该是正确的。我用keytool查看了它们,并且正确设置了密码env。consumerConfig还使用正确的代理URL。

是否可以在KStream函数中使用不同的卡夫卡集群,多绑定器作为流的输入和输出,这是可能的,还是只适用于类型卡夫卡绑定?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-08-11 16:24:11

在Kafka流中,您不能在一个应用程序中连接到两个不同的集群。这意味着在使用函数时,不能从入站上的集群接收到,也不能写入出站上的另一个集群。有关更多细节,请参见这个线程。

您可能可以接收和写入您的Kafka流中的同一个集群作为解决方案。然后,使用一个基于常规Kafka绑定器的函数,简单地将输出主题连接到第二个集群。在常规函数(非Kafka流)中,它可以从多个集群中消耗并发布到多个集群。

代码语言:javascript
复制
@Bean
public Function<KStream<String, AnalyticsEvent>, KStream<String, UpdateEvent>> analyticsEventProcessor() {
    return input -> input
            .filter(new AnalyticsPredicate())
            .map(new AnalyticsToUpdateEventMapper());
}

此函数需要接收并写入同一个集群。然后您可以有另一个函数,如下所示。

代码语言:javascript
复制
@Bean
public Function<?, ?> bridgeFunction() {
    ....
}

对于这个函数,输入是集群-1,输出是集群-2.

在使用此解决方案时,请确保将常规的Kafka绑定器也作为依赖项- spring-cloud-stream-binder-kafka

请记住,这种方法有一些缺点,例如增加额外的主题开销、由此产生的延迟等等。然而,这是这个用例的潜在解决方案。有关更多选项,请参见前面提到的SO线程。

代码语言:javascript
复制
  [1]: https://stackoverflow.com/questions/45847690/how-to-connect-to-multiple-clusters-in-a-single-kafka-streams-application
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73307435

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档