我遇到了一个关于apache Kafka的问题,我不明白。我在我的代理中订阅了一个名为" topic -received“的主题。代码如下:
protected String readResponse(final String idMessage) {
if (props != null) {
kafkaClient = new KafkaConsumer<>(props);
logger.debug("Subscribed to topic-received");
kafkaClient.subscribe(Arrays.asList("topic-received"));
logger.debug("Waiting for reading : topic-received");
ConsumerRecords<String, String> records =
kafkaClient.poll(kafkaConfig.getRead_timeout());
if (records != null) {
for (ConsumerRecord<String, String> record : records) {
logger.debug("Resultado devuelto : "+record.value());
return record.value();
}
}
}
return null;
}当这种情况发生时,我从另一个角度向"topic-received“发送了一条消息。代码如下:
private void sendMessageToKafkaBroker(String idTopic, String value) {
Producer<String, String> producer = null;
try {
producer = new KafkaProducer<String, String>(mapProperties());
ProducerRecord<String, String> producerRecord = new
ProducerRecord<String, String>("topic-received", value);
producer.send(producerRecord);
logger.info("Sended value "+value+" to topic-received");
} catch (ExceptionInInitializerError eix) {
eix.printStackTrace();
} catch (KafkaException ke) {
ke.printStackTrace();
} finally {
if (producer != null) {
producer.close();
}
}
}当我第一次尝试使用主题" topic -received“时,我得到了这样的警告
"WARN 13164 --- [nio-8085-exec-3] org.apache.kafka.clients.NetworkClient :
Error while fetching metadata with correlation id 1 : {topic-
received=LEADER_NOT_AVAILABLE}"但是如果我再试一次,对于这个主题" topic -received",工作正常,并且不会出现任何警告。无论如何,这对我来说是没有用的,因为我每次都必须从一个主题监听并发送到一个新的主题(由字符串标识符引用,例如:..12Erw45-2345Saf-234DASDFasd )
为了在谷歌中搜索LEADER_NOT_AVAILABLE,一些人谈到了在server.properties中添加以下几行:
host.name=127.0.0.1
advertised.port=9092
advertised.host.name=127.0.0.1但它对我不起作用(不知道为什么)。
在这个过程之前,我试着用下面的代码创建主题:
private void createTopic(String idTopic) {
String zookeeperConnect = "localhost:2181";
ZkClient zkClient = new ZkClient(zookeeperConnect,10000,10000,
ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = new ZkUtils(zkClient, new
ZkConnection(zookeeperConnect),false);
if(!AdminUtils.topicExists(zkUtils,idTopic)) {
AdminUtils.createTopic(zkUtils, idTopic, 2, 1, new Properties(),
null);
logger.debug("Created topic "+idTopic+" by super user");
}
else{
logger.debug("topic "+idTopic+" already exists");
}
}没有错误,但它仍然会一直监听到超时。
我已经检查了代理的属性,以检查是否有任何帮助,但我没有发现任何足够清楚的东西。我用来阅读的道具有:
props = new Properties();
props.put("bootstrap.servers", kafkaConfig.getBootstrap_servers());
props.put("key.deserializer", kafkaConfig.getKey_deserializer());
props.put("value.deserializer", kafkaConfig.getValue_deserializer());
props.put("key.serializer", kafkaConfig.getKey_serializer());
props.put("value.serializer", kafkaConfig.getValue_serializer());
props.put("group.id",kafkaConfig.getGroupId());还有,发送..。
Properties props = new Properties();
props.put("bootstrap.servers", kafkaConfig.getHost() + ":" +
kafkaConfig.getPort());
props.put("group.id", kafkaConfig.getGroup_id());
props.put("enable.auto.commit", kafkaConfig.getEnable_auto_commit());
props.put("auto.commit.interval.ms",
kafkaConfig.getAuto_commit_interval_ms());
props.put("session.timeout.ms", kafkaConfig.getSession_timeout_ms());
props.put("key.deserializer", kafkaConfig.getKey_deserializer());
props.put("value.deserializer", kafkaConfig.getValue_deserializer());
props.put("key.serializer", kafkaConfig.getKey_serializer());
props.put("value.serializer", kafkaConfig.getValue_serializer());有什么线索吗?为什么,我必须使用来自代理和主题的消息的唯一方式是在错误后重复请求?
提前感谢
发布于 2018-09-07 01:02:49
当尝试向不存在的主题生成消息时,就会发生这种情况
请注意:在一些Kafka安装中,框架可以在主题不存在的时候自动创建主题,这就解释了为什么你在一开始只看到一次问题。
发布于 2019-08-07 12:26:53
当您的主题名称不存在时,会出现此错误。
要列出所有主题,请执行以下命令:
kafka-topics --list --zookeper localhost:2181https://stackoverflow.com/questions/39718975
复制相似问题