首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡如何处理在停止时发送给它的信息?

卡夫卡如何处理在停止时发送给它的信息?
EN

Stack Overflow用户
提问于 2020-09-09 09:11:53
回答 1查看 175关注 0票数 2

我是卡夫卡的新手,我一直在研究卡夫卡在停止的时候向它发送信息时的行为。

我面临的情况是,我停止使用'Kubectl删除StatefulSet kafka_kf‘的卡夫卡。然后,我使用java Kafka生产者向Kafka发送了一些信息。然后我又重新开始了卡夫卡,当我开始卡夫卡的时候,这些发送给卡夫卡的信息就立即出现在消费者的身上。知道卡夫卡在这种情况下会发生什么吗?以及如何防止这些消息出现在使用者中?这些消息会引起一个复制问题,这就是为什么我需要它们不出现的原因。

通过使用用命令打开的使用者,我看到消息出现在使用者中:

kubectl exec -ti test -- ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --isolation-level read_committed --topic testtopic

用于向kafka发送消息的代码的安全性是:producer.send(message)

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-09-09 09:18:28

首先,我认为重要的是要理解producer.send()是一个异步调用,因此它不会阻塞。其次,send()方法实际上并不将消息推送给代理,而是将消息放在本地内存中的二进制队列中。在生产者与其通信的主题中,每个分区都有一个单独的二进制队列。这些记录实际上是由生产者端的内部后台线程推送给代理的,该线程将由可配置批处理阈值触发。是这个动作在等待来自代理的ack(由ack设置配置),而不是send()方法。

资料来源:合流培训-开发人员构建Apache Kafka的技能。

当卡夫卡没有时间,你将得到一个TimeoutException在你的制片人。但是,这个异常可以通过重试来处理,并且生产者配置retries默认设置为2147483647。

一旦你让卡夫卡可用,你的制片人就能够真正地把信息发送给卡夫卡,你的消费者就会收到它们。

如果不希望接收这些消息,则需要设置KafkaProducer配置retries=0

要了解更多关于生产者回调异常的信息,您可以查看我的另一个answer

编辑评论中的新问题:

是否有方法查找消息(或所有消息)是否已成功发送?

在发送数据时,可以定义如下所示的自定义回调类。如果产生消息出了问题,此回调将引发异常。

代码语言:javascript
复制
class ProducerCallback extends Callback {

  @Override
  override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
    if (e != null) {
      e.printStackTrace()
    }
  }

}

producer.send(message, new ProducerCallback)

作为一种选择,您可以简单地调用

代码语言:javascript
复制
producer.send(message).get()

因为这将阻止您直到收到卡夫卡代理的所有确认(请参阅KafkaProducer配置acks)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63808435

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档