首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当当前微批没有新的此类数据时,从以前的微批处理中触发蒸汽化的updateStateByKey值

当当前微批没有新的此类数据时,从以前的微批处理中触发蒸汽化的updateStateByKey值
EN

Stack Overflow用户
提问于 2016-12-15 18:49:56
回答 1查看 2.9K关注 0票数 0

我不清楚我是否想向所有微批次上的顶级N位显示一些简单计数的密钥,在不使用UNIONing或JOINing的情况下,这是否有可能在标准火花流中实现!使用保存/持久化的数据集,同时考虑到当前处理可能没有位于前5位的进程密钥。

也就是说,如果当前处理初始微批的前5位是x、y、z、a、b,那么,如果下一个微批处理只以x、c、m作为数据,那么如果c和m在前5项中的出现次数少于这些优先级,那么我能检索a、b、y和z作为前5的一部分吗?

也许这是一个糟糕的用例。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-12-16 03:10:47

代码语言:javascript
复制
rdd1 = sc.parallelize(list('abcd')).map(lambda x: (x, 110 - ord(x)))
rdd2 = sc.parallelize(list('cdef')).map(lambda x: (x, 2))

rddQueue = ssc.queueStream([rdd1, rdd2])


def func(new_values, old_value):
    return sum(new_values) + (old_value or 0)


rddQueue = rddQueue.updateStateByKey(func).transform(lambda x: x.sortBy(lambda y: y[1], ascending=False))

rddQueue.pprint()

产出:

代码语言:javascript
复制
-------------------------------------------                                     
Time: 2016-12-16 11:06:54
-------------------------------------------
('a', 13)
('b', 12)
('c', 11)
('d', 10)

-------------------------------------------                                     
Time: 2016-12-16 11:06:57
-------------------------------------------
('a', 13)
('c', 13)
('b', 12)
('d', 12)
('f', 2)
('e', 2)

“取回”意味着什么?

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41171319

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档