I have an RDD that looks like this:
[((String, String, String), (String, String))]
Sample data looks like this:
((10,1,a),(x,3))
((10,2,b),(y,5))
((11,2,b),
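((11,3,c),(z,4))
So, if the second string in the key is 2 or 3, replace it with 2-3. If it is 1, or if the record looks like the third one (its value is missing), drop that record.
The expected output therefore looks like this: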
((10,2-3,b),(y,5))
((11,2-3,c),(z,4))
Answered on 2017-11-13 16:15:18
Given the input data as
val rdd = spark.sparkContext.parallelize(Seq(
(("10","1","a"),("x","3")),
(("10","2","b"),("y","5")),
(("11","2","b"),()),
(("11","3","c"),("z","4"))
))
you can do the following to get the desired output:
rdd.filter(x => x._1._2 != "1").filter(x => x._2 != ()).map(x => {
if(x._1._2 == "2" || x._1._2 == "3") ((x._1._1, "2-3", x._1._3), x._2)
else ((x._1._1, x._1._2, x._1._3), x._2)
})
Your output will be
((10,2-3,b),(y,5))
((11,2-3,c),(z,4))
Thanks to philantrovert for pointing out that it has to be String and not Int.
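As a possible variation, here is a minimal sketch that models the missing value as an Option instead of (), so the element type stays precise rather than widening to Any. It runs on a plain Scala Seq so it can be tried without a cluster; RDD also has a collect overload that takes a partial function, so the same pattern match should carry over. The names NormalizeKeys, Key, Value and normalize are hypothetical, and using Option for the third record is an assumption about how the input could be represented, not something stated in the question.

```scala
object NormalizeKeys {
  // Hypothetical aliases for the key and value shapes described in the question.
  type Key = (String, String, String)
  type Value = (String, String)

  // Keeps only records that have a value and whose second key field is not "1",
  // rewriting a second key field of "2" or "3" to "2-3".
  def normalize(records: Seq[(Key, Option[Value])]): Seq[(Key, Value)] =
    records.collect {
      case ((first, second, third), Some(value)) if second != "1" =>
        val merged = if (second == "2" || second == "3") "2-3" else second
        ((first, merged, third), value)
    }

  def main(args: Array[String]): Unit = {
    // Same sample data as above; the incomplete third record is modeled as None (assumption).
    val input: Seq[(Key, Option[Value])] = Seq(
      (("10", "1", "a"), Some(("x", "3"))),
      (("10", "2", "b"), Some(("y", "5"))),
      (("11", "2", "b"), None),
      (("11", "3", "c"), Some(("z", "4")))
    )
    normalize(input).foreach(println)
  }
}
```

Doing the filtering and the rewrite in a single collect avoids the two separate filter passes, and the guard on the pattern keeps the "drop if 1" rule next to the "merge 2 and 3" rule.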
https://stackoverflow.com/questions/47258682