本文主要是整理博主收集的 Flink、OLAP 高频面试题。之后每周都会有一篇。
博主把这一期的面试题先贴出来,大家自己感受感受。
下面的答案都是博主收集小伙伴萌的答案 + 博主自己的理解进行的一个总结。
端对端 exactly-once 有 3 个条件:
其中前两项一般大多数引擎都支持,我们需要关注的就是第 3 项,目前有两种常用方法:
一些我们能在其他 OLAP 引擎上面见到的优化有:
在 ClickHouse 上特别突出的有:
ClickHouse 采用列存储,这对于分析型请求非常高效。
行存储:从存储系统读取所有满足条件的行数据,然后在内存中过滤出需要的字段,速度较慢。比如,一个表有 10 列,我其实只查 1 列数据的话,行存储还是会把 10 列数据都扫描一遍。
1
列存储:仅从存储系统中读取必要的列数据,无用列不读取,速度非常快。相同的例子,一个表有 10 列,我其实只查 1 列数据的话,列存储就只扫描这一列数据
2
由于 ClickHouse 采用列存储,相同列的数据连续存储,且底层数据在存储时是经过排序的,这样数据的局部规律性非常强,有利于获得更高的数据压缩比。
此外,ClickHouse 除了支持 LZ4、ZSTD 等通用压缩算法外,还支持 Delta、DoubleDelta、Gorilla 等专用编码算法,用于进一步提高数据压缩比。
其中 DoubleDelta、Gorilla 是 Facebook 专为时间序数据而设计的编码算法,理论上在列存储环境下,可接近专用时序存储的压缩比,详细可参考 Gorilla 论文。
3
列存用于裁剪不必要的字段读取,而索引则用于裁剪不必要的记录读取。ClickHouse 支持丰富的索引,从而在查询时尽可能的裁剪不必要的记录读取,提高查询性能。
ClickHouse 中最基础的索引是主键索引。ClickHouse 的底层数据按建表时指定的 ORDER BY 列进行排序,并按 index_granularity 参数切分成数据块,然后抽取每个数据块的第一行形成一份稀疏的排序索引。
用户在查询时,如果查询条件包含主键列,则可以基于稀疏索引进行快速的裁剪。
ClickHouse 支持更多其他的索引类型,不同索引用于不同场景下的查询裁剪,具体汇总如下,更详细的介绍参考 ClickHouse 官方文档:
4
OLAP 分析领域有两个典型的方向:
前者更为灵活,但需要的技术栈相对复杂;后者实现相对简单,但要达到的极致性能,需要生成所有常见查询对应的物化视图,消耗大量计算、存储资源。
物化视图的原理如下图所示,可以在不同维度上对原始数据进行预计算汇总,这样我们查询时就可以直接查询到聚合好的数据上面,查询效率更高:
5
其会在不同的场景使用不同的算法。
例如,在去重函数 uniqCombined 中,会根据数据量选择不同的算法:数据量比较少的时候,会选择使用 Array 来保存;数据量中等的时候,使用 HashSet;数据量很大的时候,会使用 HyperLogLog 算法。
并且表引擎很丰富,有 20 多种,每种表引擎都做了很多的优化,这个道理就和小伙伴萌工作时为每类工作场景专门设计对应的解决方案一样,效果当然是不错的。
向量化执行。SIMD 被广泛地应用于文本转换、数据过滤、数据解压和 JSON 转换等场景。相对于单纯使用 CPU,利用寄存器暴力优化也算是一种降维打击,毕竟 "能用机器资源解决的问题就别手动优化"。
以商品订单数据为例,查询某个订单总价格的处理过程,由传统的按行遍历处理的过程,转换为按 Block 处理的过程。
具体如下图:
6
由于拥有 Yandex 的天然优势,经常会使用真实数据来进行测试,尝试使用于各个场景。也因此获得了快速的版本更新换代,基本维持在一个月一更新。
并且在业界有新的算法出现时,ClickHouse 的开发人员也会积极去测试。
其实这个问题主要是为了让大家不要陷入一个固有想法中。
举个例子:
只是这些引擎尝尝被用于满足对应场景的需求。
比如 ClickHouse 用于大宽表的灵活 SQL 计算,这种场景的并发肯定不会很高。Redis 常被用于小 key 小 value set,get 场景,那么这种场景的并发肯定也不会低的。
每种引擎都有对应的瓶颈处,只要你没有达到这个瓶颈阈值,并发都不会低。
参考了很多小伙伴的解决方案,大概分为几种:
博主认为其中第一种方案大家基本都能答上来,第二种和第三种是相对比比较创新的,但是实现逻辑较复杂,大家可以学习对应的思想。
这里博主结合大家的想法给出答案:
首先我们使用最简单直接的方式 2 个指标分拆开来计算:
上面这个方法在 90% 的场景都没有啥问题,但是如果心跳日志数据 QPS 都很大,则每个任务都去消费一遍,链路稳定性差。
这里我们可以做一次优化,我们可以发现上面这 2 个指标其实是有先后顺序关系的。
所以为了减少流量,其实同时在线用户可以作为 DAU 的输入。优化链路如下:
最终这样输出的数据无论是在来一个 ads 任务做聚合还是直接导入到 MySQL、ClickHouse、Druid 都可以,因为都只是计算 count 而已。
Flink 对状态做了能力扩展,即 TTL。它的能力其实和 redis 的过期策略类似,举例:
那么首先我们看下什么场景需要用到 TTL 机制呢?举例:
比如计算 DAU 使用 Flink MapState 进行去重,到第二天的时候,第一天的 MapState 就可以删除了,就可以用 Flink State TTL 进行自动删除(当然你也可以通过代码逻辑进行手动删除)。
其实在 Flink DataStream API 中,TTL 功能还是比较少用的。Flink State TTL 在 Flink SQL 中是被大规模应用的,几乎除了窗口类、ETL(DWD 明细处理任务)类的任务之外,SQL 任务基本都会用到 State TTL。
那么我们在要怎么开启 TTL 呢?这里分 DataStream API 和 SQL API:
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"itemsMap",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 使用 StateTtlConfig 开启 State TTL
mapStateDesc.enableTimeToLive(StateTtlConfig
.newBuilder(Time.milliseconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(10)
.build());
}
关于 StateTtlConfig 的每个配置项的功能如下图所示:
1
StreamTableEnvironment
.getConfig()
.getConfiguration()
.setString("table.exec.state.ttl", "180 s");
注意:SQL 中 TTL 的策略不如 DataStream 那么多,SQL 中 TTL 只支持下图所示策略:
6
首先我们来想想,要做到 TTL 的话,要具备什么条件呢?
想想 Redis 的 TTL 设置,如果我们要设置 TTL 则必然需要给一条数据给一个时间戳,只有这样才能判断这条数据是否过期了。
在 Flink 中设置 State TTL,就会有这样一个时间戳,具体实现时,Flink 会把时间戳字段和具体数据字段存储作为同级存储到 State 中。
举个例子,我要将一个 String 存储到 State 中时:
接下来以 FileSystem 状态后端下的 MapState 作为案例来说:
2
3
注意: 任务设置了 State TTL 和不设置 State TTL 的状态是不兼容的。这里大家在使用时一定要注意。防止出现任务从 Checkpoint 恢复不了的情况。但是你可以去修改 TTL 时长,因为修改时长并不会改变 State 存储结构。
了解了基础数据结构之后,我们再来看看 Flink 提供的 State 过期的 4 种删除策略:
访问 State 的时候根据时间戳判断是否过期,如果过期则主动删除 State 数据。以 MapState 为例,如下图所示,在 MapState.get(key) 时会进行判断是否过期:
这个删除策略是不需要用户进行配置的,只要你打开了 State TTL 功能,就会默认执行。
4
从状态恢复(checkpoint、savepoint)的时候采取做过期删除,但是不支持 rocksdb 增量 checkpoint。
StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build()
访问 state 的时候,主动去遍历一些 state 数据判断是否过期,如果过期则主动删除 State 数据。
StateTtlConfig
.newBuilder(Time.seconds(1))
// 每访问 1 此 state,遍历 1000 条进行删除
.cleanupIncrementally(1000, true)
.build()
5
注意:
仅仅支持 rocksdb。在 rockdb 做 compaction 的时候遍历进行删除。
StateTtlConfig
.newBuilder(Time.seconds(1))
// 做 compaction 时每隔 3 个 entry,重新更新一下时间戳(这个时间戳是 Flink 用于和数据中的时间戳来比较判断是否过期)
.cleanupInRocksdbCompactFilter(3)
.build()
注意:rocksdb compaction 时调用 TTL 过滤器会降低 compaction 速度。因为 TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。对于集合型状态类型(比如 ListState 和 MapState),会对集合中每个元素进行检查。