我正在使用spring cloud stream和使用confluent's schema registry的Aiven的模式注册表。Aiven的模式注册表使用密码保护。根据these的说明,需要设置这两个配置参数才能成功访问模式注册表服务器。
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "avnadmin:schema-reg-password");当我只使用vanilla java的kafka驱动时,一切都很好,但是如果我使用Spring cloud stream,我不知道如何注入这两个参数。目前,我将"basic.auth.user.info"和"basic.auth.credentials.source"放在application.yml文件中的"spring.cloud.stream.kafka.binder.configuration"下。
这样做的时候,我将"401 Unauthorized"放在模式要注册的位置。
更新1:
根据'Ali n‘的建议,我更新了SchemaRegistryClient的bean的配置方式,以便它能够感知SSL上下文。
@Bean
public SchemaRegistryClient schemaRegistryClient(
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
try {
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(new FileInputStream(
new File("path/to/client.keystore.p12")),
"secret".toCharArray());
final KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream(
new File("path/to/client.truststore.jks")),
"secret".toCharArray());
TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;
SSLContext sslContext = SSLContextBuilder
.create()
.loadKeyMaterial(keyStore, "secret".toCharArray())
.loadTrustMaterial(trustStore, acceptingTrustStrategy)
.build();
HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
httpClient);
ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
new RestTemplate(requestFactory));
schemaRegistryClient.setEndpoint(endpoint);
return schemaRegistryClient;
} catch (Exception ex) {
ex.printStackTrace();
return null;
}
}这有助于消除应用程序启动时的错误,并注册模式。然而,每当应用程序想要向Kafka推送消息时,就会再次抛出一个新的错误。最后,mmelsen的回答也解决了这一问题。
发布于 2019-05-01 14:58:45
我遇到了同样的问题,因为我所处的情况是连接到一个由aiven托管并由basic身份验证保护的安全模式注册表。为了让它正常工作,我必须配置以下属性:
spring.kafka.properties.schema.registry.url=https://***.aiven***.com:port
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.basic.auth.user.info=username:password我的绑定器的其他属性是:
spring.cloud.stream.binders.input.type=kafka
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.brokers=https://***.aiven***.com:port <-- different from the before mentioned port
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=truststore.jks
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=PKCS12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=clientkeystore.p12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.key.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.streams.binder.autoCreateTopics=false实际发生的情况是,Spring cloud stream会将spring.kafka.properties.basic*添加到DefaultKafkaConsumerFactory,这会将配置添加到KafkaConsumer。在spring kafka初始化期间的某个时刻,会创建一个CachedSchemaRegistryClient,其中提供了这些属性。此客户端包含一个名为configureRestService的方法,该方法将检查属性映射是否包含"basic.auth.credentials.source“。当我们通过spring.kafka.properties提供时,它将找到此属性,并在访问模式注册表的端点时负责创建适当的标头。
希望这也适用于你。
我使用的是spring cloud版本Greenwich.SR1、spring-boot-starter 2.1.4.RELEASE、avro-version1.8.2和confluent.version 5.2.1
发布于 2019-04-16 21:06:13
绑定器配置只处理众所周知的消费者和生产者属性。
您可以在绑定级别设置任意属性。
spring.cloud.stream.kafka.binding.<binding>.consumer.configuration.basic.auth...发布于 2019-04-16 21:19:42
由于Aiven使用SSL for Kafka安全协议,因此需要使用证书进行身份验证。
你可以关注this page来了解它是如何工作的。简而言之,您需要运行以下命令来生成证书并导入它们:
openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks然后,您可以使用以下属性来使用证书:
spring.cloud.stream.kafka.streams.binder:
configuration:
security.protocol: SSL
ssl.truststore.location: client.truststore.jks
ssl.truststore.password: secret
ssl.keystore.type: PKCS12
ssl.keystore.location: client.keystore.p12
ssl.keystore.password: secret
ssl.key.password: secret
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializerhttps://stackoverflow.com/questions/55701542
复制相似问题