首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何进入卡夫卡桥

如何进入卡夫卡桥
EN

Stack Overflow用户
提问于 2020-11-23 14:14:09
回答 3查看 1.8K关注 0票数 0

我正在使用strimzi,我想学习如何使用Kafka桥,并了解它的工作原理,我使用以下yml文件创建了一个Kafka集群

代码语言:javascript
复制
apiVersion: kafka.strimzi.io/v1beta1

kind: Kafka

metadata:

  name: my-cluster

spec:

  kafka:

    version: 2.6.0

    replicas: 3

    listeners:

      - name: plain

        port: 9092

        type: internal

        tls: false

      - name: tls

        port: 9093

        type: internal

        tls: true

    config:

      offsets.topic.replication.factor: 3

      transaction.state.log.replication.factor: 3

      transaction.state.log.min.isr: 2

      log.message.format.version: "2.6"

    storage:

      type: jbod

      volumes:

      - id: 0

        type: persistent-claim

        size: 100Gi

        deleteClaim: false

  zookeeper:

    replicas: 3

    storage:

      type: persistent-claim

      size: 100Gi

      deleteClaim: false

  entityOperator:

    topicOperator: {}

    userOperator: {}

以下是卡夫卡桥的yml语法

代码语言:javascript
复制
apiVersion: kafka.strimzi.io/v1alpha1

kind: KafkaBridge

metadata:

  name: my-bridge

spec:

  replicas: 1

  bootstrapServers: my-cluster-kafka-bootstrap:9092

  http:

    port: 8080

现在是服务

代码语言:javascript
复制
ist@ist-1207:~$ kubectl   get svc -A

NAMESPACE     NAME                             TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE

default       kubernetes                       ClusterIP   10.96.0.1        <none>        443/TCP                      3d1h

kube-system   kube-dns                         ClusterIP   10.96.0.10       <none>        53/UDP,53/TCP,9153/TCP       3d1h

strimzi       my-bridge-bridge-service         ClusterIP   10.100.153.164   <none>        8080/TCP                     88m

strimzi       my-cluster-kafka-bootstrap       ClusterIP   10.97.160.117    <none>        9091/TCP,9092/TCP,9093/TCP   109m

strimzi       my-cluster-kafka-brokers         ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   109m

strimzi       my-cluster-zookeeper-client      ClusterIP   10.107.59.225    <none>        2181/TCP                     110m

strimzi       my-cluster-zookeeper-nodes       ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   110m

strimzi       my-connect-cluster-connect-api   ClusterIP   10.110.27.15     <none>        8083/TCP                     25m

现在,我使用以下命令来执行结束符:

代码语言:javascript
复制
$ kubectl  exec -it my-bridge-bridge-684df9fc64-bktc4 -n strimzi bash

在那之后

代码语言:javascript
复制
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X POST http://localhost:8080/consumers/my-group   -H 'content-type: application/vnd.kafka.v2+json'   -d '{
    "name": "your-consumer",
    "format": "json",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": false
}'
{"instance_id":"your-consumer","base_uri":"http://localhost:8080/consumers/my-group/instances/your-consumer"}

然后

代码语言:javascript
复制
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$   curl -X POST http://localhost:8080/consumers/my-group/instances/your-consumer/subscription   -H 'content-type: application/vnd.kafka.v2+json'   -d '{
    "topics": [
        "your-topic"
    ]
}' 

然后

代码语言:javascript
复制
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$   curl -X GET http://localhost:8080/consumers/my-group/instances/your-consumer/records \
>   -H 'accept: application/vnd.kafka.json.v2+json'
[]

然后我制作了

代码语言:javascript
复制
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X POST \
>   http://localhost:8080/topics/your-topic \
>   -H 'content-type: application/vnd.kafka.json.v2+json' \
>   -d '{
>     "records": [
>         {
>             "key": "key-1",
>             "value": "kajal verma"
>         },
>         {
>             "key": "key-2",
>             "value": "Aman verma"
>         }
>     ]
> }'
{"offsets":[{"partition":0,"offset":2},{"partition":0,"offset":3}]}


