首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花: GroupBy后的TOPN

火花: GroupBy后的TOPN
EN

Stack Overflow用户
提问于 2016-03-10 13:44:25
回答 5查看 2.4K关注 0票数 1

我有一个RDD P映射到该类:

代码语言:javascript
复制
case class MyRating(userId:Int, itemId:Int, rating:Double)

我感兴趣的是为每个用户找到TopN条目,即GroupBy userId,以及在每个形成的组内,过滤出基于最高等级的TopN (例如10)条目。

我做了以下工作:

代码语言:javascript
复制
val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey
val B : RDD[((Int), List[MyRating])] = key.mapValues(iter => iter.toList.sortBy(_.rating, false))
val C = values.groupByKey.take(10)

显然,在.take( 10 )之后应用groupByKey只会给我留下10个键(用户),不会过滤掉每个用户的top10评分。

我们如何将.take(N)应用到groupBy之后,从而使其作用于值的某一部分而不是键本身?

EN

回答 5

Stack Overflow用户

发布于 2016-03-10 14:05:45

一种天真的方法是采用n个值:

代码语言:javascript
复制
B.mapValues(_.take(n))

但是,如果您只需要一小部分值,最好使用aggregateByKey并在运行时删除过时的记录,而不是对所有内容进行分组。您可能希望在实践中获得更有效的东西(您可以检查top /takeOrdered的Spark ),但您可以从以下内容开始:

代码语言:javascript
复制
import scala.math.Ordering
import scala.collection.mutable.PriorityQueue

implicit val ord = Ordering.by[MyRating, Double](_.rating)

val pairs = rdd.keyBy(_.userId)
pairs.aggregateByKey(new scala.collection.mutable.PriorityQueue[MyRating]())(
  (acc, x) => {
    acc.enqueue(x)
    acc.take(n)
  },
  (acc1, acc2) => (acc1 ++ acc2).take(n)
)

请注意,上面的代码段需要Scala 2.11+,因为SI-7568

票数 3
EN

Stack Overflow用户

发布于 2016-03-10 14:05:55

如果我正确理解,您需要做的是:按用户id对RDD进行分组,然后对每个(id,list)元组返回Id,并将列表排序并裁剪为10个元素。

代码语言:javascript
复制
P
  .groupBy(_.userId)  
  .map{ case (key, it) => 
    (key, it.toList.sortBy(mr => -mr.rating).take(10)) 
  }
票数 3
EN

Stack Overflow用户

发布于 2016-03-10 14:42:26

您非常接近,但是您需要在A到B的映射中获取顶部-N项,例如,如果您想从列表中获取前2位MyRating项,下面的代码就可以做到这一点。B是一个RDD,其中包含每个MyRatings的前2位userId的列表。(此外,sortBy函数只需使评级为负值即可工作)。

代码语言:javascript
复制
case class MyRating(userId:Int, itemId:Int, rating:Double)

val plist:List[MyRating] = List(MyRating(1,0,1),MyRating(1,1,5),MyRating(1,2,7),MyRating(1,3,9),MyRating(1,4,10),MyRating(2,5,1),MyRating(2,6,5),MyRating(2,6,7))
val P: org.apache.spark.rdd.RDD[MyRating] = sc.parallelize(plist)

val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey
val TOPCOUNT = 2
val B : RDD[((Int), List[MyRating])] = A.mapValues(iter => iter.toList.sortBy(- _.rating).take(TOPCOUNT))
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35918262

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档