首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花频率计数

火花频率计数
EN

Stack Overflow用户
提问于 2018-02-05 13:25:46
回答 1查看 297关注 0票数 0

我正在学习PySpark地图和缩减。我想要做的是使用mapreduce将rdd转换为一个频率计数,根据每个t的u列出现的次数。例如:

输入:

代码语言:javascript
复制
 rdd = [u"(u't1', u'u1', 0.8)",
       u"(u't1', u'u2', 0.1)",
       u"(u't1', u'u3', 0.3)",
       u"(u't1', u'u4', 0.4)",
       u"(u't2', u'u1',  0.8)",
       u"(u't2', u'u2',  0.3)"]

产出:

代码语言:javascript
复制
output= u"(u't1', u' u1', 0.8, 4)",
       u"(u't1', u' u2', 0.1, 4)",
       u"(u't1', u' u3', 0.3, 4)",
       u"(u't1', u' u4', 0.4, 4)",
       u"(u't2', u' u1',  0.8, 2)",
       u"(u't2', u' u2',  0.3, 2)"]

我试着做

代码语言:javascript
复制
 rdd.map(lambda row: ((row[0], (row[1], row[2])), 1)).\
         reduceByKey(lambda (a1,b1,c1),(a2,b2,c2): (a1+a2,b1+b2,c1+c2))

但working.error消息的行数太多而无法解压。有什么建议吗?谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-02-05 14:31:02

一种可能是

代码语言:javascript
复制
>>> rdd = sc.parallelize([('t1', 'u1', 0.8),
...        ('t1', 'u2', 0.1),
...        ('t1', 'u3', 0.3),
...        ('t1', 'u4', 0.4),
...        ('t2', 'u1',  0.8),
...        ('t2', 'u2',  0.3)])

>>> rdd1 = rdd.map(lambda r: (r[0],(r[1],r[2])))
>>> rdd2 = sc.parallelize(rdd.map(lambda r: (r[0],(r[1],r[2]))).countByKey().items())

>>> rdd1.join(rdd2).map(lambda (a,((b,c),d)): (a,b,c,d)).collect()              
[('t2', 'u1', 0.8, 2), ('t2', 'u2', 0.3, 2), ('t1', 'u1', 0.8, 4), ('t1', 'u2', 0.1, 4), ('t1', 'u3', 0.3, 4), ('t1', 'u4', 0.4, 4)]
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48623626

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档