在使用spring-cloud-stream-binder-kafka时,我在配置到Confluent的连接时遇到问题。也许有人能看到哪里出了问题?
当我使用来自https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/的示例时,它工作得很好,并且我可以在Confluent Cloud上看到消息
但是,当使用spring-cloud-stream-binder-kafka配置添加相同的连接详细信息时,它会返回未经授权的错误。
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"MySchema","namespace":"org.test","fields":[{"name":"value","type":"double"}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401我下面的配置给出了上述错误。不确定哪里出了问题?
cloud:
stream:
default:
producer:
useNativeEncoding: true
kafka:
binder:
brokers: myinstance.us-east1.gcp.confluent.cloud:9092
producer-properties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: https://myinstance.us-central1.gcp.confluent.cloud
basic.auth.credentials.source: USER_INFO
schema.registry.basic.auth.user.info: mySchemaKey:mySchemaSecret
configuration:
ssl.endpoint.identification.algorithm: https
sasl.mechanism: PLAIN
request.timeout.ms: 20000
retry.backoff.ms: 500
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="myKey" password="MySecret";
security.protocol: SASL_SSL
bindings:
normals-out:
destination: normals
contentType: application/*+avro来自Confluent的运行良好的示例:
kafka:
bootstrap-servers:
- myinstance.us-east1.gcp.confluent.cloud:9092
properties:
ssl.endpoint.identification.algorithm: https
sasl.mechanism: PLAIN
request.timeout.ms: 20000
retry.backoff.ms: 500
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="myKey" password="MySecret";
security.protocol: SASL_SSL
schema.registry.url: https://myinstance.us-central1.gcp.confluent.cloud
basic.auth.credentials.source: USER_INFO
schema.registry.basic.auth.user.info: mySchemaKey:mySchemaSecret
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
template:
default-topic:
logging:
level:
root: info发布于 2020-05-28 01:05:28
我的问题只是在我的pom中缺少了一个依赖项。
我应该删除我的问题,但我把它留在这里作为参考,说明配置确实像上面那样工作。
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>5.3.0</version>
</dependency>https://stackoverflow.com/questions/62029048
复制相似问题