
Spring application cannot connect to Kafka broker

Stack Overflow user
Asked on 2022-06-13 22:49:54
Answers: 1 · Views: 877 · Followers: 0 · Votes: 0

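I'm trying to build my first application using Kafka as the messaging system, but I've run into problems getting it to run. I use the ZooKeeper and Kafka Docker images from wurstmeister with this docker-compose file: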

version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    restart: unless-stopped
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "toAll:1:1"
    restart: unless-stopped

docker-compose gives me the following output:

kafka_1      | [2022-06-13 22:08:10,071] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1      | [2022-06-13 22:08:10,073] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1      | [2022-06-13 22:08:10,074] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1      | [2022-06-13 22:08:10,076] INFO [ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1      | [2022-06-13 22:08:10,089] INFO Log directory /kafka/kafka-logs-44d30bad178c not found, creating it. (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,121] INFO Loading logs from log dirs ArraySeq(/kafka/kafka-logs-44d30bad178c) (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,123] INFO Attempting recovery for all logs in /kafka/kafka-logs-44d30bad178c since no clean shutdown file was found (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,128] INFO Loaded 0 logs in 7ms. (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,128] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,130] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,429] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
kafka_1      | [2022-06-13 22:08:10,433] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
kafka_1      | [2022-06-13 22:08:10,465] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
kafka_1      | [2022-06-13 22:08:10,489] INFO [broker-1049-to-controller-send-thread]: Starting (kafka.server.BrokerToControllerRequestThread)
kafka_1      | [2022-06-13 22:08:10,504] INFO [ExpirationReaper-1049-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,505] INFO [ExpirationReaper-1049-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,506] INFO [ExpirationReaper-1049-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,506] INFO [ExpirationReaper-1049-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,518] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
kafka_1      | [2022-06-13 22:08:10,552] INFO Creating /brokers/ids/1049 (is it secure? false) (kafka.zk.KafkaZkClient)
kafka_1      | [2022-06-13 22:08:10,567] INFO Stat of the created znode at /brokers/ids/1049 is: 1191,1191,1655158090560,1655158090560,1,0,0,72057715199770624,202,0,1191
kafka_1      |  (kafka.zk.KafkaZkClient)
kafka_1      | [2022-06-13 22:08:10,567] INFO Registered broker 1049 at path /brokers/ids/1049 with addresses: PLAINTEXT://127.0.0.1:9092, czxid (broker epoch): 1191 (kafka.zk.KafkaZkClient)
kafka_1      | [2022-06-13 22:08:10,623] INFO [ExpirationReaper-1049-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,627] INFO [ExpirationReaper-1049-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,628] INFO [ExpirationReaper-1049-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,640] INFO [GroupCoordinator 1049]: Starting up. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-06-13 22:08:10,644] INFO [GroupCoordinator 1049]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-06-13 22:08:10,664] INFO [ProducerId Manager 1049]: Acquired new producerId block (brokerId:1049,blockStartProducerId:25000,blockEndProducerId:25999) by writing to Zk with path version 26 (kafka.coordinator.transaction.ProducerIdManager)
kafka_1      | [2022-06-13 22:08:10,665] INFO [TransactionCoordinator id=1049] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1      | [2022-06-13 22:08:10,668] INFO [TransactionCoordinator id=1049] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1      | [2022-06-13 22:08:10,668] INFO [Transaction Marker Channel Manager 1049]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
kafka_1      | [2022-06-13 22:08:10,688] INFO [ExpirationReaper-1049-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,704] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
kafka_1      | [2022-06-13 22:08:10,722] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Starting socket server acceptors and processors (kafka.network.SocketServer)
kafka_1      | [2022-06-13 22:08:10,735] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
kafka_1      | [2022-06-13 22:08:10,735] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Started socket server acceptors and processors (kafka.network.SocketServer)
kafka_1      | [2022-06-13 22:08:10,739] INFO Kafka version: 2.8.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2022-06-13 22:08:10,739] INFO Kafka commitId: 839b886f9b732b15 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2022-06-13 22:08:10,740] INFO Kafka startTimeMs: 1655158090735 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2022-06-13 22:08:10,744] INFO [KafkaServer id=1049] started (kafka.server.KafkaServer)
zookeeper_1  | 2022-06-13 22:08:10,770 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x100001c35cf0000 type:multi cxid:0x3b zxid:0x4ab txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka_1      | [2022-06-13 22:08:10,798] INFO [broker-1049-to-controller-send-thread]: Recorded new controller, from now on will use broker 127.0.0.1:9092 (id: 1049 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka_1      | creating topics: toAll:1:1
zookeeper_1  | 2022-06-13 22:08:19,653 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /172.20.0.3:60284
zookeeper_1  | 2022-06-13 22:08:19,655 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@949] - Client attempting to establish new session at /172.20.0.3:60284
zookeeper_1  | 2022-06-13 22:08:19,659 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@694] - Established session 0x100001c35cf0001 with negotiated timeout 30000 for client /172.20.0.3:60284
zookeeper_1  | 2022-06-13 22:08:19,803 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x100001c35cf0001
zookeeper_1  | 2022-06-13 22:08:19,808 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1056] - Closed socket connection for client /172.20.0.3:60284 which had sessionid 0x100001c35cf0001 

So, since the output above says "from now on will use broker 127.0.0.1:9092" and "creating topics: toAll:1:1", I assume the broker should be ready for my application, which is:

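import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
@Slf4j
public class KafkaTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaTestApplication.class, args);
    }

    // Declares the "toAll" topic; Spring's KafkaAdmin creates it on startup if it is missing.
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("toAll").build();
    }

    // Sends a single test message once the application context is ready.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            String mess = "Test message";
            template.send("toAll", mess);
            log.info("Message sent: {}", mess);
        };
    }
}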

But when I run it, I get this output:

2022-06-14 00:25:25.148  INFO 1108 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

2022-06-14 00:25:25.254  INFO 1108 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
2022-06-14 00:25:25.255  INFO 1108 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
2022-06-14 00:25:25.255  INFO 1108 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1655159125253
2022-06-14 00:25:27.299  INFO 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node -1 disconnected.
2022-06-14 00:25:27.301  WARN 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-06-14 00:25:29.454  INFO 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node -1 disconnected.
2022-06-14 00:25:29.454  WARN 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-06-14 00:25:31.724  INFO 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node -1 disconnected.
2022-06-14 00:25:31.724  WARN 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

The last two lines repeat for a while, and eventually an exception is thrown.

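I have been fighting this problem for two days and have tried other Kafka settings, such as kafka_listeners, kafka_advertised_listeners and so on. Some of them kept Kafka from starting properly; others changed the IP but did not change my application's output. Do you have any idea what could be causing this and how to fix it?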

Thanks in advance.


1 Answer

Stack Overflow user

Accepted answer

Posted on 2022-06-13 22:58:45

Look at the output of docker ps.

If you don't see 0.0.0.0:9092->9092/tcp, then the host is not forwarding to the required KAFKA_ADVERTISED_PORT.

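Written this way in the compose file, the mapping picks a random host port and forwards it to 9092 in the container, so localhost:9092 is not a valid open port on the host.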

    ports:
      - "9092"
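
A minimal sketch of one possible fix, assuming the Spring application runs directly on the Docker host (not in another container): publish the port as an explicit host:container pair so that host port 9092 is fixed. Everything except the changed ports entry is copied from the question's compose file; whether the advertised address also needs changing depends on where the client runs.

```yaml
services:
  kafka:
    image: wurstmeister/kafka
    ports:
      # host:container. Pins host port 9092 instead of letting Docker pick a random one.
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "toAll:1:1"
    restart: unless-stopped
```

After recreating the containers, docker ps should list 0.0.0.0:9092->9092/tcp for the Kafka container, which is the mapping the answer says to look for.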

Related: Connect to Kafka running in Docker

Votes: 1
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/72609862
