首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Strimzi -连接外部客户端

Strimzi -连接外部客户端
EN

Stack Overflow用户
提问于 2019-10-05 11:26:45
回答 2查看 935关注 0票数 1

在讨论here之后,我使用以下步骤来启用外部客户端(基于kafkajs)连接到OpenShift上的Strimzi。这些步骤来自here

启用外部路由

kafka-persistent-single.yaml编辑为,如下所示。

代码语言:javascript
复制
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.3.0
    replicas: 1
    listeners:
      plain: {}
      tls: {}
      external:
          type: route
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.3"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: false
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 5Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

提取证书,

为了提取证书并在客户端使用它,我运行了以下命令:

代码语言:javascript
复制
kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -D > ca.crt

请注意,我必须在macOS上使用base64 -D,而不是文档中所示的base64 -d

Kafkajs客户端

这是根据他们的npm页面和documentation改编的客户端。

代码语言:javascript
复制
const fs = require('fs')
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io'],
  ssl : { rejectUnauthorized: false,
    ca : [fs.readFileSync('ca.crt', 'utf-8')]
  }
})

const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  // Producing
  await producer.connect()
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
    ],
  })

  // Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error)

问题

当我从包含ca.crt的文件夹中运行node sample.js时,我得到了一条拒绝连接的消息。

代码语言:javascript
复制
{"level":"ERROR","timestamp":"2019-10-05T03:22:40.491Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.99.100:9094","broker":"my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io:9094","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.99.100:9094\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1113:14)"}

我遗漏了什么?

EN

回答 2

Stack Overflow用户

发布于 2019-10-05 13:16:03

我猜问题在于您在代理地址上丢失了正确的端口443,因此您必须使用

经纪人:'my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io:443‘

否则,它将尝试连接到OpenShift路由上的默认端口80。

票数 0
EN

Stack Overflow用户

发布于 2019-10-07 23:34:28

在与@ppatierno进行了广泛的讨论之后,我觉得Strimzi集群与Kafka控制台客户端配合得很好。另一方面,使用NOT_LEADER_FOR_PARTITION时,kafkajs包总是失败。

UPDATE Python client似乎没什么大惊小怪的;所以,我放弃了kafkajs

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58245089

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档