
Kafka broker connection fails when creating a new topic on a 3-broker cluster

Stack Overflow user
Asked 2022-01-17 04:50:35
1 answer, 434 views, 0 followers, 0 votes

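I am trying to set up a Kafka cluster with 3 brokers on Docker.

The problem: whenever I perform an operation (i.e. create/list/delete topics), one broker always fails to connect and its Docker container restarts. This does not happen on a cluster of 2 brokers or a single broker.

My steps to reproduce are:

  • Run docker-compose
  • Open a shell in one of the Kafka containers and create a topic. After this, a random broker is disconnected and removed from the cluster. Sometimes the response to the command above is an error stating that the replication factor cannot be larger than 2 (because 1 broker has been removed from the cluster)

I am new to Kafka. I think I am just making some silly mistake, but I have no idea what it is. I have searched the documentation but have not found it yet.

Here is my docker-compose file: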

version: "3.9"

networks:
  kafka-cluster:
    driver: bridge

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      # ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      # ZOOKEEPER_TICK_TIME: 2000
      # ZOOKEEPER_SERVERS: "zookeeper:22888:23888"
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
    ports:
      - 2181:2181
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka1
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9093
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka1:9092,EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafka2:
    image: confluentinc/cp-kafka:latest
    container_name: kafka2
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9094
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka2:9092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafka3:
    image: confluentinc/cp-kafka:latest
    container_name: kafka3
    depends_on:
      - zookeeper
    ports:
      - "9095:9095"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: CLIENT://:9092,EXTERNAL://:9095
      KAFKA_ADVERTISED_LISTENERS: CLIENT://kafka3:9092,EXTERNAL://localhost:9095
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    restart: unless-stopped
    networks:
      - kafka-cluster

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    ports:
      - "9000:9000"
    environment:
      - KAFKA_BROKERCONNECT=kafka1:9092,kafka2:9092,kafka3:9092
      - JVM_OPTS="-Xms32M -Xmx64M"
      - SERVER_SERVLET_CONTEXTPATH="/"
    depends_on:
      - kafka1
    networks:
      - kafka-cluster
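
As a side check, the listener settings of one broker in the file above can be validated mechanically, to rule out a naming mismatch between the listener variables. The following is only a minimal Python sketch: `parse_listeners`, `check_broker_env` and `KAFKA1_ENV` are hypothetical names introduced for illustration, and the checks cover only listener names, not ports, DNS or broker health.

```python
# Sketch: cross-check the listener-related variables of one broker.
# All names here are illustrative; the values are copied from kafka1 above.

def parse_listeners(value):
    """Parse 'NAME://host:port,NAME2://host:port' into {name: (host, port)}."""
    result = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        result[name] = (host, int(port))
    return result


def check_broker_env(env):
    """Return a list of problems found in one broker's listener settings."""
    listeners = parse_listeners(env["KAFKA_LISTENERS"])
    advertised = parse_listeners(env["KAFKA_ADVERTISED_LISTENERS"])
    protocols = dict(
        pair.strip().split(":", 1)
        for pair in env["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"].split(",")
    )
    problems = []
    for name in advertised:
        # Advertised listener names must also be declared in the listeners.
        if name not in listeners:
            problems.append(f"advertised listener {name} has no matching listener")
        # With an explicit protocol map, every listener name needs an entry.
        if name not in protocols:
            problems.append(f"listener {name} missing from security protocol map")
    inter = env.get("KAFKA_INTER_BROKER_LISTENER_NAME")
    # The inter-broker listener must be one of the advertised listeners.
    if inter not in advertised:
        problems.append(f"inter-broker listener {inter} is not advertised")
    return problems


KAFKA1_ENV = {
    "KAFKA_LISTENERS": "CLIENT://:9092,EXTERNAL://:9093",
    "KAFKA_ADVERTISED_LISTENERS": "CLIENT://kafka1:9092,EXTERNAL://localhost:9093",
    "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT",
    "KAFKA_INTER_BROKER_LISTENER_NAME": "CLIENT",
}

print(check_broker_env(KAFKA1_ENV))  # prints []
```

An empty list only means the listener names are mutually consistent for this broker; it says nothing about why a broker drops out of the cluster.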

Here is the error log from the other two brokers:

[2022-01-17 04:32:40,078] WARN [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1002, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={test-topic-3-1=PartitionData(fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), test-topic-2-1=PartitionData(fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=28449961, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to kafka1:9092 (id: 1001 rack: null) failed.
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:104)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:218)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:321)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:136)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2022-01-17 04:32:42,088] WARN [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Connection to node 1001 (kafka1/192.168.48.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-01-17 04:32:42,088] INFO [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error sending fetch request (sessionId=28449961, epoch=INITIAL) to node 1001: (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to kafka1:9092 (id: 1001 rack: null) failed.
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:104)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:218)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:321)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:136)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
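
The log above shows replica 1002 failing to open a connection to kafka1:9092. One way to check plain TCP reachability of a broker port is sketched below in Python. It assumes the script runs in some container attached to the same Compose network with Python 3 available, and `can_connect` is a hypothetical helper name, not part of any Kafka tooling. A successful connection only shows that the port accepts TCP connections, not that the broker is healthy.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host name and attempts the connect.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False
```

For example, `can_connect("kafka1", 9092)` could be called in a loop for `kafka1`, `kafka2` and `kafka3` right after creating a topic, to see which broker stops accepting connections.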

1 Answer

Stack Overflow user

Answered 2022-01-17 05:37:07

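Assuming you don't need host connections (since you are running the Kafka commands directly in the containers), you can greatly simplify your Compose file:

  1. Remove the host ports
  2. Remove the non-CLIENT listeners, and stick with the defaults
  3. Remove the Compose network (for debugging), since one is created automatically

All in all, you would end up with something like this: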

x-kafka-setup: &kafka-setup
  KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
  ALLOW_PLAINTEXT_LISTENER: 'yes'

version: "3.8"
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.7
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka1:
    image: &broker-image docker.io/bitnami/kafka:3
    environment:
      KAFKA_BROKER_ID: 1
      <<: *kafka-setup
    depends_on:
      - zookeeper
  kafka2:
    image: *broker-image
    environment:
      KAFKA_BROKER_ID: 2
      <<: *kafka-setup
    depends_on:
      - zookeeper
  kafka3:
    image: *broker-image
    environment:
      KAFKA_BROKER_ID: 3
      <<: *kafka-setup
    depends_on:
      - zookeeper

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: kafka1:9092,kafka2:9092,kafka3:9092
      JVM_OPTS: "-Xms32M -Xmx64M"
      SERVER_SERVLET_CONTEXTPATH: /
    depends_on:
      - kafka1
      - kafka2
      - kafka3

Votes: 1
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/70736655
