Given,
我有一个Flink作业,它从ActiveMQ源代码读取&写到mysql数据库,键控在标识符上。我每隔一秒就为这项工作启用检查点。我将检查点指向一个Minio实例,验证检查点是否与jobid一起工作。我部署这个任务是一个Openshift (在下面的Kubernetes)--我可以在需要的情况下扩大/减少这个任务。
问题
在部署(滚动)作业或由于错误/错误导致作业下降时,如果ActiveMQ中有未使用的消息或Flink中的未确认消息(但写入数据库),则当作业恢复(或部署新作业)时,作业处理已经处理消息,从而导致在数据库中插入重复的记录。
问题
jobid在每个部署中不断变化,恢复是如何发生的?Exactly-Once)中的重复,我是否可以编写特定于数据库的查询(upsert)来更新给定的记录&如果没有,可以插入吗?发布于 2020-03-05 20:17:06
JDBC目前只支持至少一次,这意味着您在恢复时得到了重复的消息。目前有一个草案可以添加对一次的支持,它可能会在1.11中发布。
检查点不应该帮助工作从原来的地方恢复过来吗?
是的,但上次成功的检查点和恢复之间的时间可能会产生观察到的重复。我给出了一个更详细的有关主题的回答。
在我(滚动)部署新作业之前,我是否应该接受检查点?
绝对一点儿没错。实际上,您应该在保存点中使用cancel。这是改变拓扑结构的唯一可靠方法。此外,“取消与保存点”避免了数据中的任何重复,因为它优雅地关闭了作业。
如果作业因错误或群集失败而退出,会发生什么情况?
它应该自动重新启动(取决于您的重新启动设置)。它将使用最新的恢复检查点。这肯定会导致重复。
在每次部署时,工作人员的身份都在不断变化,那么恢复是如何发生的呢?
您通常会显式地指向同一个检查点目录(在S3?)。
由于我不能期望从数据库中获得幂等性,所以重新插入实现精确一次处理的唯一方法吗?
目前,我看不出有什么办法可以绕过它。它应该会随着1.11而改变。
https://stackoverflow.com/questions/60547570
复制相似问题