首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用于strimzi kafka安装的阿特拉斯MongoDB接收器连接器

用于strimzi kafka安装的阿特拉斯MongoDB接收器连接器
EN

Stack Overflow用户
提问于 2022-09-22 13:16:29
回答 1查看 61关注 0票数 0

我是一个新的kafka空间,我在Kubernetes中安装了Strimzi集群运算符、Kafka引导服务器、实体操作符和kafka connect,遵循以下准则:

https://strimzi.io/docs/operators/latest/deploying.html

如何为strimzi kafka连接集群设置kafka mongo接收器连接器?

我有官方的mongodb连接器插件。我能用这个插件连接到地图集mongodb吗?

大多数论坛都对融合的卡夫卡进行了解释,但没有对卡夫卡进行解释。

下面是我的kafka连接配置:

代码语言:javascript
复制
apiVersion: kafka.strimzi.io/v1beta2

kind: KafkaConnect

metadata:

  name: my-mongo-connect

  annotations:

    strimzi.io/use-connector-resources: "true"

spec:

  image: STRIMZI KAFKA CONNECT IMAGE WITH MONGODB PLUGIN

  version: 3.2.1

  replicas: 1

  bootstrapServers:  my-cluster-kafka-bootstrap:9092

  logging:

    type: inline

    loggers:

      connect.root.logger.level: "INFO"

  config:

    group.id:  my-cluster

    offset.storage.topic: mongo-connect-cluster-offsets

    config.storage.topic: mongo-connect-cluster-configs

    status.storage.topic: mongo-connect-cluster-status

    key.converter: org.apache.kafka.connect.json.JsonConverter

    value.converter: org.apache.kafka.connect.json.JsonConverter

    key.converter.schemas.enable: true

    value.converter.schemas.enable: true

    config.storage.replication.factor: -1

    offset.storage.replication.factor: -1

    status.storage.replication.factor: -1

下面是我的接收器连接器配置:

代码语言:javascript
复制
apiVersion: kafka.strimzi.io/v1beta2

kind: KafkaConnector

metadata:

  name: mongodb-sink-connector

  labels:

    strimzi.io/cluster: my-cluster

spec:

  class: com.mongodb.kafka.connect.MongoSinkConnector

  tasksMax: 2

  config:

    topics: my-topic

    connection.uri: "MONGO ATLAS CONNECTION STRING"

    database: my_database

    collection: my_collection

    post.processor.chain: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder

    key.converter: org.apache.kafka.connect.json.JsonConverter

    key.converter.schemas.enable: false

    value.converter: org.apache.kafka.connect.json.JsonConverter

    value.converter.schemas.enable: false

但是,虽然我的kafka服务器已经启动,并且运行了生产者-消费者示例,但是上面的设置不起作用。

官方mongodb插件(Maven Central搜索)是否适合于此?还是使用debezium mongodb连接器?

如果有人能在这方面逐步阐明一些指导方针,那将有很大的帮助。

提前谢谢。

EN

回答 1

Stack Overflow用户

发布于 2022-09-23 09:07:03

由于评论部分越来越长,所以我会在评论中发布一些问题的答案。

下面是我的文档:

代码语言:javascript
复制
FROM quay.io/strimzi/kafka:0.31.0-kafka-3.2.1
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

my-plugins文件夹包含两个jar文件mongo-kafka-connect-1.8.0.jarmongodb-driver-core-4.7.1.jar。我不确定我是否需要地图集mongodb的核心驱动插件,但无论如何,我有它。

我将生产者-消费者示例版本更改为:

代码语言:javascript
复制
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.31.0-kafka-3.2.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic

总结一下,我的strimzi operator is 0.31.0kafka-connect is set to 3.2.1是与这里支持的版本下的兼容性表对齐的

关于在卡夫卡连接中添加tls spec部分,活性探针失败地说:failed to connect to the IP-ADDRESS和吊舱继续重新启动。

下面是我的tls规范,在我的kafka服务器中有证书:

代码语言:javascript
复制
tls:
     trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert 
        certificate: ca.crt

我把钥匙转换器按建议移走了。但是我的卡夫卡-连接中的group.id应该是什么呢?

我还将storage.topic配置更改为:

代码语言:javascript
复制
   offset.storage.topic: mongo-connect-cluster-offsets
   config.storage.topic: mongo-connect-cluster-configs
   status.storage.topic: mongo-connect-cluster-status

我参考了这个博客的上述设置。

kubectl -n kafka exec -it YOUR-KAFKA-CONNECT-POD -- curl http://localhost:8083/connectors的日志是[],所以这里有一个问题吗?

来自kubectl -n kafka exec -it YOUR-KAFKA-CONNECT-POD -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list的日志

代码语言:javascript
复制
__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic
mongo-connect-cluster-configs
mongo-connect-cluster-offsets
mongo-connect-cluster-status
my-topic

下面是卡夫卡连接的荚日志,我很快就会找到一种共享整个日志文件的方法。

我哪里出问题了?另外,如何验证数据流是否以预期的方式发生?

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73815342

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档