首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何更新worker节点上的SPARK broadcast值?

如何更新worker节点上的SPARK broadcast值?
EN

Stack Overflow用户
提问于 2018-01-23 10:59:16
回答 2查看 909关注 0票数 2

我有一个从数据库中获取的广播值。我定义了驱动程序主机上的广播

代码语言:javascript
复制
val stampsBroadcast = ssc.sparkContext.broadcast(stampListMap)

此值(stampsBroadcast.value)用于工作节点执行器。一旦执行器完成任务(向数据库添加新密钥)。我需要更新广播值来添加这个新密钥。

我试着使用:

代码语言:javascript
复制
stampsBroadcast.unpersist(false)
ssc.sparkContext.broadcast(NewstampsBroadcastValue)

但是似乎我不能在worker节点上使用ssc。如果我在驱动程序主节点上重新广播,我如何从工作节点获取新数据?

EN

回答 2

Stack Overflow用户

发布于 2018-01-23 12:02:49

您不能从工作节点创建广播变量。

在您的例子中,基本上您需要Accumulators。定义驱动程序上的累加器。在工作节点上,可以更新累加器的值。同样,您可以获取驱动程序上的更新值。

注:您不能在工作节点上检索累加器的值。从工作节点只能更新值。

下面是spark docs中的一个例子:

代码语言:javascript
复制
// creating the accumulator on driver
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

// updating the accumulator on worker nodes
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

// fetaching the value
scala> accum.value
res2: Long = 10
票数 2
EN

Stack Overflow用户

发布于 2018-12-17 14:36:20

第一:将spark更新为spark2.3

第二:将文件流作为文件流(sparkStream.readStream.textFile('/../my.txt')...load()),文件流可以自我更新内容。如果现在使用stream join static,则可以在spark2.3+中使用stream join stream。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48393506

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档