[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$   curl -X GET http://localhost:8080/consumers/my-group/instances/your-consumer/records \
>   -H 'accept: application/vnd.kafka.json.v2+json'
[]

再说一次,我没有得到记录,不知道为什么。

这些是吊桥的原木

代码语言:javascript
复制
ist@ist-1207:~$ kubectl logs -f my-bridge-bridge-684df9fc64-lstx2 -n strimzi
Kafka Bridge configuration:
#Bridge configuration
bridge.id=my-bridge

#Kafka common properties
kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
kafka.security.protocol=PLAINTEXT

#Apache Kafka Producer

#Apache Kafka Consumer

#HTTP configuration
http.enabled=true
http.host=0.0.0.0
http.port=8080
http.cors.enabled=false
http.cors.allowedOrigins=
http.cors.allowedMethods=

[2020-11-24 11:42:01,374] INFO  <Application :64> [main        ] Strimzi Kafka Bridge 0.19.0 is starting
[2020-11-24 11:42:02,939] WARN  <onMetaSchema:337> [oop-thread-1] Unknown keyword example - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
[2020-11-24 11:42:03,402] INFO  <HttpBridge  :180> [oop-thread-1] Starting HTTP-Kafka bridge verticle...
[2020-11-24 11:42:03,459] INFO  <ClientConfig:354> [oop-thread-1] AdminClientConfig values: 
    bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

[2020-11-24 11:42:04,336] INFO  <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:42:04,336] INFO  <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:42:04,338] INFO  <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218124265
Nov 24, 2020 11:42:05 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main]=Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 2656 ms, time limit is 2000 ms
Nov 24, 2020 11:42:06 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main]=Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 3869 ms, time limit is 2000 ms
[2020-11-24 11:42:07,589] INFO  <HttpBridge  :102> [oop-thread-1] HTTP-Kafka Bridge started and listening on port 8080
[2020-11-24 11:42:07,604] INFO  <HttpBridge  :103> [oop-thread-1] HTTP-Kafka Bridge bootstrap servers my-cluster-kafka-bootstrap:9092
[2020-11-24 11:42:07,609] INFO  <Application :219> [oop-thread-0] HTTP verticle instance deployed [6bc2ded6-162c-4444-b8ac-f51c0573e389]
[2020-11-24 11:44:15,295] WARN  <etworkClient:757> [dminclient-1] [AdminClient clientId=adminclient-1] Connection to node -1 (my-cluster-kafka-bootstrap/10.97.102.188:9092) could not be established. Broker may not be available.
[2020-11-24 11:44:15,301] INFO  <adataManager:235> [dminclient-1] [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1606218154377, tries=1, nextAllowedTryMs=1606218255400) timed out at 1606218255300 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
[2020-11-24 11:46:09,397] INFO  <eateConsumer:85> [oop-thread-1] [9562016] CREATE_CONSUMER Request: from 127.0.0.1:34080, method = POST, path = /consumers/my-group
[2020-11-24 11:46:09,508] INFO  <nsumerConfig:354> [oop-thread-1] ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = your-consumer
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = my-group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

[2020-11-24 11:46:09,759] INFO  <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:46:09,759] INFO  <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:46:09,760] INFO  <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218369758
[2020-11-24 11:46:09,775] INFO  <idgeEndpoint:140> [oop-thread-1] Created consumer your-consumer in group my-group
[2020-11-24 11:46:09,781] INFO  <eateConsumer:85> [oop-thread-1] [9562016] CREATE_CONSUMER Response:  statusCode = 200, message = OK
[2020-11-24 11:46:33,188] INFO  <subscribe   :85> [oop-thread-1] [343058276] SUBSCRIBE Request: from 127.0.0.1:34370, method = POST, path = /consumers/my-group/instances/your-consumer/subscription
[2020-11-24 11:46:33,199] INFO  <idgeEndpoint:191> [oop-thread-1] Subscribe to topics [SinkTopicSubscription(topic=your-topic,partition=null,offset=null)]
[2020-11-24 11:46:33,224] INFO  <subscribe   :85> [oop-thread-1] [343058276] SUBSCRIBE Response:  statusCode = 200, message = OK
[2020-11-24 11:46:33,231] INFO  <afkaConsumer:965> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Subscribed to topic(s): your-topic
[2020-11-24 11:46:58,713] INFO  <poll        :85> [oop-thread-1] [690326198] POLL Request: from 127.0.0.1:34814, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:46:58,756] INFO  <poll        :85> [oop-thread-1] [690326198] POLL Response:  statusCode = 200, message = OK
[2020-11-24 11:47:25,358] INFO  <send        :85> [oop-thread-1] [859539310] SEND Request: from 127.0.0.1:35210, method = POST, path = /topics/your-topic
[2020-11-24 11:47:25,474] INFO  <oducerConfig:354> [oop-thread-1] ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    internal.auto.downgrade.txn.commit = false
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

