我不清楚我是否想向所有微批次上的顶级N位显示一些简单计数的密钥,在不使用UNIONing或JOINing的情况下,这是否有可能在标准火花流中实现!使用保存/持久化的数据集,同时考虑到当前处理可能没有位于前5位的进程密钥。
也就是说,如果当前处理初始微批的前5位是x、y、z、a、b,那么,如果下一个微批处理只以x、c、m作为数据,那么如果c和m在前5项中的出现次数少于这些优先级,那么我能检索a、b、y和z作为前5的一部分吗?
也许这是一个糟糕的用例。
发布于 2016-12-16 03:10:47
rdd1 = sc.parallelize(list('abcd')).map(lambda x: (x, 110 - ord(x)))
rdd2 = sc.parallelize(list('cdef')).map(lambda x: (x, 2))
rddQueue = ssc.queueStream([rdd1, rdd2])
def func(new_values, old_value):
return sum(new_values) + (old_value or 0)
rddQueue = rddQueue.updateStateByKey(func).transform(lambda x: x.sortBy(lambda y: y[1], ascending=False))
rddQueue.pprint()产出:
-------------------------------------------
Time: 2016-12-16 11:06:54
-------------------------------------------
('a', 13)
('b', 12)
('c', 11)
('d', 10)
-------------------------------------------
Time: 2016-12-16 11:06:57
-------------------------------------------
('a', 13)
('c', 13)
('b', 12)
('d', 12)
('f', 2)
('e', 2)“取回”意味着什么?
https://stackoverflow.com/questions/41171319
复制相似问题