我是卡夫卡的新手,我一直在研究卡夫卡在停止的时候向它发送信息时的行为。
我面临的情况是,我停止使用'Kubectl删除StatefulSet kafka_kf‘的卡夫卡。然后,我使用java Kafka生产者向Kafka发送了一些信息。然后我又重新开始了卡夫卡,当我开始卡夫卡的时候,这些发送给卡夫卡的信息就立即出现在消费者的身上。知道卡夫卡在这种情况下会发生什么吗?以及如何防止这些消息出现在使用者中?这些消息会引起一个复制问题,这就是为什么我需要它们不出现的原因。
通过使用用命令打开的使用者,我看到消息出现在使用者中:
kubectl exec -ti test -- ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --isolation-level read_committed --topic testtopic
用于向kafka发送消息的代码的安全性是:producer.send(message)。
发布于 2020-09-09 09:18:28
首先,我认为重要的是要理解producer.send()是一个异步调用,因此它不会阻塞。其次,send()方法实际上并不将消息推送给代理,而是将消息放在本地内存中的二进制队列中。在生产者与其通信的主题中,每个分区都有一个单独的二进制队列。这些记录实际上是由生产者端的内部后台线程推送给代理的,该线程将由可配置批处理阈值触发。是这个动作在等待来自代理的ack(由ack设置配置),而不是send()方法。
资料来源:合流培训-开发人员构建Apache Kafka的技能。
当卡夫卡没有时间,你将得到一个TimeoutException在你的制片人。但是,这个异常可以通过重试来处理,并且生产者配置retries默认设置为2147483647。
一旦你让卡夫卡可用,你的制片人就能够真正地把信息发送给卡夫卡,你的消费者就会收到它们。
如果不希望接收这些消息,则需要设置KafkaProducer配置retries=0。
要了解更多关于生产者回调异常的信息,您可以查看我的另一个answer。
编辑评论中的新问题:
是否有方法查找消息(或所有消息)是否已成功发送?
在发送数据时,可以定义如下所示的自定义回调类。如果产生消息出了问题,此回调将引发异常。
class ProducerCallback extends Callback {
@Override
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (e != null) {
e.printStackTrace()
}
}
}
producer.send(message, new ProducerCallback)作为一种选择,您可以简单地调用
producer.send(message).get()因为这将阻止您直到收到卡夫卡代理的所有确认(请参阅KafkaProducer配置acks)。
https://stackoverflow.com/questions/63808435
复制相似问题