Broadcast variables can be shared across tasks but cannot be modified; Accumulators, by contrast, let different tasks apply accumulating updates to the same variable.
/**
 * Accumulates the input values to the accumulators.
 *
 * @param accumulators the accumulators (saved in a row) which contains the current
 *                     aggregated results
 * @param input        the input row
 */
def accumulate(accumulators: Row, input: Row)

/**
 * Initializes the accumulators and saves them to an accumulators row.
 *
 * @return a row of accumulators which contains the aggregated results
 */
def createAccumulators(): Row

/**
 * Merges two rows of accumulators into one row.
 *
 * @param a First row of accumulators
 * @param b The other row of accumulators
 * @return A row with the merged accumulators of both input rows.
 */
def mergeAccumulatorsPair(a: Row, b: Row): Row

/**
 * Resets all the accumulators.
 *
 * @param accumulators the accumulators (saved in a row) which contains the current
 *                     aggregated results
 */
def resetAccumulator(accumulators: Row)

/**
 * Cleanup for the accumulators.
 */
def cleanup()
If the accumulators row is null, it is created via createAccumulators; likewise the current input count inputCnt for this key is read, and initialized to 0 when absent.

val input = inputC.row
// get accumulators and input counter
var accumulators = state.value()
var inputCnt = cntState.value()
if (null == accumulators) {
  firstRow = true
  accumulators = function.createAccumulators()
} else {
  firstRow = false
}
if (inputC.change) {
  inputCnt += 1
  // accumulate input
  function.accumulate(accumulators, input)
  function.setAggregationResults(accumulators, newRow.row)
} else {
  inputCnt -= 1
  // retract input
  function.retract(accumulators, input)
  function.setAggregationResults(accumulators, newRow.row)
}
At construction time the object also registers the accumulator with Accumulators. The return type of an accumulator's add operation may differ from the type of the value passed in, so we must define precisely how values are added and how they are merged, i.e. the add method.

object Accumulators: this object manages our accumulators on the Driver side and also holds the aggregation logic for specific accumulators.

Accumulators.register(this)

2. How the Executor side obtains our object through deserialization. First, our value_ field, as we can see, is not serialized:

@volatile @transient  // Other internal accumulators, such as SQL metrics, still need to register here.

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
  def addInPlace(t1: Int, t2: Int): Int = t1 + t2
  def zero(initialValue: Int): Int = 0
}

After the tasks return, our Accumulators are updated.
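The point that the type added per element can differ from the accumulated result type can be sketched in plain Java (no Spark dependency). The class and method names below are illustrative stand-ins for Spark's Accumulable idea, not Spark's actual API:

```java
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// T = type of each added element, R = type of the accumulated result.
// Both an "add one element" rule and a "merge two partials" rule are needed.
class MiniAccumulable<R, T> {
    private R value;
    private final BiFunction<R, T, R> addElement;   // like addAccumulator
    private final BinaryOperator<R> mergePartials;  // like addInPlace

    MiniAccumulable(R zero, BiFunction<R, T, R> addElement, BinaryOperator<R> mergePartials) {
        this.value = zero;
        this.addElement = addElement;
        this.mergePartials = mergePartials;
    }

    void add(T element) { value = addElement.apply(value, element); }
    void merge(MiniAccumulable<R, T> other) { value = mergePartials.apply(value, other.value); }
    R value() { return value; }

    public static void main(String[] args) {
        // Accumulate String lengths into an Integer total: T = String, R = Integer.
        MiniAccumulable<Integer, String> a =
                new MiniAccumulable<>(0, (r, s) -> r + s.length(), Integer::sum);
        MiniAccumulable<Integer, String> b =
                new MiniAccumulable<>(0, (r, s) -> r + s.length(), Integer::sum);
        a.add("spark");           // partial on task 1: 5
        b.add("accumulators");    // partial on task 2: 12
        a.merge(b);               // driver-side merge of the partials
        System.out.println(a.value());  // prints 17
    }
}
```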
var accumulators = state.value()
var inputCnt = cntState.value()
if (null == accumulators) {
  if (!inputC.change) {
    return
  }
  firstRow = true
  accumulators = function.createAccumulators()  // getValue will also be called to fetch the current result data
}
if (inputC.change) {
  inputCnt += 1
  // accumulate input
  function.accumulate(accumulators, input)
  function.setAggregationResults(accumulators, newRow.row)  // calls getValue
} else {
  inputCnt -= 1
  // retract input
  function.retract(accumulators, input)
  function.setAggregationResults(accumulators, newRow.row)
}
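The accumulate/retract bookkeeping above can be sketched in plain Java, with a map standing in for keyed state. The accumulate/retract names follow the snippet; the rest (class name, use of a sum aggregate) is illustrative, not the actual Flink runtime:

```java
import java.util.HashMap;
import java.util.Map;

class CountingAggState {
    private final Map<String, Long> sumState = new HashMap<>(); // aggregate value per key
    private final Map<String, Long> cntState = new HashMap<>(); // live input rows per key

    public void accumulate(String key, long value) {
        sumState.merge(key, value, Long::sum);
        cntState.merge(key, 1L, Long::sum);
    }

    public void retract(String key, long value) {
        sumState.merge(key, -value, Long::sum);
        long cnt = cntState.merge(key, -1L, Long::sum);
        if (cnt <= 0) {  // no live inputs left for this key: drop its state
            sumState.remove(key);
            cntState.remove(key);
        }
    }

    public Long result(String key) { return sumState.get(key); }

    public static void main(String[] args) {
        CountingAggState s = new CountingAggState();
        s.accumulate("a", 10);
        s.accumulate("a", 5);
        s.retract("a", 10);                 // an update retracts the old row first
        System.out.println(s.result("a"));  // prints 5
    }
}
```

When a retraction drives the input count for a key to zero, the key's state is cleared entirely, mirroring the inputCnt check in the process function.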
/jobs/<jobid>/vertices
/jobs/<jobid>/config
/jobs/<jobid>/exceptions
/jobs/<jobid>/accumulators
/jobs/<jobid>/vertices/<vertexid>/taskmanagers
/jobs/<jobid>/vertices/<vertexid>/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators

[root@hadoop01 ~]# sample response: {"root-exception":null,"timestamp":null,"all-exceptions":[],"truncated":false}

/jobs/<jobid>/accumulators
the loading of the Weights, to hide the weight-load time. At 30 GiB/s of bandwidth, loading a 256×256 Weight tile takes roughly 1430 cycles, which means one set of Weights must be used for at least 1430 cycles of computation; the Accumulators therefore need a depth of 2K (1430 rounded up to a power of two; the paper gives 1350, and the reason for the difference is unclear). Because the MAC and Activation modules must compute at the same time, the Accumulators need twice that storage for a ping-pong design, so the Accumulator depth is set to 4K. From a hardware-design standpoint, then, as long as the TPU reaches about 1400 ops/Weight Byte, it can in theory compute at close to 100% efficiency.

For this reason the matrix multiply unit here is scaled down to 32×32, with the other data widths adjusted to match. The changes include:

Resource              TPU        SimpleTPU
Matrix Multiply Unit  256*256    32*32
Accumulators

Because the Weight FIFO is difficult to implement (hard to describe in C), the Weights are kept in a 1K*32*8b BRAM used in ping-pong fashion; since the Matrix Multiply Unit and Accumulators
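The cycle count above can be checked with simple arithmetic. The assumptions below are not stated in the text and come from the TPU paper: a 700 MHz clock and 8-bit weights (so a 256×256 tile is 64 KiB):

```java
class WeightLoadMath {
    // smallest power of two >= n
    static long nextPow2(long n) {
        long p = 1;
        while (p < n) p <<= 1;
        return p;
    }

    public static void main(String[] args) {
        double bytesPerTile = 256.0 * 256;            // one 256x256 tile of 8-bit weights
        double bandwidth = 30.0 * (1L << 30);         // 30 GiB/s
        double clockHz = 700e6;                       // assumed 700 MHz clock
        double bytesPerCycle = bandwidth / clockHz;   // ~46 bytes per cycle
        double cycles = bytesPerTile / bytesPerCycle; // ~1424 cycles, matching "about 1430"
        System.out.printf("load takes ~%.0f cycles%n", cycles);
        System.out.println("accumulator depth = " + nextPow2((long) cycles));     // 2048 (2K)
        System.out.println("with ping-pong    = " + 2 * nextPow2((long) cycles)); // 4096 (4K)
    }
}
```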
The official Boost documentation and the copy we downloaded have a direct path correspondence.
Official URL example: https://www.boost.org/doc/libs/1_86_0/doc/html/accumulators.html
Downloaded path example: boost/1_86_0/doc/html/accumulators.html
Path after copying into our project: data/input/accumulators.html

// we copy the downloaded files into data/input
url_head = "https://www.boost.org/doc/libs/1_86_0/doc/html"
url_tail = data/input/accumulators.html with the data/input prefix removed -> /accumulators.html
url = url_head + url_tail  // this reconstructs the official site link!
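The path-to-URL mapping described above, as a small Java sketch. The url_head and the data/input prefix come from the text; the helper name is ours:

```java
class BoostUrlMapper {
    static final String URL_HEAD = "https://www.boost.org/doc/libs/1_86_0/doc/html";
    static final String LOCAL_PREFIX = "data/input";

    // "data/input/accumulators.html" -> official documentation URL
    static String toOfficialUrl(String localPath) {
        String tail = localPath.startsWith(LOCAL_PREFIX)
                ? localPath.substring(LOCAL_PREFIX.length())  // "/accumulators.html"
                : localPath;
        return URL_HEAD + tail;
    }

    public static void main(String[] args) {
        System.out.println(toOfficialUrl("data/input/accumulators.html"));
        // https://www.boost.org/doc/libs/1_86_0/doc/html/accumulators.html
    }
}
```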
currentTime);
BaseRow currentKey = ctx.getCurrentKey();
boolean firstRow;
BaseRow accumulators = accState.value();
if (null == accumulators) {
    firstRow = true;
    // createAccumulators on the generated GroupAggsHandler$39 object calls back
    // into our own UDF's createAccumulator() method
    accumulators = function.createAccumulators();
} else {
    firstRow = false;
}
// set accumulators to handler first
function.setAccumulators(accumulators);
// get previous aggregate result
Spark supports two types of shared variables: broadcast variables, which cache a value in memory on every node, and accumulators, variables that can only be added to, such as counters and sums.

Accumulators

Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If accumulators are created with a name, they will be displayed in Spark's UI. Accumulators do not change the lazy evaluation model of Spark.
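The "only added to through an associative operation" property is what makes parallel support cheap: each task keeps a local partial, and the driver merges partials in any order. A minimal plain-Java sketch (no Spark; the class name is ours):

```java
import java.util.List;

class DriverSideMerge {
    // Merge per-task partial counts with the same associative add the tasks used.
    // Because addition is associative, the merge order does not affect the total.
    static long merge(List<Long> taskPartials) {
        long total = 0;             // the accumulator's "zero"
        for (long p : taskPartials) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // three tasks each counted records in their own partition
        System.out.println(merge(List.of(4L, 7L, 9L)));  // prints 20
    }
}
```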
Consequently, state access, accumulators, broadcast variables and the distributed cache are disabled:

public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
    throw new UnsupportedOperationException("Accumulators are not supported
}

public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
    throw new UnsupportedOperationException("Accumulators are not supported
}

public Map<String, Accumulator<?, ?>> getAllAccumulators() {
    throw new UnsupportedOperationException("Accumulators are not supported
}
Output:

resultDataSet.print()
// (张三,语文,50)
// (李四,数学,70)
// (王五,英文,86)

1.7 Flink Accumulators: use an accumulator to print how many elements were processed. Reference code:

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.IntCounter

override def map(value: String): String = {
  counter.add(1)
  value
}
})
resultDataSet.writeAsText("data/output/Accumulators
val MyAccumlatorValue = result.getAccumulatorResult[Int]("MyAccumulator")
println("Accumulator value: " + MyAccumlatorValue) // Accumulator value: 4

Flink Broadcast and Accumulators: broadcast variables can be shared but cannot be modified; Accumulators let different tasks apply accumulating updates to the same variable.
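The Flink pattern above (a counter incremented once per element inside the map step, read out after the job finishes) can be sketched in plain Java without a Flink dependency; a LongAdder stands in for IntCounter, and all names here are ours:

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

class ElementCountSketch {
    static long countWhileMapping(List<String> data) {
        LongAdder counter = new LongAdder();               // stands in for IntCounter
        data.stream()
            .map(v -> { counter.increment(); return v; })  // like counter.add(1) per element
            .forEach(v -> { /* the writeAsText side effect is elided */ });
        return counter.sum();                              // like getAccumulatorResult
    }

    public static void main(String[] args) {
        System.out.println(countWhileMapping(List.of("a", "b", "c", "d")));  // prints 4
    }
}
```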
Overview: an accumulator (Accumulator) is a simple construct with an add operation and an operation for fetching the final accumulated result, which becomes available after the job finishes.

import com.qunar.innovation.data.bean.AdsPushBehavior;
import com.qunar.innovation.data.utils.ConstantUtil;
import org.apache.flink.api.common.accumulators.LongCounter
select customer_id, c64_1, l3_balance_Status, sys_update_date, sys_creation_Date from accumulators
select customer_id, c64_1, l3_balance_Status, sys_update_date, sys_creation_Date from ape1_accumulators
5. Shared variables: Spark provides two restricted kinds of shared variables, Broadcast and Accumulators.

b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

(2) Accumulators
Spark has two kinds of shared variables: Broadcast Variables and Accumulators.
Broadcast Variables: a broadcast variable is a read-only variable that, once stored, can be accessed from any node in the cluster.
Accumulators: as the name suggests, an accumulator supports efficient accumulation under parallel execution.
References: Spark Shuffle principles; Spark Shuffle principles and related tuning; official documentation.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// Create internal accumulators if the stage has no accumulators initialized.
// Reset internal accumulators only if this stage
['$i', 10]}}"))) ).forEach(printDocumentBlock, callbackWhenFinished);

For $group operations, use the Accumulators class to handle any accumulation. In the example below, we use Aggregates.group together with Accumulators.sum to add up all values of i.
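What Aggregates.group combined with Accumulators.sum computes can be sketched with plain Java streams instead of the MongoDB driver: group documents by a key and sum their i field within each group. The class, record, and field names here are illustrative:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupSumSketch {
    record Doc(String category, int i) {}

    // In spirit: Aggregates.group("$category", Accumulators.sum("total", "$i"))
    static Map<String, Integer> groupAndSum(List<Doc> docs) {
        return docs.stream().collect(
                Collectors.groupingBy(Doc::category, Collectors.summingInt(Doc::i)));
    }

    public static void main(String[] args) {
        List<Doc> docs = List.of(new Doc("a", 1), new Doc("a", 2), new Doc("b", 5));
        System.out.println(groupAndSum(docs));  // e.g. {a=3, b=5} (map order may vary)
    }
}
```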