首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka生产者在kafka服务器宕机时无限期发送阻塞

Kafka生产者在kafka服务器宕机时无限期发送阻塞
EN

Stack Overflow用户
提问于 2017-11-18 03:03:20
回答 2查看 5.9K关注 0票数 7

我使用的是Kafka 0.11.0.0。我有一个发布到Kafka主题的测试程序;如果zookeeper和Kafka服务器关闭(这在我的开发环境中是正常的;我会根据需要启动它们),那么对KafkaProducer<>.send()的调用就会无限期地挂起。

我需要让send()返回,最好是指示错误;或者我需要一种方法来检查服务器是启动还是关闭。基本上,我希望我的测试工具能够告诉我,“嘿,笨蛋,启动Kafka!”而不是绞死。

有没有办法让我的生产者任务来确定服务器是启动还是关闭?

我像这样调用send():

代码语言:javascript
复制
kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY,
    message), (rm, ex) -> {
        System.out.println("**** " + rm + "\n**** " +ex);
});

我设置linger.ms = 1;我已经尝试了retries=0、1和2,并且仍然使用send()块。我从来没见过回调调用。

较旧的消息建议将metadata.fetch.timeout.ms设置为较小的值,但在0.11中已经消失了。其他人建议调用命令行实用程序来查看服务器是否处于OK...but状态,引用的实用程序似乎也消失了。

做这件事的得体方法是什么?

EN

回答 2

Stack Overflow用户

发布于 2017-11-19 22:58:16

这很奇怪。它应该返回一个错误,指出“无法更新元数据”或“正在过期x个记录”。

检查生产者的request.timeout.msmax.block.ms设置。缺省情况下,request.timeout.ms的长度为60秒

票数 5
EN

Stack Overflow用户

发布于 2017-11-18 14:22:25

我们可以通过三种方式向代理发送消息:

即发即忘:我们向服务器发送一条消息,实际上并不关心消息是否成功到达。大多数情况下,它会成功到达,因为Kafka的可用性很高,生产者会自动重试发送消息。但是,使用此方法可能会丢失一些消息。

异步发送我们使用回调函数调用send()方法,该函数在接收到来自Kafka broker的响应时触发。

同步发送我们发送一条消息,send()方法返回一个Future对象,我们使用get()等待将来,看看send()是否成功。

同步发送消息的最简单方法如下:

代码语言:javascript
复制
ProducerRecord<String, String> record =
            new ProducerRecord<>(KAFKA_TOPIC, KEY, message);
    try {
            producer.send(record).get();
    } catch (Exception e) {
             e.printStackTrace();
    }

这里,我们使用Future.get()来等待Kafka的回复。如果记录没有成功发送给Kafka,该方法会抛出异常。如果没有错误,我们将获得一个RecordMetadata对象,我们可以用它来检索消息被写入的偏移量。

希望这能有所帮助。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47357590

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档