[2020-11-24 11:47:25,611] INFO  <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:47:25,631] INFO  <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:47:25,631] INFO  <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218445608
[2020-11-24 11:47:25,649] INFO  <oducerConfig:354> [oop-thread-1] ProducerConfig values: 
    acks = 0
    batch.size = 16384
    bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-2
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    internal.auto.downgrade.txn.commit = false
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

[2020-11-24 11:47:25,670] INFO  <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:47:25,671] INFO  <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:47:25,672] INFO  <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218445665
[2020-11-24 11:47:25,673] INFO  <Metadata    :279> [| producer-1] [Producer clientId=producer-1] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:25,685] INFO  <Metadata    :279> [| producer-2] [Producer clientId=producer-2] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:25,714] INFO  <send        :85> [oop-thread-1] [859539310] SEND Response:  statusCode = 200, message = OK
[2020-11-24 11:47:25,938] INFO  <afkaProducer:1189> [ker-thread-7] [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2020-11-24 11:47:25,944] INFO  <afkaProducer:1189> [ker-thread-7] [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2020-11-24 11:47:41,679] INFO  <poll        :85> [oop-thread-1] [484189988] POLL Request: from 127.0.0.1:35416, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:47:41,682] WARN  <etworkClient:1073> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Error while fetching metadata with correlation id 2 : {your-topic=LEADER_NOT_AVAILABLE}
[2020-11-24 11:47:41,684] INFO  <Metadata    :279> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:41,688] INFO  <poll        :85> [oop-thread-1] [484189988] POLL Response:  statusCode = 200, message = OK
[2020-11-24 11:53:01,882] INFO  <poll        :85> [oop-thread-1] [1610292326] POLL Request: from 127.0.0.1:39262, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:53:01,883] INFO  <poll        :85> [oop-thread-1] [1610292326] POLL Response:  statusCode = 200, message = OK
[2020-11-24 11:53:01,946] INFO  <tCoordinator:815> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Discovered group coordinator my-cluster-kafka-2.my-cluster-kafka-brokers.strimzi.svc:9092 (id: 2147483645 rack: null)
[2020-11-24 11:53:01,979] INFO  <tCoordinator:553> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] (Re-)joining group
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-11-23 19:24:11

您正在尝试使用桥梁吊舱中的kafka-topics工具,它不包括卡夫卡二进制文件,因此那里没有kafka-topics工具。您正在尝试执行的命令可以在您部署的卡夫卡吊舱( Kafka Pod)上工作,在该工具可用的地方。总之,我不明白你想要达到的目标和桥牌的使用之间的关系。网桥只是在端口8080上为Kafka提供了一个HTTP接口,因此在Kubernetes集群中的一个吊舱中,您可以在桥端点上获得/发送请求,以便通过HTTP与Kafka进行交互。总体的桥文档可以在这里获得:https://strimzi.io/docs/bridge/latest/,如果您想公开Kubernetes集群之外的桥,那么您可以在这里阅读更多关于它的文章:https://strimzi.io/blog/2019/11/05/exposing-http-bridge/,最后,这里有一个很好的桥接介绍:https://strimzi.io/blog/2019/07/19/http-bridge-intro/

票数 0
EN

Stack Overflow用户

发布于 2020-11-23 14:22:36

对于Strimzi版本并不完全确定,但在我看来,脚本似乎不在$PATH中,或者被称为有点不同。例如,这是来自我的一个集群(不过,我使用的是合流的kafka头盔图表):

代码语言:javascript
复制
root@confluent-kafka-connect-6bf9c944f-fzbgc:/# which kafka-topics
/usr/bin/kafka-topics
root@confluent-kafka-connect-6bf9c944f-fzbgc:/# kafka-topics --help
This tool helps to create, delete, describe, or change a topic.
Option                                   Description         
------                                   -----------                            
--alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
(...)                   
票数 0
EN

Stack Overflow用户

发布于 2020-11-23 19:12:59

Kafka服务是一个HTTP服务,没有Kafka工具(如果有,端口8080将不响应该命令)。

如果要列出主题,请使用获取/topics请求

如果您有一个入口或端口,则不应该需要执行到容器中才能完成这一任务。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64970147

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档