Apache Kafka文档指出:
内部Kafka流消费者max.poll.interval.ms默认值从300000改为Integer.MAX_VALUE。
由于此值用于检测一批记录的处理时间超过给定阈值的时间,是否存在这样一个“无限”值的原因?
它是否使应用程序变得没有响应能力?或者,卡夫卡流有一种不同的方式离开消费者群体时,处理过程太长?
发布于 2017-12-21 07:53:07
它是否使应用程序变得没有响应能力?或者,卡夫卡流有一种不同的方式离开消费者群体时,处理过程太长?
Kafka Streams在这个上下文中利用Kafka消费者客户端的心跳功能,从而分离心跳(“这个应用实例还活着吗?”)从电话到poll()。这两个主要参数是session.timeout.ms (用于心跳线程)和max.poll.interval.ms (用于处理线程),它们的区别将在https://stackoverflow.com/a/39759329/1743580中详细描述。
引入心跳是为了允许应用程序实例在处理记录时花费大量时间,而不被认为“没有进展”,因而“死了”。例如,你的应用程序可以在一分钟内对一张唱片进行大量的处理,同时对卡夫卡说:“嘿,我还活着,我正在进步。但我只是还没有完成处理过程。请继续关注。”
当然,您可以将max.poll.interval.ms从其默认设置(Integer.MAX_VALUE)更改为较低的设置,例如,如果您确实希望应用程序实例在轮询记录之间的时间超过X秒时被视为“死”,那么如果处理最新一轮记录所需时间超过X秒,则可以将其更改为“死”。这取决于您的特定用例是否有意义--在大多数情况下,默认设置是安全的。
session.timeout.ms:在使用Kafka的组管理工具时用来检测消费者故障的超时。消费者向经纪人发送周期性心跳以表示其活性。如果代理在此会话超时到期之前没有接收到心跳,则代理将从组中删除该使用者并启动重新平衡。注意,该值必须在由group.min.session.timeout.ms和group.max.session.timeout.ms在代理配置中配置的允许范围内。max.poll.interval.ms:使用使用者组管理时轮询()调用之间的最大延迟。这为使用者在获取更多记录之前空闲的时间设置了上限。如果在此超时到期之前未调用poll(),则认为使用者失败,该组将重新平衡,以便将分区重新分配给另一个成员。
https://stackoverflow.com/questions/47906485
复制相似问题