我使用的是Kafka 0.11.0.0。我有一个发布到Kafka主题的测试程序;如果zookeeper和Kafka服务器关闭(这在我的开发环境中是正常的;我会根据需要启动它们),那么对KafkaProducer<>.send()的调用就会无限期地挂起。
我需要让send()返回,最好是指示错误;或者我需要一种方法来检查服务器是启动还是关闭。基本上,我希望我的测试工具能够告诉我,“嘿,笨蛋,启动Kafka!”而不是绞死。
有没有办法让我的生产者任务来确定服务器是启动还是关闭?
我像这样调用send():
kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY,
message), (rm, ex) -> {
System.out.println("**** " + rm + "\n**** " +ex);
});我设置linger.ms = 1;我已经尝试了retries=0、1和2,并且仍然使用send()块。我从来没见过回调调用。
较旧的消息建议将metadata.fetch.timeout.ms设置为较小的值,但在0.11中已经消失了。其他人建议调用命令行实用程序来查看服务器是否处于OK...but状态,引用的实用程序似乎也消失了。
做这件事的得体方法是什么?
发布于 2017-11-19 22:58:16
这很奇怪。它应该返回一个错误,指出“无法更新元数据”或“正在过期x个记录”。
检查生产者的request.timeout.ms和max.block.ms设置。缺省情况下,request.timeout.ms的长度为60秒
发布于 2017-11-18 14:22:25
我们可以通过三种方式向代理发送消息:
即发即忘:我们向服务器发送一条消息,实际上并不关心消息是否成功到达。大多数情况下,它会成功到达,因为Kafka的可用性很高,生产者会自动重试发送消息。但是,使用此方法可能会丢失一些消息。
异步发送我们使用回调函数调用send()方法,该函数在接收到来自Kafka broker的响应时触发。
同步发送我们发送一条消息,send()方法返回一个Future对象,我们使用get()等待将来,看看send()是否成功。
同步发送消息的最简单方法如下:
ProducerRecord<String, String> record =
new ProducerRecord<>(KAFKA_TOPIC, KEY, message);
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}这里,我们使用Future.get()来等待Kafka的回复。如果记录没有成功发送给Kafka,该方法会抛出异常。如果没有错误,我们将获得一个RecordMetadata对象,我们可以用它来检索消息被写入的偏移量。
希望这能有所帮助。
https://stackoverflow.com/questions/47357590
复制相似问题