概述 BlockManager是spark自己的存储系统,RDD-Cache、 Shuffle-output、broadcast 等的实现都是基于BlockManager来实现的,BlockManager 也是分布式结构,在driver和所有executor上都会有blockmanager节点,每个节点上存储的block信息都会汇报给driver端的blockManagerMaster作统一管理,BlockManager 随后都创建了自己blockManager,创建blockManager的时候都创建了BlockManagerSlaveEndpoint。 blockManager创建后还不能直接使用,接着都会调用blockManager的initialize方法,通过与master通信向master进行注册,master收到消息后会将blockManager blockManager对外服务 blockManager典型的几个应用场景如下: spark shuffle过程的数据就是通过blockManager来存储的。
接下来分别介绍 Master 端和 Slave 端的 BlockManager。 端的 BlockManager 的简称。 接下来,我们看看 BlockManager 是如何创建的。 创建 BlockManager 一图胜千言,我们还是先来看看 Master 是如何创建的: ? 该 BlockManager 也就是 Storage 模块的 Master 或 Slave 了。 注册 BlockManager BlockManager 实例在被创建后,不能直接使用,必须调用其 initialize 方法才能使用。
最近一直在忙,没顾得上写文章,新年的第一篇文章,希望大家可以喜欢;好了,今天接着之前的内容,来聊聊BlockManager的工作原理,上图来分析; ? BlockManager原理图 在DAGShceduler中有一个BlockManagerMaster对象,该对象的工作就是负责管理全局所有BlockManager的元数据,当集群中有BlockManager 注册完成的时候,其会向BlockManagerMaster发送自己元数据信息;BlockManagerMaster会为BlockManager创建一个属于这个BlockManager的BlockManagerInfo ,用于存放BlockManager的信息。 ,后者通过向其他BlockManager的ConnectionManager建立连接,然后本节点向建立连接的BlockManager节点拉取数据;当BlockManager的相关信息发生变化的时候,BlockManager
然后在启动jbo的时候由Driver上的BlockManagerMaster对存在于Executor上的BlockManager统一管理,注册Executor的BlockManager、更新Executor 在Spark1.6时,Drvier的BlockManagerMaster与BlockManager之间的通信,不再是通过AkkaUtil,而是用了RpcEndpoint,也就木有了BlockManagerMasterActor BlockManagerMaster与BlockManager之间的通信已经使用RPC远程过程调用来实现,RPC相关配置参数如下: spark.rpc.retry.wait 3s(默认)等待时长 、 好的,我们继续,每个executor中的BlockManager的创建,都要经过BlockManagerMaster注册BlockManagerId. ? Executor或Driver自身的BlockMnager在初始化时,需要向Driver的BlockManager注册BlockMnager信息,注册的消息内容包括BlockMnagerI的d、时间戳、
Core存储体系中的内存存储和磁盘存储逻辑基本上讲完了,而负责将这些组件统一管理并发挥作用的就是BlockManager,那么从本文开始,我们就来逐渐探索它的细节…… No,还不急,本文还是来看先于BlockManager 顾名思义,它是负责管理各个BlockManager的。 之前提到过一句,BlockManager是典型的主从架构设计,不管Driver还是Executor上都要有BlockManager实例,那么必然就得存在一个协调组件——Spark中就是BlockManagerMaster blockLocations:维护块ID与持有对应块的BlockManager ID的映射关系。 由于所有的动作都对应到BlockManager的方法调用,所以我们在讲解BlockManager时,再来看这部分的具体实现。
BlockManagerMaster负责接受Executor上的BlockManager的注册以及管理BlockManager的元数据信息。 用于存放BlockManager的信息。 在创建SparkContext的时候,会调用SparkEnv.blockManager.initialize方法实例化BlockManager对象,在创建Executor对象的时候也会创建BlockManager )创建BlockManager对象,这个BlockManager就是Driver上的BlockManager,它负责管理集群中Executor上的BlockManager。 创建BlockManager的关键方法如下,完整的源代码你可以在BlockManager这个类中看到。
1)表示的是Executor的BlockManager与Driver的BlockManager进行消息通讯,比如注册BlockManager啊、更新BlockManager之类的。 主要是因为网上没找到又懒得画,so~~~ 块管理器BlockManager的实现 如结构图所示,BlockManager是其他所有BlockManager的抽象类(父类?接口? 在初始化时,需要向Driver的BlockManager注册BlockManager信息。 blockManager.externalShuffleServiceEnabled || (blockManager.blockManagerId ! 块管理器BlockManager 前面已经介绍了BlockManager中的主要组件,现在来看看BlockManager自身的实现。
当中有,直接从blockManager当中取。 下面我们的目标要放到blockManager。 BlockManager BlockManager这个类比较大,我们从两方面开始看吧,putBytes和get方法。 ok,下面就重点讲BlockManager和BlockManagerMaster之间的关系,以及BlockManager之间是如何相互传输数据。 只是写入到BlockManager,但是tellMaster为false的话,就相当于存在本地了,别的BlockManager是没法获取到的。 1、先通过MetaId从BlockManager里面取出来Meta信息。 2、通过Meta信息,构造分片id,去BlockManager里面取。
本文为 Spark 2.0 源码分析笔记,某些实现可能与其他版本有所出入 再次重申标题中的 Master 是指 Spark Storage 模块的 Master,是运行在 driver 上的 BlockManager 及其包含的 BlockManagerMaster、RpcEnv 及 RpcEndpoint 等;而 Slave 则是指 Spark Storage 模块的 Slave,是运行在 executor 上的 BlockManager 用于 Slave(executor 端 BlockManager) 向 Master(driver 端 BlockManager) 注册,触发时机: executor 端 BlockManager 在初始化时 BlockManager 中都存储了哪些 RDD 的哪些 block(对应 partition)以及各个 block 的信息 ---- case class BlockManagerHeartbeat (blockManagerId: BlockManagerId) 用于 Slave 向 Master 发心跳信息,以通知 Master 其上的某个 BlockManager 还存活着 ---- case
5.下来,创建BlockManager,BlockManager负责对Block的管理,只有在BlockManager的初始化方法initialize被调用后,它才是有效的。 BlockManager作为存储系统的一部分。这么就继续深入,围绕BlockManager进行阅读。 查阅资料,BlockManager主要由以下部分组成: ·shuffle客户端ShuffleClient; ·BlockManagerMaster(对存在于所有Executor上的BlockManager BlockManager的初始化: ? 那么BlockManager的实质运行机制如下图: ? (1)表示Executor 的BlockManager中的BlockManagerMaster与Driver的BlockManagerActor进行消息通信,比如注册BlockManager、更新Block
._ val blockManager = SparkEnv.get.blockManager if (! } 这个方法中涉及到了块管理器BlockManager,它是Spark存储子系统中的基础组件,我们现在暂时不考虑它,后面还会对它进行十分详尽的分析。 writeBlocks()方法的执行逻辑如下: 获取BlockManager实例,调用其putSingle()方法将广播数据作为单个对象写入本地存储。 = SparkEnv.get.blockManager blockManager.getLocalValues(broadcastId) match { case 再次调用BlockManager.putSingle()方法将广播数据作为单个对象写入本地存储,再将其加入广播缓存Map中,下次读取时就不用大费周章了。
概述 BlockManager是spark的存储子系统,spark涉及的RDD数据,shuffle数据,BroadCast广播变量等都是依托BlockManager来存取的。 Executor端的BlockManager把自己注册到Driver,这样driver就有了所有的Executor端的BlockManager的信息,包括地址和内存等资源状态。 Executor端的BlockManager也会把自己管理的所有Block信息上报给Driver端的BlockManager,这样Driver就提供了一个查询所有Block的元数据的服务。 ,当然也会注册BlockManager自身信息BlockManagerInfo到Driver端的BlockManager。 BlockManager启动过程中,通过RpcEnv实例化BlockManager的网络服务,以供其他Blockamanger进行访问。
driver 为什么会同时将 data 放到磁盘和 blockManager 里面? 该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面,如果不在就使用下面的两种 fetch 方式之一去将 data fetch 过来。 该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面,如果不在就使用 http 协议连接 driver 上的 httpServer,将 data 先询问所在的 executor 里的 blockManager 是会否包含 data(通过查询 data 的 broadcastId),包含就直接从本地 blockManager 读取 data。 对于第二个问题,每个 executor 都包含一个 blockManager 用来管理存放在 executor 里的数据,将公共数据存放在 blockManager 中(StorageLevel 为内存
和NameNodeResourceChecker,blockManager比较关键。 blockManager.isPopulatingReplQueues(); StartupProgress prog = NameNode.getStartupProgress(); ,blockManager负责管理文件系统中文件的物理块与实际存储位置的映射关系, // 是NameNode的核心功能之一。 blockManager.activate(conf, completeBlocksTotal); } finally { writeUnlock("startCommonServices" serviceAddress.getHostName() : ""; } blockManager.activate 启动blockManager.activate 主要是初始化blockManager
Storage 模块也是 Master/Slave 架构,Master 是运行在 driver 上的 BlockManager实例,Slave 是运行在 executor 上的 BlockManager Storage 模块 Master Slaves 架构.jpg 在 driver 端,创建 SparkContext 时会创建 driver 端的 SparkEnv,在构造 SparkEnv 时会创建 BlockManager ,而该 BlockManager 持有 RpcEnv 和 BlockManagerMaster。 rpcEndpointRef 来给 Storage Slave 发送消息下达命令的 而在 slave 端(各个 executor),同样会创建 SparkEnv,创建 SparkEnv 时同样会创建 BlockManager ,slave 端的 BlockManager 同样会持有 RpcEnv 以及 BlockManagerMaster。
task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中尝试获取变量,如果本地没有,BlockManager就会从Driver或者其他节点的 BlockManager上远程拉取变量的复本,并由本地的BlockManager进行管理;之后此Executor的所有task都会直接从本地的BlockManager中获取变量。
紧接着blockManager的创建后创建。如下: ? 随之我们继续深入看这个broadcastManager是怎么创建与实现的。 ? 我们可以看到,在创建cacheManager对象的时候,传入了blockManager,真正的缓存对象,依旧是blockManager,cacheManager是为blockManager做了代理。 当迭代计算中,如果判断使用了缓存,就会调用getOrCompute,从blockManager.get(key)获取存储的block,如果存在,则封装new InterruptibleIterator返回 如果存储级别不允许使用内存,那么直接调用BlockManager的putIterator方法。
广播变量存储目前基于Spark实现的BlockManager分布式存储系统,Spark中的shuffle数据、加载HDFS数据时切分过来的block块都存储在BlockManager中,不是今天的讨论点 的writeBlocks方法) 2)每个executor在获取广播变量时首先从本地的BlockManager获取。 TorrentBroadcast会在driver端的BlockManager里面存储广播变量对象,并将广播对象分割成若干序列化block块(默认4M),存储于BlockManager。 TorrentBroadcast在executor端存储一个对象的同时会将获取的block存储于BlockManager,并向driver端的BlockManager汇报block的存储信息。 请求数据的时候会先获取block的所有存储位置信息,并且是随机的在所有存储了该executor的BlockManager去获取,避免了数据请求服务集中于一点。
它们与Spark的BlockManager紧密集成——BlockManager管理本地存储的元数据和实际数据块,当需要跨节点访问时,便委托BlockTransferService完成传输任务。 例如,它与BlockManager之间的交互是通过回调机制实现的:当传输服务需要读取或写入Block时,会调用BlockManager提供的接口来访问存储系统(内存、磁盘或外部存储)。 与BlockManager的协同工作 NettyBlockRpcServer并非独立运作,而是与BlockManager紧密协作。 当接收到Block请求时,服务器通过BlockManager获取本地存储的Block数据,BlockManager负责管理Block的存储、缓存和元数据信息。 与BlockManager的协作机制 在Spark分布式计算框架中,BlockTransferService与BlockManager的协作是保障数据高效传输和一致性的核心机制。
2) BlockManager BlockManager块管理者,是Spark架构中的一个模块,也是一个主从架构。 BlockManagerMaster,主对象,存在于Driver中。 无论在Driver端的BlockManager还是在Excutor端的BlockManager都含有四个对象: ① DiskStore:负责磁盘的管理。 ② MemoryStore:负责内存的管理。 d) 获取到磁盘小文件的地址后,会通过BlockManager中的ConnectionManager连接数据所在节点上的ConnectionManager,然后通过BlockTransferService