我的问题是,对于固定集群设置中的flink作业中的操作符,了解一个很好的并行性选择。假设我们有一个flink作业DAG,其中包含map和reduce类型的操作符,它们之间有流水线边(没有阻塞边)。一个示例DAG如下:
Scan -> Keyword Search -> Aggregation假设有一个固定大小的M机器集群,每个机器都有C核,而DAG是在集群上运行的唯一工作流。Flink允许用户为单个操作符设置并行性。我通常为每个操作符设置M*C并行。但从性能角度(例如执行时间)来看,这是最好的选择吗?我们能利用运算符的属性来做出更好的选择吗?例如,如果我们知道aggregation比较昂贵,那么我们应该只将M*C并行化分配给aggregation操作符,并减少其他运算符的并行性吗?希望这也能减少背压的机会。
我并不是在寻找一个正确的公式来给我“最好”的并行性。我只是在寻找一种直觉/准则/想法,可以用来做决定。令人惊讶的是,我找不到多少关于这个话题的文献可读。
注意:我知道在最近的Flink动态缩放反应模式。但是我的问题是关于一个只运行一个工作流的固定集群,这意味着动态缩放是不相关的。我看了this的问题,但没有得到答案。
发布于 2022-05-23 15:05:01
我对此的看法有点不同。在我看来,有两个关键问题需要考虑:
(1)我是否想保持时隙统一?换句话说,每个时隙是每个任务的一个实例,还是我要调整特定任务的并行性?
(2)每槽有多少个核?
我对(1)的回答默认为“保持一致”。我还没有见过很多情况,调整单个操作符(或任务)的并行性已经证明是值得的。
改变并行性通常是适得其反的,如果它意味着打破一个操作符链。不管怎么说,在不寻常的情况下做洗牌都是有意义的,但总的来说,我不认为有什么意义。既然有些插槽会有每个操作符的实例,而且插槽都是统一的,那么为什么会有一些任务较少分配给它们的插槽呢?(在这里,我假设您不想费劲地设置插槽共享组,当然可以这样做。)从操作的角度来看,沿着这条路走下去会使事情变得更加复杂,而且几乎没有什么收获。在我看来,最好在其他地方进行优化(例如,序列化)。
至于每个插槽的核心,许多作业受益于每个插槽有两个核心,对于一些复杂的任务,你会想要更高的。因此,我认为,对于简单的ETL作业,M*C与M*C/2 (或更低的)作业具有整体的并行性,而M*C/2(或更低的)则是用于执行更强的任务的。
为了说明极端情况:
一个简单的ETL工作可能类似于
source -> map -> sink所有连接都在转发连接。由于只有一个任务,而且因为Flink每个任务只使用一个线程,所以在本例中,每个时隙只使用一个线程。因此,每个插槽分配一个以上的核心都是完全的浪费。无论如何,任务可能是i/o约束的。
在另一个极端,我看到了涉及~30个联接的作业,一个或多个ML模型的评估,以及加窗口的聚合等等。当然,您需要多个CPU核心来处理这样一个作业的每个并行部分(而且需要多个CPU内核)。
通常,大部分CPU工作都会进入序列化和反序列化,特别是在RocksDB中。我会试图为每个事件找出涉及多少个RocksDB状态访问、keyBy和再平衡--并提供足够的内核,使所有ser/de都可以同时进行(如果您关心的是最大化吞吐量)。对于最简单的工作,一个核心可以跟上。当你到达窗口连接的时候,你可能已经突破了一个核心所能跟上的极限--取决于你的源和汇能走多快,以及你对资源浪费的谨慎程度。
示例:假设您选择的并行度为50,每槽有2个内核,或并行性为100,每槽有一个核心。在这两种情况下,都有相同的资源可用--哪一种表现更好?
一般情况下,如果每个时隙有足够的任务/线程来保持两个内核的忙碌(如果整个管道适合于一个任务,那么反序列化程序也可以在自己的线程中运行),我希望每个时隙具有更多的内核,这样的情况会稍微好一些。使用较少的插槽,每个时隙将有更多的键和键组,这将有助于避免数据倾斜,并且随着任务的减少,检查点(如果启用)将表现得更好一些。进程间通信也更有可能采取优化的(内存中)路径。
https://stackoverflow.com/questions/72345290
复制相似问题