employee.txt :
100|Surender
101|Raja

salary.txt :
100|2016-JAN|15000
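100|2016-FEB|15000

Hi,
I am doing some basic operations in Spark Core using Scala.
The requirement is to compute the total salary of each employee. If an employee has no matching record in the salary file, their salary should be shown as 0.
I tried the code below. I am able to do the join, but I don't know how to read "None" and "Some", so I cannot proceed further.
Can someone help me get the expected output?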
scala> val empRDD = sc.textFile("/user/cloudera/inputfiles/employee.txt")
scala> val salaryRDD = sc.textFile("/user/cloudera/inputfiles/salary.txt")
scala> val empMapRDD = empRDD.map( elem => elem.split("\\|"))
scala> val salaryMapRDD = salaryRDD.map(elem => elem.split("\\|"))
scala> val empKeyValueRDD = empMapRDD.map(elem => (elem(0),elem(1)))
scala> val salaryKeyValueRDD = salaryMapRDD.map(elem => (elem(0),elem(2)))
scala> val joinedRDD = empKeyValueRDD.leftOuterJoin(salaryKeyValueRDD)
scala> joinedRDD.collect
res3: Array[(String, (String, Option[String]))] = Array((101,(Raja,None)), (100,(Surender,Some(15000))), (100,(Surender,Some(15000))))

Expected output:
Array((100,Surender,30000), (101,Raja,0))

Posted on 2017-02-20 12:51:19
val joinedRDD = empKeyValueRDD.leftOuterJoin(salaryKeyValueRDD)
.groupBy(x => (x._1, x._2._1))
.map(r => {
val sal = r._2.map(x => x._2._2 match {
case None => 0
case Some(num) => num.toLong
}).sum
(r._1._1, r._1._2, sal)
})
println(joinedRDD.collect.toList)
//List((100,Surender,30000), (101,Raja,0))

After groupBy(x => (x._1, x._2._1)), the intermediate data looks like this:
List(
((100,Surender),CompactBuffer(
(100,(Surender,Some(15000))),
(100,(Surender,Some(15000))))
),
((101,Raja),CompactBuffer(
(101,(Raja,None)))
)
)

Posted on 2017-02-20 11:11:51
I tried the following style of code and got the result:
...
scala> joinedRDD.map( elem => ((elem._1, elem._2._1),elem._2._2 match { case Some(i) => i.toInt case None => 0 } ) ).reduceByKey((x,y) => x+y).map(elem => (elem._1._1,elem._1._2,elem._2)).collect

Output:
Array[(String, String, Int)] = Array((100,Surender,30000), (101,Raja,0))

If there is any other way to achieve the same result, please let me know.
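
One possible variation, sketched here with plain Scala collections so it runs without a cluster: Option has map and getOrElse, so the None/Some handling can be written without an explicit match. The names SalaryTotals, salaryOrZero, joined and totals are made up for this illustration and do not appear in the posts above; joined is only a local stand-in for the result of joinedRDD.collect.

```scala
object SalaryTotals {
  // Hypothetical helper: Some("15000") becomes 15000, None becomes 0.
  def salaryOrZero(salary: Option[String]): Int =
    salary.map(_.toInt).getOrElse(0)

  // Local stand-in for the joined data shown in the question (assumption: same shape).
  val joined: Seq[(String, (String, Option[String]))] = Seq(
    ("101", ("Raja", None)),
    ("100", ("Surender", Some("15000"))),
    ("100", ("Surender", Some("15000")))
  )

  // Group by (id, name), sum the salaries, and sort by id for a stable order.
  val totals: Seq[(String, String, Int)] =
    joined
      .groupBy { case (id, (name, _)) => (id, name) }
      .map { case ((id, name), rows) =>
        (id, name, rows.map { case (_, (_, salary)) => salaryOrZero(salary) }.sum)
      }
      .toSeq
      .sortBy(_._1)
  // totals == Seq(("100", "Surender", 30000), ("101", "Raja", 0))
}
```

On the RDD itself, the same helper should be usable in place of the match expression, for example joinedRDD.map { case (id, (name, sal)) => ((id, name), salaryOrZero(sal)) }.reduceByKey(_ + _), which avoids building the per-key buffers that groupBy creates.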
https://stackoverflow.com/questions/42341980