首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么我的卡夫卡S3连接器水槽删除我的主题后,创造(卡夫卡连接器重新启动)?

为什么我的卡夫卡S3连接器水槽删除我的主题后,创造(卡夫卡连接器重新启动)?
EN

Stack Overflow用户
提问于 2021-03-19 23:09:00
回答 1查看 308关注 0票数 0

我使用Confluent的Kafka连接器将数据从Kafka汇到一个MinIO桶中。我在Kubernetes环境中使用io.confluent.connect.s3.S3SinkConnector。这是我当前的S3 Sink配置:

代码语言:javascript
复制
kafkaConnectSpec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
     tasks.max: 1
     topics: topic-name
     s3.bucket.name: bucket
     flush.size: 1
     store.url: http://minio:9000
     format.class: io.confluent.connect.s3.format.json.JsonFormat
     storage.class: io.confluent.connect.s3.storage.S3Storage

在云环境部署之后,客户希望能够动态地控制主题(即。(任意添加和删除主题)。虽然我理解这可能不是理想的原因,但我屈服于上级当局。

因此,为了执行主题添加,我使用了Kafka:

代码语言:javascript
复制
def update_sink(topic, connector):
 configuration = requests.get("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector)).json()

    if "config" not in configuration:
        return {
            "status": 500,
            "message": "Kafka Sink " + str(connector) + " does not have a configuration"
        }

    # Topics must be comma delimited
    if "topics" in configuration["config"]:
        configuration["config"]["topics"] += ',' + topic

    requests.put("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector) + "/config", json=configuration["config"])
    print("http://kafka-connect-cluster-connect-api:8083/connectors/" + str(connector) + "/config")
    print(configuration["config"])
    return {
        "status": 200,
        "message": "Kafka Sink " + str(connector) + " successfully updated"
    }

我知道代码不是很漂亮,但现在它已经完成了任务。它本质上向/connectors/my-sink/config端点发出PUT请求,并附加了我的新主题。

这个很管用。我的水槽有了新的主题,我可以发送消息。

然而,在3-5分钟内,我的卡夫卡水槽开始重新启动(我认为)卡夫卡连接器:

代码语言:javascript
复制
2021-03-19 23:02:55,086 INFO [Worker clientId=connect-1, groupId=connect-cluster] Connector minio-s3-sink config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [KafkaBasedLog Work Thread - connect-cluster-configs]
2021-03-19 23:02:55,589 INFO [Worker clientId=connect-1, groupId=connect-cluster] Handling connector-only config update by restarting connector minio-s3-sink (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,589 INFO Stopping connector minio-s3-sink (org.apache.kafka.connect.runtime.Worker) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,589 INFO Shutting down S3 connector minio-s3-sink (io.confluent.connect.s3.S3SinkConnector) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,598 INFO Stopped connector minio-s3-sink (org.apache.kafka.connect.runtime.Worker) [DistributedHerder-connect-1-1]
2021-03-19 23:02:55,598 INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connector minio-s3-sink (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
... # Performs the restart here

这时,吊舱失去了话题。

我认为这是由config.action.reload = restart配置值造成的。我认为在接收到新的配置后,连接器将在N分钟后重新启动。但是,我似乎找不到任何关于如何改变这种行为的文档。也许我应该在我的PUT请求中这样做,但这感觉很烦人。这也只是黑暗中的一枪。

有人知道为什么我的连接器在PUT请求之后用更新的配置重新启动吗?有办法阻止这件事吗?

编辑#1:我尝试添加config.action.reload = none,但是连接器仍然重新启动。

我观看了卡夫卡操作员的日志,但没有触发复位。似乎完全孤立于卡夫卡连接器操作员。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-03-24 00:14:43

这个问题在这里被记录在Strimzi中:

如果启用了

,直接使用KafkaConnectors进行的手动更改将由群集操作符恢复

https://strimzi.io/docs/operators/latest/deploying.html#availability_of_the_kafka_connect_rest_api

我不知道这发生在我的部署,但显然我们必须关闭它。这是不幸的,因为K8连接器的部署对于一个简单的开始来说是很好的。

这是将它们“关闭”的相关配置:

代码语言:javascript
复制
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  annotations:
#  # use-connector-resources configures this KafkaConnect
#  # to use KafkaConnector resources to avoid
#  # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "false"

strimzi.io/use-connector-resources: "false"将使您不能通过YAML文件添加连接器,但您可以通过REST添加连接器(而且这些更改将持续到荚运行时)。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66716670

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档