首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Spark中使用Scala连接两个RDDs

在Spark中使用Scala连接两个RDDs
EN

Stack Overflow用户
提问于 2017-02-09 22:41:07
回答 1查看 603关注 0票数 0

我正在尝试在Spark上实现局部异常因子。所以我有一组从文件中读取的点,然后为每个点找到N个最近的邻居。使用zipWithIndex()命令为每个点指定一个索引

所以现在我有两个RDDs首先

代码语言:javascript
复制
RDD[(Index:Long, Array[(NeighborIndex:Long, Distance:Double)])]

其中,长整型表示其索引,数组由其N个最近邻域组成,长整型表示这些邻域的索引位置,双精度表示它们与给定点的距离

第二

RDD[(Index:Long,LocalReachabilityDensity:Double)]

这里,Long再次表示给定点的索引,Double表示其局部可达性密度

我想要的是一个RDD,它包含所有的点,以及它们的N个最近邻居和它们的局部可达性密度的数组

代码语言:javascript
复制
RDD[(Index:Long, Array[(NeighborIndex:Long,LocalReachabilityDensity:Double)])]

因此,基本上在这里,Long将表示一个点的索引,该数组将是它的N个最近邻居,以及它们的索引值和局部可达性密度。

根据我的理解,我需要在第一个RDD上运行一个map,然后将其数组中的值与包含Local Reachability的第二个RDD连接起来,以获得其N个邻居的所有给定索引的Local Reachability。但我不确定如何实现这一点。如果有人能帮我,那就太好了

EN

回答 1

Stack Overflow用户

发布于 2017-02-10 07:20:25

给定:

代码语言:javascript
复制
val rdd1: RDD[(index: Long, Array[(neighborIndex: Long, distance: Double)])] = ...
val rdd2: RDD[(index: Long, localReachabilityDensity: Double)] = ...

我真的一点也不喜欢使用Scala的Array。我也不喜欢您的抽象是跨用途的;换句话说,rdd2中的index被隐藏在rdd1中的各种条目中。这使得事情很难理解,也招致了Spark RDD API的限制,在转换第一个RDD时,您不能访问第二个RDD。我认为你应该重写你当前的工作,以产生更容易使用的抽象。

但如果你一定要这样做:

代码语言:javascript
复制
val flipped = rdd1.map { 
  case (index, array) => 
    array.map {
      case (neighborIndex, distance) => (neighborIndex, (index, distance))
    }.elements.toVector
}.flatMap(identity)
 .groupBy(_._1)
val result = flipped.join(rdd2).mapValues {
   case (indexDistances, localReachabilityDensity) => 
      indexDistances.map {
         case (index, _) => (index, localReachabilityDensity)
      }    
}

其基本思想是翻转rdd1以将neighborIndex值“提取”到顶层作为PairRDD的键,然后允许我使用rdd2执行join。并用Vector替换Array。一旦你在相同的索引上做了连接,组合起来就容易多了。

请注意,这是我突然想到的,可能并不完美。这个想法并不是要给你一个复制-粘贴的解决方案,而是建议一个不同的方向。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42139566

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档