1 概述 Flink 整个系统主要由两个组件组成,分别为 JobManager 和 TaskManager,Flink 架构也遵循 Master - Slave 架构设计原则,JobManager 为 2 Client 客户端 客户端负责将任务提交到集群,与 JobManager 构建 Akka 连接,然后将任务提交到 JobManager,通过和 JobManager 之间进行交互获取任务执行状态。 3 JobManager JobManager 负责整个 Flink 集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中 TaskManager 上 TaskSlot 的使用情况,为提交的应用分配相应的 JobManager 相当于整个集群的 Master 节点,且整个集群有且只有一个活跃的 JobManager ,负责整个集群的任务管理和资源管理。 客户端通过将编写好的 Flink 应用编译打包,提交到 JobManager,然后 JobManager 会根据已注册在 JobManager 中 TaskManager 的资源情况,将任务分配给有资源的
序 本文主要研究一下flink JobManager的High Availability flink-forward-berlin-2017-patrick-lucas-flink-in-containerland metadata的存储路径 masters文件 localhost:8081 localhost:8082 masters文件用于指定jobmanager的地址 HighAvailabilityMode /org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java public enum HighAvailabilityMode { ") .defaultValue("0") .withDeprecatedKeys("recovery.jobmanager.port") The JobManager's hostname and port * @throws ConfigurationException if the JobManager's address
JobManager 在 Flink 集群中发挥着重要的作用,包括任务调度和资源管理等工作。如果 JobManager 宕机,那么整个集群的任务都将失败。 为了解决 JobManager 的单点问题,Flink 也设计了 HA 机制来保障整个集群的稳定性。 我们就以 JobManager 为例,看一下机遇 ZooKeeper 的选举流程的具体实现。 我们以 TaskManager 获取 JobManager 的 leader 为例。 在梳理过程中,我们以 JobManager 为例,其他几个用到高可用的服务的选举逻辑也是一样的。
[1] jobmanager.memory.flink.size[2] 进程总内存 taskmanager.memory.process.size[3] jobmanager.memory.process.size Jobmanager内存配置 JobManager内存配置入口在JobManagerProcessUtils.processSpecFromConfig方法中,加载方式与taskmanager的内存配置加载方式大同小异 [6] jobmanager.memory.flink.size[7] 进程总内存 taskmanager.memory.process.size[8] jobmanager.memory.process.size 关于各内存部分的更多细节,请分别参考 TaskManager[12] 和 JobManager[13] 的相关文档。 只有在 jobmanager.memory.enable-jvm-direct-memory-limit[29] 设置为 true 时,JobManager 才会设置 JVM 直接内存限制。
都知道Flink中的角色分为Jobmanager,TaskManger 在启动脚本里面已经找到了jobmanager的启动类org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint create()方法中 其中包括了一些创建以及启动ResourceManager(有用于请求solt的RPC,初始化所有solt到resourceManager的soltManager的RPC(这个会在jobmanager 后面随缘更新到job启动Graph转换会提到 回到前面的Dispatcher.start()将传入的rpcService启动起来了,等待接受来自Driver端提交上来的JobGraph差不多启动完成了 这里jobmanager
."); //...... } jobmanager.heap.size配置用于指定JobManager的大小,默认是1024m;jobmanager.heap.mb配置已经被废弃 ConfigurationUtils 、taskmanager.heap.size来配置 jobmanager.sh flink-1.7.1/flink-dist/src/main/flink-bin/bin/jobmanager.sh # 配置用于指定JobManager的大小,默认是1024m;jobmanager.heap.mb配置已经被废弃;ConfigurationUtils的getJobManagerHeapMemory方法从Configuration 配置;FLINK_ENV_JAVA_OPTS的配置则取决于env.java.opts以及env.java.opts.jobmanager;因而要配置jobmanager的heap大小的话,可以指定FLINK_JM_HEAP 环境变量(比如FLINK_JM_HEAP=512m),或者在flink-conf.yaml中指定jobmanager.heap.size doc jobmanager.heap.size
JobManager协调每个flink应用的部署,它负责执行定时任务和资源管理。 每一个Flink集群都有一个jobManager, 如果jobManager出现问题之后,将不能提交新的任务和运行新任务失败,这样会造成单点失败,所以需要构建高可用的JobMangager。 类似zookeeper一样,构建好了高可用的jobManager之后,如果其中一个出现问题之后,其他可用的jobManager将会接管任务,变为leader。不会造成flink的任务执行失败。 可以在单机版和集群版构建jobManager flink ha主要分为两种,flink独立部署时的ha, flink on yarn模式部署时的ha 一.flink独立部署(Standalone模式) 从图上看道需要启动至少两个独立的jobmanager进程 下面我们来看一下配置 localhost:8081 localhost:8082 ha配置放到下文去说 二.on yarn模式(yarn session
问题背景 近期接到客户反馈,某地域的作业不定期的出现 JobManager 崩溃重启的问题。 具体现象如下: JobManager 在正常运行中,没有任何预兆地,突然报too old resource version错误,紧接着容器就自动退出了: 2020-10-17 14:51:36.289 该问题会触发 ResourceManager 对 JobManager 的重新初始化过程,作业也会从最近的一次 Checkpoint 恢复。 但是我们认为,对于这种资源版本不够新的问题,并不属于故障,因此也不需要重启 JobManager 这么重的操作,只需要重新初始化一次 watcher,令其资源版本更新到最新即可。 对此我们尝试了不少方案,例如主动令 JobManager 的 JVM 较长时间停顿等等,但是难以触发同样的现象。
本文则侧重于讲解 JobManager 的内存布局和相关经验分享,助力作业跑的更快、更稳定。 JobManager 内存分区总览同样地,我们从 Flink 官网的 JobManager 内存分区图 [5] 开始说起,图片可以看到,相比 TaskManager 的内存分区而言,JobManager 但这并不意味着 JobManager 的内存更好管理;相反,这表示 Flink 社区对 JobManager 内存控制的还很粗粒度,因此出问题时更隐蔽,更难定位,因此一定要特别重视。 JobManager 各内存区域详解同样地,我们逐一分析上述各个内存区的用途,以及线上配置和调优经验。 ,新启动的 JobManager 找不到可用的快照信息,可能造成数据丢失或重复计算的后果。
本文则侧重于讲解 JobManager 的内存布局和相关经验分享,助力作业跑的更快、更稳定。 JobManager 内存分区总览 同样地,我们从 Flink 官网的 JobManager 内存分区图 [5] 开始说起, 可以看到,相比 TaskManager 的内存分区而言,JobManager 但这并不意味着 JobManager 的内存更好管理;相反,这表示 Flink 社区对 JobManager 内存控制的还很粗粒度,因此出问题时更隐蔽,更难定位,因此一定要特别重视。 ,新启动的 JobManager 找不到可用的快照信息,可能造成数据丢失或重复计算的后果。 Flink 的配置参数为 jobmanager.memory.jvm-overhead.fraction,默认为 0.1 即 10% 的 JVM 进程总内存;同时也受最小阈值(参数为 jobmanager.memory.jvm-overhead.min
上线接近一年之久,客户反馈通过yarn-session模式启动的Flink集群JobManager出现故障:基于webui上传jar方式执行任务失败,堆栈提示内存溢出的异常(java.lang.OutOfMemoryError /yarn-session.sh -Denv.java.opts.jobmanager="-XX:CompressedClassSpaceSize=512M" -d 查看JobManager进程JVM参数 /yarn-session.sh -Denv.java.opts.jobmanager="-XX:MaxMetaspaceSize=1G" -d 如上图所示,CompressedClassSpaceSize /yarn-session.sh -Denv.java.opts.jobmanager="-XX:MaxMetaspaceSize=1G -XX:CompressedClassSpaceSize=512M /yarn-session.sh -Denv.java.opts.jobmanager="-XX:+TraceClassLoading -XX:+TraceClassUnloading" 通过查看日志文件统计类加载与类卸载情况
1.常用的数据结构定义 Flink中,JobManager内部维护了多个数据结构,用于存储和管理作业的元数据信息。 JobGraph是由客户端提交作业时生成的,并由JobManager进行解析和管理。 JobManager在作业执行结束后,会生成JobResult并返回给客户端。 综上所述,JobManager内部维护了多个数据结构,用于存储和管理作业的元数据信息。 JobManager 会将 JobGraph 转换成 ExecutionGraph 。 局部的终结意味着作业的执行已经被对应的 JobManager 终结,但是集群中另外的 JobManager 依然可以从高可用存储里获取作业信息并重启。
Flink核心架构解析:JobManager、TaskManager和Client的角色 JobManager:集群的大脑与指挥中心 在Flink分布式架构中,JobManager扮演着集群主节点的角色 由于其全局协调的特性,通常在生产环境中会部署多个JobManager实例,通过主备模式实现高可用性。 在执行阶段,TaskManager接收来自JobManager的指令,启动具体的SubTask线程。 在执行过程中,TaskManager会定期向JobManager发送心跳和状态更新,JobManager则全局监控作业进度,处理可能的异常(如节点故障或背压)。 步骤二:JobManager接收与解析 JobManager作为集群的"大脑",接收客户端提交的JobGraph。它首先进行验证,确保作业逻辑正确,然后将其转换为ExecutionGraph。
每个Flink集群至少要有一个JobManager,但在生产环境中通常是高可用模式部署,即部署多台JobManager,其中一台作为Leader,其他的作为Standby节点。 这样就能避免JobManager单机故障影响到整个Flink集群的可用性。JobManager主要由以下几部分组成,下面我们分别来看每部分的作用。 ,再由JobManager生成并行版本的ExecutionGraph,待JobManager将task调度后,生成的图被称为PhysicalGraph。 SessionModeSessionMode下,所有的任务共享JobManager和TaskManager,JobManager的生命周期不受提交的Job影响,会长期运行。 客户端无需将依赖包上传到JobManager,只负责提交作业,减轻了客户端的压力。提交作业后,JobManager主动从HDFS拉取依赖包。
作业提交的JobManager模式: 应用模式:专为一个应用运行集群。作业的主要方法(或客户端)在 JobManager 上执行。 外部组件(全部可选) 高可用性服务提供商 Flink 的 JobManager 可以运行在高可用模式下,这使得 Flink 从 JobManager 故障中恢复。 作业提交的JobManager模式: 应用模式:专为一个应用运行集群。作业的主要方法(或客户端)在 JobManager 上执行。 作业提交的JobManager模式: 应用模式:专为一个应用运行集群。作业的主要方法(或客户端)在 JobManager 上执行。 作业提交的JobManager模式: 应用模式:专为一个应用运行集群。作业的主要方法(或客户端)在 JobManager 上执行。
1.24.0 flink:1.9.2-scala_2.12 操作步骤 SSH登录服务器; 创建文件docker-compose.yml,内容如下: version: "2.1" services: jobmanager image: flink:1.9.2-scala_2.12 expose: - "6123" ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager: image: flink:1.9.2-scala _2.12 expose: - "6121" - "6122" depends_on: - jobmanager command: taskmanager links: - "jobmanager:jobmanager" environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager
ResourceManager:负责资源的管理,整个Flink集群中只有一个 JobManager:负责管理作业的执行,Flink集群中有多个作业,每个作业都有自己的JobManager Flin集群运行模式 在将作业提交到AM的Dispatcher后,Dispatcher首先会启动一个JobManager,然后JobManager会向ResourceManager申请资源启动作业中的具体任务,此时根据Flink ,然后TaskExecutor进行记录,会向JobManager进行注册。 SlotPool:SlotPool属于JobManager,用于缓存所有的Task请求和被分配给该JobManager的Slot,当有Slot被提供后,SlotPool会从缓存的请求中选择相应的请求和Slot (5.requestSlot) TaskManager如果还没有执行过该JobManager的Task的话,它会与相应的JobManager建立连接,发起提供Slot的RPC请求(6. offset)
以下的内存配置适用于 TaskManager 的1.10版和 JobManager 进程的1.11版。 总进程内存 taskmanager.memory.process.size jobmanager.memory.process.size 对于本地执行,请参阅TaskManager和JobManager 这意味着必须明确配置以下没有默认值的选项子集之一: 对于任务管理器: 对于 JobManager: taskmanager.memory.flink.size jobmanager.memory.flink.size (***)只有在设置了相应选项时,才会为JobManager 进程添加 JVM Direct 内存限制jobmanager.memory.enable-jvm-direct-memory-limit。 另请查看TaskManager和 JobManager的详细内存模型以了解如何配置相关组件。
Flink session cluster 会包含以下组件: JobManager 以 Deployment 的方式运行在 K8S 集群 TaskManagers 也是以 Deployment 的方式运行在 K8S 集群 JobManager 的 REST 和 UI 端口通过 Service 部署在 K8S 集群 2.1 Deploy Flink session cluster on Kubernetes kubectl create -f jobmanager-service.yaml kubectl create -f jobmanager-deployment.yaml kubectl create 成功部署了一个 JobManager 的 Pod 和两个 TaskManager 的 Pod。 ? 部署成功后,通过以下方法来查看 Flink UI。 ? 访问 Flink UI 的地址。 http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy/#/overview ?
二、Flink运行时架构 Flink整个系统主要由两个组件组成,分别为JobManager和TaskManager,Flink架构也遵循Master-Slave架构设计原则,JobManager为Master Flink Clients客户端 Flink客户端负责将任务提交到集群,与JobManager构建Akka连接,然后将任务提交到JobManager,通过和JobManager之间进行交互获取任务执行状态 JobManager JobManager负责整个Flink集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中TaskManager上TaskSlot的使用情况,为提交的应用分配相应的 JobManager相当于整个集群的Master节点,Flink HA 集群中可以有多个JobManager,但整个集群中有且仅有一个活跃的JobManager,其他的都是StandBy。 协调过程都是在Flink JobManager中完成。