
Kafka on Kubernetes cannot produce/consume topics (ClosedChannelException, ErrorLoggingCallback)

Stack Overflow user
Asked on 2016-06-11 08:11:30
1 answer · 4.3K views · 0 following · Score 7

I am running 1 Kafka and 3 ZooKeeper servers in Docker on Kubernetes, following these instructions. I cannot produce/consume topics from outside the pod (Docker container).

bin/kafka-console-producer.sh --broker-list 1.2.3.4:9092 --topic

[2016-06-11 15:14:46,889] ERROR Error when sending message to topic test with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test-0

bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning 

[2016-06-11 15:15:58,985] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(1001,kafka-service,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-06-11 15:15:58,992] WARN [console-consumer-66869_tattoo-NV49C-1465629357799-ce1529da-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(1001,kafka-service,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    ... 3 more


Kafka log:

[2016-06-11 07:47:58,269] INFO [Kafka Server 1001], started (kafka.server.KafkaServer)
[2016-06-11 07:53:50,404] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2016-06-11 07:53:50,443] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2016-06-11 07:53:50,458] INFO Created log for partition [test,0] in /kafka/kafka-logs-kafka-controller-3rsv3 with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2016-06-11 07:53:50,459] INFO Partition [test,0] on broker 1001: No checkpointed highwatermark is found for partition [test,0] (kafka.cluster.Partition)
[2016-06-11 07:57:57,955] INFO [Group Metadata Manager on Broker 1001]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)

config/server.properties

broker.id=-1
log.dirs=/kafka/kafka-logs-kafka-controller-3rsv3
num.partitions=1
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
zookeeper.connection.timeout.ms=6000

service.port.9092.tcp.addr=10.254.68.65
service.port.9092.tcp.proto=tcp
service.service.port.kafka.port=9092
service.service.port=9092
service.port=tcp://10.254.68.65:9092
service.port.9092.tcp.port=9092
version=0.10.0.0
service.service.host=10.254.68.65
port=9092
advertised.host.name=kafka-service
service.port.9092.tcp=tcp://10.254.68.65:9092
advertised.port=9092
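The advertised.host.name=kafka-service line above is what the broker hands back to every client in metadata responses. A hypothetical change (my assumption, not part of the original post) that would let clients outside the cluster reconnect is to advertise an address they can actually resolve:

```properties
# Hypothetical server.properties change (not from the post):
# advertise an externally resolvable address instead of the
# in-cluster service name. 1.2.3.4 stands in for the broker's
# externally reachable IP.
advertised.host.name=1.2.3.4
advertised.port=9092
```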

But I can run bin/kafka-console-producer.sh --broker-list localhost:9092 --topic and bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning if I am inside the pod (Docker container).

And I can create, list, and describe topics normally when connecting to the ZooKeeper service:

bin/kafka-topics.sh --describe --zookeeper 5.6.7.8:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

My yaml file for creating the Kafka replication controller and service:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service2
  labels:
    app: kafka2
spec:
  clusterIP: None
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka2
---
apiVersion: v1
kind: ReplicationController
metadata:
  name: kafka-controller2
spec:
  replicas: 1
  selector:
    app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
      - name: kafka2
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka-service2
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
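For completeness, one common way to make a broker reachable from outside the cluster (my suggestion, not from the original post; the name kafka-external and port 30092 are made up) is a NodePort Service, combined with advertising an address the outside client can resolve. A minimal sketch:

```yaml
# Hypothetical additional Service (not from the post):
# NodePort exposes broker port 9092 on every cluster node.
apiVersion: v1
kind: Service
metadata:
  name: kafka-external        # hypothetical name
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
    nodePort: 30092           # port opened on each node
  selector:
    app: kafka2
```

The broker would then also need KAFKA_ADVERTISED_HOST_NAME set to a node address and KAFKA_ADVERTISED_PORT set to 30092; otherwise clients still receive the unresolvable in-cluster name in metadata responses.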

1 Answer

Stack Overflow user

Accepted answer

Posted on 2016-06-13 09:17:44

Kafka registers itself in ZooKeeper under its service name. And consuming/producing messages requires access to that service name (here, the DNS records held via zookeeper-1, zookeeper-2, zookeeper-3), which is only resolvable through Kubernetes' DNS. So only applications running on Kubernetes can reach my Kafka. That is why I cannot use the external IP of the kafka-service, or port-forward the Kafka pod to localhost and then access it.

But why could I create, list, and describe topics from outside the Kubernetes cluster? I think it is because ZooKeeper can perform those operations by itself. Consuming/producing messages, however, requires access to the ADVERTISED_HOST_NAME that Kafka advertises.
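The bootstrap-then-reconnect behaviour this answer describes can be modelled in a few lines. This is a toy sketch, not the real Kafka wire protocol, and all names in it are illustrative:

```python
# Toy model of Kafka's metadata bootstrap (NOT the real protocol).
# A client can reach the broker's IP, but the metadata reply names the
# broker by its advertised host, which only in-cluster DNS can resolve.

ADVERTISED_HOST = "kafka-service"     # value of advertised.host.name

def fetch_metadata(bootstrap_addr):
    """The broker answers any reachable client with its advertised name."""
    return ADVERTISED_HOST

def can_connect(host, resolvable):
    """A client can only connect to hosts its DNS resolves."""
    return host in resolvable

cluster_dns = {"kafka-service", "zookeeper-1"}   # names inside Kubernetes
external_dns = {"1.2.3.4", "5.6.7.8"}            # what an outside client sees

leader = fetch_metadata("1.2.3.4:9092")
print(can_connect(leader, cluster_dns))    # pod inside the cluster: True
print(can_connect(leader, external_dns))   # outside client: False -> ClosedChannelException
```

This is why the producer/consumer work inside the pod (where kafka-service resolves) and time out outside it.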

Score 7
Original page content provided by Stack Overflow; translation supported by Tencent Cloud's IT-domain engine.
Original link: https://stackoverflow.com/questions/37761476
