我有一个关于TopK的问题要用Spark来解决。
源文件是这样的:
baoshi,13
xinxi,80
baoshi,99
xinxi,32
baoshi,50
xinxi,43
baoshi,210
xinxi,100下面是我的代码:
import org.apache.spark.{SparkConf, SparkContext}
object TopKTest {
def main(args: Array[String]): Unit = {
val file = "file:///home/hadoop/rdd-test/TopK3.txt"
val conf = new SparkConf().setAppName("TopKTest").setMaster("local")
val sc = new SparkContext(conf)
val txt = sc.textFile(file)
val rdd2 =txt.map(line=>(line.split(",")(0)
,line.split(",")(1).trim))
val rdd=rdd2.groupByKey()
val rdd1 = rdd.map(line=> {
val f = line._1
val s = line._2
val t = s.toList.sortWith(_ > _).take(2)
(f, t)
})
rdd1.foreach(println)
}
}预期的结果是:
(xinxi,List(100, 80))
(baoshi,List(210, 99))发布于 2018-07-21 18:56:12
这是因为您比较的是Strings,而不是数字。
变化
val rdd2 =txt.map(line=>(line.split(",")(0)
,line.split(",")(1).trim))至
val rdd2 =txt.map(line=>(line.split(",")(0)
,line.split(",")(1).trim.toLong))发布于 2018-07-22 22:44:41
下面是方法:
scala> import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
scala> val rdd = spark.sparkContext.textFile("D:\\test\\input.txt")
rdd: org.apache.spark.rdd.RDD[String] = D:\test\input.txt MapPartitionsRDD[1] at textFile at <console>:26
scala> rdd.foreach(println)
xinxi,43
baoshi,13
baoshi,210
xinxi,80
xinxi,100
baoshi,99
xinxi,32
baoshi,50
scala> val rdd1 = rdd.map(row => (row.split(",")(0), row.split(",")(1).toInt))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[2] at map at <console>:28
scala> val rdd2 = rdd1.topByKey(2)
rdd2: org.apache.spark.rdd.RDD[(String, Array[Int])] = MapPartitionsRDD[4] at mapValues at MLPairRDDFunctions.scala:50
scala> val rdd3 = rdd2.map(m => (m._1, m._2.toList))
rdd3: org.apache.spark.rdd.RDD[(String, List[Int])] = MapPartitionsRDD[5] at map at <console>:32
scala> rdd3.foreach(println)
(xinxi,List(100, 80))
(baoshi,List(210, 99))https://stackoverflow.com/questions/51455192
复制相似问题