我有两个数据集如下所示。每个数据集在每一行中都有",“分隔的数字。
数据集1 1,2,0,8,0 2,0,9,0,3 数据集2 7,5,4,6,3 4,9,2,1,8
我必须用数据集2的对应值替换第一个数据集的零。
结果会是这样
1,2,4,8,3 2,9,9,1 3
我用下面的代码替换了值。
val rdd1 = sc.textFile(dataset1).flatMap(l => l.split(","))
val rdd2 = sc.textFile(dataset2).flatMap(l => l.split(","))
val result = rdd1.zip(rdd2).map( x => if(x._1 == "0") x._2 else x._1)我得到的输出是RDDString格式的。但是我需要RDD[ArrayString]格式的输出,因为这种格式更适合我的进一步转换。
发布于 2016-04-28 19:34:35
如果您想要一个RDD[Array[String]],其中数组的每个元素对应于一条线,那么在拆分后不要平面映射这些值,只需映射它们。
scala> val rdd1 = sc.parallelize(List("1,2,0,8,0", "2,0,9,0,3")).map(l => l.split(","))
rdd1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[1] at map at <console>:27
scala> val rdd2 = sc.parallelize(List("7,5,4,6,3", "4,9,2,1,8")).map(l => l.split(","))
rdd2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at <console>:27
scala> val result = rdd1.zip(rdd2).map{case(arr1, arr2) => arr1.zip(arr2).map{case(v1, v2) => if(v1 == "0") v2 else v1}}
result: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:31
scala> result.collect
res0: Array[Array[String]] = Array(Array(1, 2, 4, 8, 3), Array(2, 9, 9, 1, 3))或者不那么冗长:
val result = rdd1.zip(rdd2).map(t => t._1.zip(t._2).map(x => if(x._1 == "0") x._2 else x._1))https://stackoverflow.com/questions/36923266
复制相似问题