
Convert an RDD from `org.apache.spark.rdd.RDD[((String, String), Double)]` to `org.apache.spark.rdd.RDD[(String, List[Double])]`

Stack Overflow user
Asked on 2014-12-16 16:44:16
Answers: 2 · Views: 6.4K · Followers: 0 · Votes: 0

I have an RDD:

  val rdd: org.apache.spark.rdd.RDD[((String, String), Double)] =
    sc.parallelize(List(
      (("a", "b"), 1.0),
      (("a", "c"), 3.0),
      (("a", "d"), 2.0)
      )) 

I am trying to convert this RDD from type org.apache.spark.rdd.RDD[((String, String), Double)] to org.apache.spark.rdd.RDD[(String, List[Double])].

Each key in the RDD should be unique, and its values should be sorted.

So the rdd above would be transformed into:

// desired result (pseudocode, not runnable): one entry per unique key, values sorted
val newRdd: RDD[(String, List[Double])] = RDD(("a", List(1.0, 2.0, 3.0)))

To get a unique list of keys, I use:

val r2: org.apache.spark.rdd.RDD[(String, Double)] = rdd.map(m => (m._1._1, m._2))

How do I then turn each key into a key paired with a sorted List of its Double values?

The entire code:

import org.apache.spark.SparkContext

object group {
  println("Welcome to the Scala worksheet")       //> Welcome to the Scala worksheet

  val conf = new org.apache.spark.SparkConf()
    .setMaster("local")
    .setAppName("distances")
    .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
    .set("spark.executor.memory", "1g")           //> conf  : org.apache.spark.SparkConf = org.apache.spark.SparkConf@1bd0dd4

  val sc = new SparkContext(conf)                 //> 14/12/16 16:44:56 INFO spark.SecurityManager: Changing view acls to: a511381
                                                  //| ,
                                                  //| 14/12/16 16:44:56 INFO spark.SecurityManager: Changing modify acls to: a5113
                                                  //| 81,
                                                  //| 14/12/16 16:44:56 INFO spark.SecurityManager: SecurityManager: authenticatio
                                                  //| n disabled; ui acls disabled; users with view permissions: Set(a511381, ); u
                                                  //| sers with modify permissions: Set(a511381, )
                                                  //| 14/12/16 16:44:57 INFO slf4j.Slf4jLogger: Slf4jLogger started
                                                  //| 14/12/16 16:44:57 INFO Remoting: Starting remoting
                                                  //| 14/12/16 16:44:57 INFO Remoting: Remoting started; listening on addresses :[
                                                  //| akka.tcp://sparkDriver@LA342399.dmn1.fmr.com:51092]
                                                  //| 14/12/16 16:44:57 INFO Remoting: Remoting now listens on addresses: [akka.tc
                                                  //| p://sparkDriver@LA342399.dmn1.fmr.com:51092]
                                                  //| 14/12/16 16:44:57 INFO util.Utils: Successfully started service 'sparkDriver
                                                  //| ' on port 51092.
                                                  //| 14/12/16 16:44:57 INFO spark.SparkEnv: Registering MapOutputTracker
                                                  //| 14/12/16 16:44:57 INFO spark.SparkEnv:
                                                  //| Output exceeds cutoff limit.

  val rdd: org.apache.spark.rdd.RDD[((String, String), Double)] =
    sc.parallelize(List(
      (("a", "b"), 1.0),
      (("a", "c"), 3.0),
      (("a", "d"), 2.0)
      ))                                          //> rdd  : org.apache.spark.rdd.RDD[((String, String), Double)] = ParallelCollec
                                                  //| tionRDD[0] at parallelize at group.scala:15

     val r2 : org.apache.spark.rdd.RDD[(String, Double)] =  rdd.map(m => (m._1._1 , m._2))
                                                  //> r2  : org.apache.spark.rdd.RDD[(String, Double)] = MappedRDD[1] at map at gr
                                                  //| oup.scala:21

     val m1 = r2.collect                          //> 14/12/16 16:44:59 INFO spark.SparkContext: Starting job: collect at group.sc
                                                  //| ala:23
                                                  //| 14/12/16 16:44:59 INFO scheduler.DAGScheduler: Got job 0 (collect at group.s
                                                  //| cala:23) with 1 output partitions (allowLocal=false)
                                                  //| 14/12/16 16:44:59 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect 
                                                  //| at group.scala:23)
                                                  //| 14/12/16 16:44:59 INFO scheduler.DAGScheduler: Parents of final stage: List(
                                                  //| )
                                                  //| 14/12/16 16:44:59 INFO scheduler.DAGScheduler: Missing parents: List()
                                                  //| 14/12/16 16:44:59 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD
                                                  //| [1] at map at group.scala:21), which has no missing parents
                                                  //| 14/12/16 16:44:59 WARN util.SizeEstimator: Failed to check whether UseCompre
                                                  //| ssedOops is set; assuming yes
                                                  //| 14/12/16 16:44:59 INFO storage.MemoryStore: ensureFreeSpace(1584) called wit
                                                  //| h curMem=0, maxMem=140142182
                                                  //| 14/12/16 16:44:59 INFO storage.MemoryStore: Block broadcast_0 stored as valu
                                                  //| es in memory (estimated size 1584.0 B
                                                  //| Output exceeds cutoff limit.
     m1.foreach { case (e, i) => println(e + "," + i) }
                                                  //> a,1.0
                                                  //| a,3.0
                                                  //| a,2.0


}

2 Answers

Stack Overflow user

Accepted answer

Posted on 2014-12-16 17:27:13

Use groupByKey:

val r3: RDD[(String, Iterable[Double])] = r2.groupByKey

If you really want the second element to be a List rather than a general Iterable, you can use mapValues:

val r4 = r3.mapValues(_.toList)

Make sure you have import org.apache.spark.SparkContext._ at the top so that these functions are available.
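
Putting the two steps together, here is a minimal end-to-end sketch (assuming Spark 1.x and an already-created SparkContext named sc, as in the question; the names r2, r3, r4 follow this answer):

// Group the per-key values, then materialize each group as a List.
import org.apache.spark.SparkContext._   // implicit pair-RDD functions (groupByKey, mapValues)
import org.apache.spark.rdd.RDD

val rdd: RDD[((String, String), Double)] =
  sc.parallelize(List((("a", "b"), 1.0), (("a", "c"), 3.0), (("a", "d"), 2.0)))

val r2: RDD[(String, Double)] = rdd.map { case ((k, _), v) => (k, v) }   // drop the second key component
val r3: RDD[(String, Iterable[Double])] = r2.groupByKey                  // one entry per unique key
val r4: RDD[(String, List[Double])] = r3.mapValues(_.toList)

r4.collect.foreach(println)   // (a,List(1.0, 3.0, 2.0)) -- grouping guarantees no particular order

Note that groupByKey pulls all values for a key into memory on a single executor; that is fine for a small example like this, but worth keeping in mind for large groups.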

Votes: 1

Stack Overflow user

Posted on 2014-12-16 18:17:42

Hi, with @Imm's solution your values will not be sorted; if they come out sorted, it is only by coincidence. To get a sorted list, simply add:

val r4 = r3.mapValues(_.toList.sorted), so r4 will be an RDD in which each key's list of values is sorted.

I hope this is useful.
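
For completeness, a short sketch of this answer applied on top of the r3 from the accepted answer (same assumptions as the sketch above):

// Sort each key's values while converting the group to a List.
val r4: RDD[(String, List[Double])] = r3.mapValues(_.toList.sorted)

r4.collect.foreach(println)   // (a,List(1.0, 2.0, 3.0)) -- matches the desired newRdd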

Votes: 2
The original content of this page is provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/27509647
