我有一个名为DataFrame的链接,它在Row中具有动态的字段/列数量。但是,有些字段具有包含id的结构ClassNameId。
ClassNameId's总是String型的。
我有两个Datasets,每个都是不同类型的ClassName每个Dataset至少有字段id (String)和typeName (String),它们总是填充[ClassName]的字符串值。
例如,如果我有3 DataSets类型的A,B和C
链接:
+----+-----+-----+-----+ | id | AId | BId | CId | +----+-----+-----+-----+ | XX | A01 | B02 | C04 | | XY | null| B05 | C07 |
A:
+-----+----------+-----+-----+ | id | typeName | ... | ... | +-----+----------+-----+-----+ | A01 | A | ... | ... |
B:
+-----+----------+-----+-----+ | id | typeName | ... | ... | +-----+----------+-----+-----+ | B02 | B | ... | ... |
首选的最终结果将是Link Dataframe,其中每个Id要么被替换,要么由一个名为[ClassName]的字段追加,该字段封装了原始对象。
结果:
+----+----------------+----------------+----------------+ | id | A | B | C | +----+----------------+----------------+----------------+ | XX | A(A01, A, ...) | B(B02, B, ...) | C(C04, C, ...) | | XY | null | B(B05, B, ...) | C(C07, C, ...) |
Things我试过
Row,其中第一个元素是原始的Row,第二个是匹配的[ClassName],但是第二个迭代开始嵌套这些结果。试图使用map“取消”这些结果,要么会导致编码器混乱(因为结果的Row不是固定类型),要么编码太复杂,导致催化剂错误。任何想法都欢迎。
发布于 2017-06-08 15:50:56
所以我想出了如何做我想做的事。我做了一些改变,让它为我工作,但这是一个参考的目的,我将展示我的步骤,也许它可以是有用的,在未来的人?
case class Base(id: String, typeName: String)
case class A(override val id: String, override val typeName: String) extends Base(id, typeName)Dataframeval linkDataFrame = spark.read.parquet("[path]")DataFrame转换成可连接的东西,这意味着为连接的源创建一个占位符,以及将所有单个Id字段(AId、BId等)转换为源-> id的Map的方法。->有一个有用的map方法。此外,我们还需要将Base类转换为StructType,以便在编码器中使用。尝试了多种方法,但无法绕过特定的声明(否则就会产生错误)val linkDataFrame = spark.read.parquet("[path]")
case class LinkReformatted(ids: Map[String, Long], sources: Map[String, Base])
// Maps each column ending with Id into a Map of (columnname1 (-Id), value1, columnname2 (-Id), value2)
val mapper = linkDataFrame.columns.toList
.filter(
_.matches("(?i).*Id$")
)
.flatMap(
c => List(lit(c.replaceAll("(?i)Id$", "")), col(c))
)
val baseStructType = ScalaReflection.schemaFor[Base].dataType.asInstanceOf[StructType]DataFrame成为可能,Id的全部位于一个名为Id的字段中,并且在一个空的Map[String, Base]中为源创建了一个占位符val linkDatasetReformatted = linkDataFrame.select(
map(mapper: _*).alias("ids")
)
.withColumn("sources", lit(null).cast(MapType(StringType, baseStructType)))
.as[LinkReformatted]Datasets (A、B等)加入到这个经过重新格式化的链接数据集中。在这种尾递归方法中发生了很多事情。@tailrec
def recursiveJoinBases(sourceDataset: Dataset[LinkReformatted], datasets: List[Dataset[Base]]): Dataset[LinkReformatted] = datasets match {
case Nil => sourceDataset // Nothing left to join, return it
case baseDataset :: remainingDatasets => {
val typeName = baseDataset.head.typeName // extract the type from base (each field hase same value)
val masterName = "source" // something to name the source
val joinedDataset = sourceDataset.as(masterName) // joining source
.joinWith(
baseDataset.as(typeName), // with a base A,B, etc
col(s"$typeName.id") === col(s"$masterName.ids.$typeName"), // join on source.ids.[typeName]
"left_outer"
)
.map {
case (source, base) => {
val newSources = if (source.sources == null) Map(typeName -> base) else source.sources + (typeName -> base) // append or create map of sources
source.copy(sources = newSources)
}
}
.as[LinkReformatted]
recursiveJoinBases(joinedDataset, remainingDatasets)
}
}Dataset of LinkReformatted记录,其中对于ids中的每个对应的typeName -> id,字段是源字段中的对应typeName -> Base。对我来说就够了。我可以在这个最终的数据集上使用一些映射函数来提取我所需要的一切。我希望这能有所帮助。我知道这不是我想要的解决方案,也不是很简单。
https://stackoverflow.com/questions/44309128
复制相似问题