我的flink作业使用FlinkKafkaConsumer010从kafka consumer读取数据,并使用CustomBucketingSink接收到hdfs。我们有一系列的转换kafka -> flatmaps(2-3个转换) -> keyBy -> tumblingWindow(5 mins) -> Aggregation -> hdfsSink。我们有平均300万/分钟事件的kafka输入,高峰时间大约2000万/分钟的事件。检查点持续时间和两个检查点之间的最小停顿是3分钟,我使用的是FsStateBackend。
以下是我的假设:
Flink从kafka消耗一些固定数量的事件(一次从多个分区获得多个偏移量),并等待,直到它到达接收器,然后是检查点。在成功的情况下,它提交kafka分区,偏移它读取,并维护一些与它正在写入的hdfs文件相关的状态。在kafka将事件交给其他运营商之后,进行了多个转换,而kafka消费者则处于闲置状态,直到它得到它发送的事件的成功确认。因此,我们可以说,当接收器将数据写入hdfs时,所有之前的操作符都处于空闲状态。在失败的情况下,flink转到以前的检查点状态,并指向kafka最后提交的分区偏移量,并指向它应该开始写入的hdfs文件offest。
以下是基于上述假设的我的疑虑:
1)上述假设是正确的。2)翻滚窗口是否有意义,因为在失败的情况下,无论如何我们都是从最后一个kafka分区提交的偏移量开始。3)如果翻滚窗口处于make状态,flink何时可以使用该状态。4)为什么检查点和保存点状态大小不同。5)在任何故障的情况下,flink总是从sorce操作符开始。对吧?
发布于 2020-03-13 00:06:36
你的假设是不正确的。
(1)检查点不以任何方式依赖于到达接收器的事件或结果。
(2) Flink做自己的Kafka偏移管理。从检查点恢复时,在发生故障后,将使用检查点中的偏移量,而不是那些可能已提交回Kafka的偏移量。
(3)任何操作员都不会像您所描述的那样空闲。流水线不会因为检查点而停滞。
要理解检查点是如何工作的,最好的方法是浏览Flink操作游乐场,特别是关于Observing Failure and Recovery的部分。这将使您对该主题有更清晰的理解,因为您将能够准确地观察正在发生的事情。
我还可以推荐阅读https://ci.apache.org/projects/flink/flink-docs-master/training/fault_tolerance.html,并访问其中包含的链接。
但要了解检查点在应用程序中的工作原理,以下是基本步骤:
(1)当检查点协调器(作业管理器的一部分)决定是时候启动另一个检查点时,它通知每个任务管理器启动检查点n。
(2)所有源实例检查它们自己的状态,并将检查点屏障n插入到它们的传出流中。在您的示例中,源是Kafka消费者,它们检查每个分区的当前偏移量。
(3)无论何时检查点屏障到达有状态操作符中输入队列的头部,该操作符都会对其状态进行检查点操作并转发屏障。这一部分有些复杂--但基本上,状态保存在一个多版本的、并发控制的哈希图中。操作员创建可由检查点屏障后面的事件修改的状态的新版本n+1,并创建新线程以异步快照版本n中的所有状态。
在您的例子中,窗口和接收器是有状态的。窗口的状态包括当前窗口内容、触发器的状态以及用于窗口处理的其他状态(如果有)。
(4)接收器使用屏障的到达来刷新任何排队的输出,并提交挂起的事务。同样,这里也有一些复杂性,因为事务接收器使用两阶段提交协议。
在您的应用程序中,如果检查点间隔远远小于窗口持续时间,那么接收器将在从窗口接收任何输出之前完成许多检查点。
(5)当检查点协调器从每个任务收到检查点已完成的反馈时,它最终确定检查点元数据。
在恢复期间,每个操作员的状态将被重置为最近检查点中的状态。这意味着源被倒带到检查点中的偏移量,并且处理恢复到窗口和接收器中的状态,该状态对应于在消费事件直到这些偏移量之后它应该是的状态。
注意:为了保持合理的简单性,我略过了一些细节。此外,FLIP-76将引入一种新的检查点方法。
https://stackoverflow.com/questions/60656685
复制相似问题