首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka连接使用REST与Strimzi : KafkaConnector

Kafka连接使用REST与Strimzi : KafkaConnector
EN

Stack Overflow用户
提问于 2021-07-26 10:30:07
回答 1查看 629关注 0票数 0

我试图使用Kafka来管理连接器,为了简单起见,请考虑以下pause实现:

代码语言:javascript
复制
def pause(): Unit = {
      logger.info(s"pause() Triggered")
      val response = HttpClient.newHttpClient.send({
        HttpRequest
          .newBuilder(URI.create(config.connectUrl + s"/connectors/${config.connectorName}/pause"))
          .PUT(BodyPublishers.noBody)
          .timeout(Duration.ofMillis(config.timeout.toMillis.toInt))
          .build()
      }, BodyHandlers.ofString)
      if (response.statusCode() != HTTPStatus.Accepted) {
        throw new Exception(s"Could not pause connector: ${response.body}")
      }
    }

由于我使用KafkaConnector作为资源,所以不能使用KafkaConnetor,因为连接器运算符将KafkaConnetor资源作为其唯一的真实来源,因此直接使用KafkaConnetor进行的手动更改(如pause )将由群集运算符还原。

因此,要暂停连接器,我需要以某种方式编辑资源。

我很难改变当前函数的逻辑,有一些如何处理KafkaConnetor资源的实用示例将是很棒的。

我查看了使用Strimzi文档,但是找不到任何实用的例子

谢谢!

在@Jakub的帮助下,我成功地创建了我的新客户机:

代码语言:javascript
复制
class KubernetesService(config: Configuration) extends StrictLogging {

  private[this] val client = new DefaultKubernetesClient(Config.autoConfigure(config.connectorContext))

  def setPause(pause: Boolean): Unit = {
    logger.info(s"[KubernetesService] - setPause($pause) Triggered")

    val connector = getConnector()
    connector.getSpec.setPause(pause)
    Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).replace(connector)

    Crds.kafkaConnectorOperation(client)
      .inNamespace(config.connectorNamespace)
      .withName(config.connectorName)
      .waitUntilCondition(connector => {
        connector != null &&
          connector.getSpec.getPause == pause && {
          val desiredState = if (pause) "Paused" else "Running"
          connector.getStatus.getConditions.stream().anyMatch(_.getType.equalsIgnoreCase(desiredState))
        }
      }, config.timeout.toMillis, TimeUnit.MILLISECONDS)
  }

  def delete(): Unit = {
    logger.info(s"[KubernetesService] - delete() Triggered")

    Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).delete

    Crds.kafkaConnectorOperation(client)
      .inNamespace(config.connectorNamespace)
      .withName(config.connectorName)
      .waitUntilCondition(_ == null, config.timeout.toMillis, TimeUnit.MILLISECONDS)
  }

  def create(oldKafkaConnect: KafkaConnector): Unit = {
    logger.info(s"[KubernetesService] - create(${oldKafkaConnect.getMetadata}) Triggered")

    Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).create(oldKafkaConnect)

    Crds.kafkaConnectorOperation(client)
      .inNamespace(config.connectorNamespace)
      .withName(config.connectorName)
      .waitUntilCondition(connector => {
          connector != null &&
          connector.getStatus.getConditions.stream().anyMatch(_.getType.equalsIgnoreCase("Running"))
      }, config.timeout.toMillis, TimeUnit.MILLISECONDS)
  }

  def getConnector(): KafkaConnector = {
    logger.info(s"[KubernetesService] - getConnector() Triggered")
    Try {
      Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).get
    } match {
      case Success(connector) => connector
      case Failure(_: NullPointerException) => throw new NullPointerException(s"Failure on getConnector(${config.connectorName}) on ns: ${config.connectorNamespace}, context: ${config.connectorContext}")
      case Failure(exception) => throw exception
    }
  }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-07-26 12:03:37

要暂停连接器,可以编辑KafkaConnector资源并将.spec中的pause字段设置为true (参见文档)。有几种选择,你可以这样做。您可以使用kubectl,或者应用新的YAML文件(kubectl apply),或者使用kubectl edit进行交互操作。

如果要以编程方式执行,则需要使用Kubernetes客户端来编辑资源。在Java中,您还可以使用api模块,该模块拥有用于编辑资源的所有结构。我为使用Fabric8 Kubernetes客户端和api模块在Java中暂停Kafka连接器提供了一个简单的示例:

代码语言:javascript
复制
package cz.scholz.strimzi.api.examples;

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.KafkaConnectorList;
import io.strimzi.api.kafka.model.KafkaConnector;

public class PauseConnector {
    public static void main(String[] args) {
        String namespace = "myproject";
        String crName = "my-connector";

        KubernetesClient client = new DefaultKubernetesClient();
        MixedOperation<KafkaConnector, KafkaConnectorList, Resource<KafkaConnector>> op = Crds.kafkaConnectorOperation(client);
        KafkaConnector connector = op.inNamespace(namespace).withName(crName).get();
        connector.getSpec().setPause(true);
        op.inNamespace(namespace).withName(crName).replace(connector);

        client.close();
    }
}

(完整的项目请参见https://github.com/scholzj/strimzi-api-examples )

我不是Scala用户,但我认为Scala也应该是可用的,但是我将把它从Java重写到Scala。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68528316

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档