当我想到按某个键的动作时,我通常会想到把所有与钥匙匹配的事件都扔到同一个桶里的类比。正如您可以想象的那样,当Flink应用程序开始处理大量数据时,您所选择的键开始变得重要,因为您希望确保您能够很好地清理状态。这就引出了我的问题,Flink到底是如何清理这些“水桶”的?如果存储桶是空的(所有的MapStates和ValueStates都是空的),Flink是否关闭密钥空间的那个区域并删除桶?
示例:
传入数据格式: {userId,computerId,amountOfTimeLoggedOn}
键:用户Key/ComputerId
当前密钥空间:
Flink最终会把Bob,计算机11从关键空间中移除吗?还是说它会永远存在下去,因为它曾经有过一个事件?
发布于 2020-04-30 14:58:26
Flink不为没有任何与其关联的用户值的状态键存储任何数据,至少在现有的状态后端:堆(内存中)或RocksDB中。
键空间在Flink中是虚拟的,Flink不对哪些具体键可能存在作出任何假设。每个键没有预先分配的桶或密钥子集。只有当用户应用程序为某个键写入一些值时,它才会占用存储空间。
一般的想法是,所有具有相同密钥的记录都在同一台机器上进行处理(这有点像您所说的在同一个桶中)。特定密钥的本地状态也总是保存在同一台机器上(如果有存储的话)。不过,这与检查点无关。
对于您的示例,如果在某个时间点为Bob、Computer 11编写了一些值,然后随后删除,则Flink将使用键将其完全删除。
发布于 2020-05-01 15:56:11
简短回答
它在时间的帮助下清理了Flink状态的(TTL)特性和Java垃圾收集器(GC)。TTL特性将删除对状态条目的任何引用,GC将取回分配的内存。
长答案
你的问题可分为三个子问题:
我会尽量简短。
Flink如何基于键对数据进行分区?
对于键控流上的操作符,Flink在一致散列算法的帮助下对密钥上的数据进行分区。它创建了桶的max_parallelism数。每个操作符实例被分配一个或多个这些桶。无论何时向下游发送数据,都会将密钥分配给其中的一个桶,然后将其发送给相关的操作符实例。此处不存储键,因为范围是通过数学计算的。因此,在任何时候,都不会清除任何区域或删除桶。您可以创建任意类型的密钥。它不会影响内存的键空间或范围。
Flink如何使用密钥存储状态?
所有操作符实例都有一个实例级的状态存储。这个存储定义了操作符实例的状态上下文,它可以存储多个命名状态存储,例如“计数”、“和”、“某些名称”等。这些命名状态存储是可以根据数据键存储值的键值存储。
当我们用运算符的open()函数中的状态描述符初始化状态时,就会创建这些KV存储。即getRuntimeContext().getValueState().
这些KV存储只在需要在状态中存储某些内容时才存储数据。(比如HashMap.put(k,v))。因此,除非状态更新方法(如update**,** add**,** put**)被调用),否则不会存储键或值。
所以,
Flink如何清理状态以获取密钥?
除非用户需要或用户手动完成,否则Flink不会删除状态。如前所述,Flink具有状态的TTL特性。此TTL将标记状态过期,并在调用清理策略时删除它。这些清理策略会改变wrt后端类型和清理时间。对于堆状态后端,它将从状态表中删除条目,即删除对条目的任何引用。这个非引用条目占用的内存将由Java清理。对于RocksDB状态后端,它只需调用RocksDB的本机delete方法。
https://stackoverflow.com/questions/61507880
复制相似问题