首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >kafka宕机发送消息无异常

kafka宕机发送消息无异常
EN

Stack Overflow用户
提问于 2019-02-01 15:06:49
回答 1查看 1.2K关注 0票数 6

我有一个场景,卡夫卡是随机下降的。当我向kafka发送消息时,它关闭了,我想单独处理它。但是当我向kafka发送消息时,它是关闭的,没有异常,所以我无法确定kafka是否关闭。有什么方法可以捕获异常和丢失的消息。

我有一个非常基本的代码

代码语言:javascript
复制
public static void main(String[] args) {
    String topicName = "test-1";

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)));
    }
    System.out.println("Message sent successfully");
    producer.close();
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-02-01 18:10:01

您需要发送一个同步调用,才能从Kafka获得响应。

代码语言:javascript
复制
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record = 
              new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i));
    try {
           producer.send(record).get();
    } catch (Exception e) {
           e.printStackTrace();
           // Handle message that has failed
    }
}

方法.get()将返回Kafka的响应。当向Kafka推送记录失败时会抛出异常。成功发送记录后,将返回RecordMetadata

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54474540

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档