如何将RDD[ArrayRow]转换为RDDRow
详细信息:
我有一些用例,我的解析函数对于某些数据返回类型ArrayRow,对于某些数据返回类型Row。如何将这两个文件都转换为RDDRow以供进一步使用?
代码示例
private def getRows(rdd: RDD[String], parser: Parser): RDD[Row] = {
var processedLines = rdd.map { line =>
map(p => parser.processBeacon(line) }
val rddOfRowsList = processedLines.map { x =>
x match {
case Right(obj) => obj.map { p =>
MyRow.getValue(p)
}//I can use flatmap here
case Left(obj) =>
MyRow.getValue(obj)
}//Cant use flatmap here
}
// Here I have to convert rddOfRowsList to RDD[Row]
//?????
val rowsRdd =?????
//
rowsRdd}
def processLine(logMap: Map[String, String]):Either[Map[String, Object], Array[Map[String, Object]]] =
{
//process
}发布于 2017-08-16 04:54:08
使用flatMap;
rdd.flatMap(identity)发布于 2017-08-16 15:56:08
您可以使用平面映射来获得新的rdd,然后使用联合来组合它们。
发布于 2017-08-16 16:38:03
使用flatMap扁平化RDD的内容
https://stackoverflow.com/questions/45701269
复制相似问题