我是卡夫卡的新手,并开始探索样例程序。它过去工作时没有任何问题,但是突然间consumer.poll()命令挂起,永远不会返回。谷歌建议检查服务器是可访问的。Producer和Consumer代码运行在同一台机器上,在这台机器上,生产者可以将记录发布到Kafka,但是消费者投票方法挂起。
环境:
Kafka版本: 1.1.0
客户端: Java
在窗口内的Ubuntu码头容器中运行
Zoo门将和2个代理服务器在同一个容器中运行。
当我为客户端代码启用日志记录时,我会看到下面的异常:
2018-07-06 21:24:18 DEBUG NetworkClient:802 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Error connecting to node 4bdce773eb74:9095 (id: 2 rack: null)
java.io.IOException: Can't resolve address: 4bdce773eb74:9095
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
.................
.................我不知道为什么消费者试图连接到4bdce773eb74,即使我的代理服务器是192.168.99.100:9094,192.168.99.100:9095。我的完整消费者代码是:
final String BOOTSTRAP_SERVERS = "192.168.99.100:9094,192.168.99.100:9095";
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "Event_Consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);
TopicPartition tpLogin = new TopicPartition("login1", 0);
TopicPartition tpLogout = new TopicPartition("logout1", 1);
List<TopicPartition> tps = Arrays.asList(tpLogin, tpLogout);
consumer.assign(tps);
while (true) {
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
if (consumerRecords.count()==0) {
continue;
}
consumerRecords.forEach(record -> {
System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),
record.partition(), record.offset());
});
consumer.commitAsync();
Thread.sleep(5000);
}
}请在这个问题上提供帮助。
正如我前面所说,编辑,我有两个经纪人,比如broker-1和broker-2。如果我停止broker-1,那么上面的异常不会被记录,但是poll()方法仍然没有返回。下面的消息无限期记录,如果我停止broker-1:
2018-07-07 11:31:24 DEBUG AbstractCoordinator:579 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Sending FindCoordinator request to broker 192.168.99.100:9094 (id: 1 rack: null)
2018-07-07 11:31:24 DEBUG AbstractCoordinator:590 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Received FindCoordinator response ClientResponse(receivedTimeMs=1530943284196, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=573), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)))
2018-07-07 11:31:24 DEBUG AbstractCoordinator:613 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Group coordinator lookup failed: The coordinator is not available.
2018-07-07 11:31:24 DEBUG AbstractCoordinator:227 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Coordinator discovery failed, refreshing metadata提前谢谢你,索曼
发布于 2018-07-07 08:37:01
我发现了问题。在创建主题时,broker -0(运行于端口:9093;broker id:0)和broker-2(在端口上运行:9094;代理id:2)正在运行。今天,我错误地启动了broker -1(运行于端口:9095;broker id:1)和broker-2。停止broker-1并启动broker-0之后,解决此问题。现在消费者可以得到事件。
我肯定是人为的错误,但我有两个评论:
谢谢。
https://stackoverflow.com/questions/51214674
复制相似问题