我是一个新的kafka空间,我在Kubernetes中安装了Strimzi集群运算符、Kafka引导服务器、实体操作符和kafka connect,遵循以下准则:
https://strimzi.io/docs/operators/latest/deploying.html
如何为strimzi kafka连接集群设置kafka mongo接收器连接器?
我有官方的mongodb连接器插件。我能用这个插件连接到地图集mongodb吗?
大多数论坛都对融合的卡夫卡进行了解释,但没有对卡夫卡进行解释。
下面是我的kafka连接配置:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-mongo-connect
annotations:
strimzi.io/use-connector-resources: "true"
spec:
image: STRIMZI KAFKA CONNECT IMAGE WITH MONGODB PLUGIN
version: 3.2.1
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9092
logging:
type: inline
loggers:
connect.root.logger.level: "INFO"
config:
group.id: my-cluster
offset.storage.topic: mongo-connect-cluster-offsets
config.storage.topic: mongo-connect-cluster-configs
status.storage.topic: mongo-connect-cluster-status
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
value.converter.schemas.enable: true
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1下面是我的接收器连接器配置:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: mongodb-sink-connector
labels:
strimzi.io/cluster: my-cluster
spec:
class: com.mongodb.kafka.connect.MongoSinkConnector
tasksMax: 2
config:
topics: my-topic
connection.uri: "MONGO ATLAS CONNECTION STRING"
database: my_database
collection: my_collection
post.processor.chain: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false但是,虽然我的kafka服务器已经启动,并且运行了生产者-消费者示例,但是上面的设置不起作用。
官方mongodb插件(Maven Central搜索)是否适合于此?还是使用debezium mongodb连接器?
如果有人能在这方面逐步阐明一些指导方针,那将有很大的帮助。
提前谢谢。
发布于 2022-09-23 09:07:03
由于评论部分越来越长,所以我会在评论中发布一些问题的答案。
下面是我的文档:
FROM quay.io/strimzi/kafka:0.31.0-kafka-3.2.1
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001my-plugins文件夹包含两个jar文件mongo-kafka-connect-1.8.0.jar和mongodb-driver-core-4.7.1.jar。我不确定我是否需要地图集mongodb的核心驱动插件,但无论如何,我有它。
我将生产者-消费者示例版本更改为:
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.31.0-kafka-3.2.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic总结一下,我的strimzi operator is 0.31.0和kafka-connect is set to 3.2.1是与这里支持的版本下的兼容性表对齐的
关于在卡夫卡连接中添加tls spec部分,活性探针失败地说:failed to connect to the IP-ADDRESS和吊舱继续重新启动。
下面是我的tls规范,在我的kafka服务器中有证书:
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt我把钥匙转换器按建议移走了。但是我的卡夫卡-连接中的group.id应该是什么呢?
我还将storage.topic配置更改为:
offset.storage.topic: mongo-connect-cluster-offsets
config.storage.topic: mongo-connect-cluster-configs
status.storage.topic: mongo-connect-cluster-status我参考了这个博客的上述设置。
kubectl -n kafka exec -it YOUR-KAFKA-CONNECT-POD -- curl http://localhost:8083/connectors的日志是[],所以这里有一个问题吗?
来自kubectl -n kafka exec -it YOUR-KAFKA-CONNECT-POD -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list的日志
__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic
mongo-connect-cluster-configs
mongo-connect-cluster-offsets
mongo-connect-cluster-status
my-topic下面是卡夫卡连接的荚日志,我很快就会找到一种共享整个日志文件的方法。

我哪里出问题了?另外,如何验证数据流是否以预期的方式发生?
https://stackoverflow.com/questions/73815342
复制相似问题