首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡消费者试图连接到随机主机名,而不是对一个。

卡夫卡消费者试图连接到随机主机名,而不是对一个。
EN

Stack Overflow用户
提问于 2018-07-06 16:45:26
回答 1查看 992关注 0票数 1

我是卡夫卡的新手,并开始探索样例程序。它过去工作时没有任何问题,但是突然间consumer.poll()命令挂起,永远不会返回。谷歌建议检查服务器是可访问的。Producer和Consumer代码运行在同一台机器上,在这台机器上,生产者可以将记录发布到Kafka,但是消费者投票方法挂起。

环境:

Kafka版本: 1.1.0

客户端: Java

在窗口内的Ubuntu码头容器中运行

Zoo门将和2个代理服务器在同一个容器中运行。

当我为客户端代码启用日志记录时,我会看到下面的异常:

代码语言:javascript
复制
2018-07-06 21:24:18 DEBUG NetworkClient:802 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Error connecting to node 4bdce773eb74:9095 (id: 2 rack: null)
java.io.IOException: Can't resolve address: 4bdce773eb74:9095
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
    .................
    .................

我不知道为什么消费者试图连接到4bdce773eb74,即使我的代理服务器是192.168.99.100:9094,192.168.99.100:9095。我的完整消费者代码是:

代码语言:javascript
复制
        final String BOOTSTRAP_SERVERS = "192.168.99.100:9094,192.168.99.100:9095";
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Event_Consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);
        TopicPartition tpLogin = new TopicPartition("login1", 0);
        TopicPartition tpLogout = new TopicPartition("logout1", 1);
        List<TopicPartition> tps = Arrays.asList(tpLogin, tpLogout);
        consumer.assign(tps);
        while (true) {
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count()==0) {
                continue;
            }
            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),
                        record.partition(), record.offset());
            });

            consumer.commitAsync();
            Thread.sleep(5000);
        }
    }

请在这个问题上提供帮助。

正如我前面所说,编辑,我有两个经纪人,比如broker-1和broker-2。如果我停止broker-1,那么上面的异常不会被记录,但是poll()方法仍然没有返回。下面的消息无限期记录,如果我停止broker-1:

代码语言:javascript
复制
2018-07-07 11:31:24 DEBUG AbstractCoordinator:579 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Sending FindCoordinator request to broker 192.168.99.100:9094 (id: 1 rack: null)
2018-07-07 11:31:24 DEBUG AbstractCoordinator:590 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Received FindCoordinator response ClientResponse(receivedTimeMs=1530943284196, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=573), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)))
2018-07-07 11:31:24 DEBUG AbstractCoordinator:613 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Group coordinator lookup failed: The coordinator is not available.
2018-07-07 11:31:24 DEBUG AbstractCoordinator:227 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Coordinator discovery failed, refreshing metadata

提前谢谢你,索曼

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-07-07 08:37:01

我发现了问题。在创建主题时,broker -0(运行于端口:9093;broker id:0)和broker-2(在端口上运行:9094;代理id:2)正在运行。今天,我错误地启动了broker -1(运行于端口:9095;broker id:1)和broker-2。停止broker-1并启动broker-0之后,解决此问题。现在消费者可以得到事件。

我肯定是人为的错误,但我有两个评论:

  1. 我认为Kafka应该优雅地使用broker-2(端口号:9094),而忽略broker-1(端口号:9095)。
  2. 为什么卡夫卡试图联系4bdce773eb74:9095,而不是正确的IP地址(192.168.99.100)?

谢谢。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51214674

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档