我在Flink中构建了一个工作流,它由一个自定义源、一系列映射/平台映射和一个接收器组成。
自定义源的run()方法迭代存储在文件夹中的文件,并通过上下文的collects ()方法收集每个文件的名称和内容(我有一个自定义对象,该对象将此信息存储在两个字段中)。
然后,我有一系列映射/平面图来转换这些对象,然后使用自定义接收器将这些对象打印成文件。Flink的Web中生成的执行图如下:

我有一个集群或2个工作人员设置为每个有6个槽(他们都有6个核心,也)。我将并行设置为12,从执行图中我看到源的并行性为1,而工作流的其余部分则为12。
当我运行工作流(在专用文件夹中有大约15K文件)时,我使用htop监视员工的资源。所有的核心达到100%的利用率在大部分时间,但大约每30分钟左右,8-10的核心成为闲置约2-3分钟。
我的问题如下:
.... [Flat Map -> Map -> Map -> Sink: Unnamed (**3/12**)] INFO ....、.... [Flat Map -> Map -> Map -> Sink: Unnamed (**5/12**)] INFO ....等)中得到了打印。但是,我不明白的是,如果一个插槽正在执行源角色,而我在集群中有12个插槽,那么其余的工作流是如何由12个时隙执行的?是否有一个时隙同时代表工作流其余部分的源和一个实例?如果是,这一特定时隙的资源是如何分配的?是否有人可以解释在这个工作流中正在执行的步骤?例如(这可能是错误的):插槽。
我相信我上面描述的是错误的,但我举了一个例子来更好地解释我的问题。
发布于 2020-08-31 09:50:11
发布于 2020-08-31 16:33:49
为了回答有关并行化您的读取的具体问题,我将执行以下操作.
RichSourceFunction.
open()方法实现自定义源,调用getRuntimeContext().getNumberOfParallelSubtasks()获取总并行性,调用getRuntimeContext().getIndexOfThisSubtask()获取子任务的索引initialized.
run()方法,当您在文件上迭代时,获取每个文件名的hashCode(),模块化总的并行性。如果这等于子任务的索引,则处理它.通过这种方式,您可以将工作分散到12个子任务上,而不需要子任务尝试处理相同的文件。
https://stackoverflow.com/questions/63668191
复制相似问题