首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >星火联接数据集和数据集

星火联接数据集和数据集
EN

Stack Overflow用户
提问于 2017-06-01 13:38:34
回答 1查看 1.8K关注 0票数 0

我有一个名为DataFrame链接,它在Row中具有动态的字段/列数量。但是,有些字段具有包含id的结构ClassNameId

ClassNameId's总是String型的。

我有两个Datasets,每个都是不同类型的ClassName每个Dataset至少有字段id (String)和typeName (String),它们总是填充[ClassName]的字符串值。

例如,如果我有3 DataSets类型的ABC

链接:

+----+-----+-----+-----+ | id | AId | BId | CId | +----+-----+-----+-----+ | XX | A01 | B02 | C04 | | XY | null| B05 | C07 |

A:

+-----+----------+-----+-----+ | id | typeName | ... | ... | +-----+----------+-----+-----+ | A01 | A | ... | ... |

B:

+-----+----------+-----+-----+ | id | typeName | ... | ... | +-----+----------+-----+-----+ | B02 | B | ... | ... |

首选的最终结果将是Link Dataframe,其中每个Id要么被替换,要么由一个名为[ClassName]的字段追加,该字段封装了原始对象。

结果:

+----+----------------+----------------+----------------+ | id | A | B | C | +----+----------------+----------------+----------------+ | XX | A(A01, A, ...) | B(B02, B, ...) | C(C04, C, ...) | | XY | null | B(B05, B, ...) | C(C07, C, ...) |

Things我试过

  • 递归呼叫joinWith。第一个调用成功地返回一个tuple/Row,其中第一个元素是原始的Row,第二个是匹配的[ClassName],但是第二个迭代开始嵌套这些结果。试图使用map“取消”这些结果,要么会导致编码器混乱(因为结果的Row不是固定类型),要么编码太复杂,导致催化剂错误
  • 加入,因为RDD还不能解决这个问题。

任何想法都欢迎。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-06-08 15:50:56

所以我想出了如何做我想做的事。我做了一些改变,让它为我工作,但这是一个参考的目的,我将展示我的步骤,也许它可以是有用的,在未来的人?

  1. 首先,我声明了一个数据类型,它共享了我感兴趣的A、B、C等所有属性,并使类从这个超级类型扩展
代码语言:javascript
复制
case class Base(id: String, typeName: String)
case class A(override val id: String, override val typeName: String) extends Base(id, typeName)
  1. 接下来,我加载链接Dataframe
代码语言:javascript
复制
val linkDataFrame = spark.read.parquet("[path]")
  1. 我想要将这个DataFrame转换成可连接的东西,这意味着为连接的源创建一个占位符,以及将所有单个Id字段(AId、BId等)转换为源-> id的Map的方法。->有一个有用的map方法。此外,我们还需要将Base类转换为StructType,以便在编码器中使用。尝试了多种方法,但无法绕过特定的声明(否则就会产生错误)
代码语言:javascript
复制
val linkDataFrame = spark.read.parquet("[path]")

case class LinkReformatted(ids: Map[String, Long], sources: Map[String, Base])

// Maps each column ending with Id into a Map of (columnname1 (-Id), value1, columnname2 (-Id), value2)
val mapper = linkDataFrame.columns.toList
  .filter(
    _.matches("(?i).*Id$")
  )
  .flatMap(
    c => List(lit(c.replaceAll("(?i)Id$", "")), col(c))
)

val baseStructType = ScalaReflection.schemaFor[Base].dataType.asInstanceOf[StructType]
  1. 所有这些部分使创建一个新的DataFrame成为可能,Id的全部位于一个名为Id的字段中,并且在一个空的Map[String, Base]中为创建了一个占位符
代码语言:javascript
复制
val linkDatasetReformatted = linkDataFrame.select(
    map(mapper: _*).alias("ids")
  )
  .withColumn("sources", lit(null).cast(MapType(StringType, baseStructType)))
  .as[LinkReformatted]
  1. 下一步是将所有源Datasets (A、B等)加入到这个经过重新格式化的链接数据集中。在这种尾递归方法中发生了很多事情。
代码语言:javascript
复制
@tailrec
def recursiveJoinBases(sourceDataset: Dataset[LinkReformatted], datasets: List[Dataset[Base]]): Dataset[LinkReformatted] = datasets match {
  case Nil => sourceDataset // Nothing left to join, return it
  case baseDataset :: remainingDatasets => {

    val typeName = baseDataset.head.typeName // extract the type from base (each field hase same value)
    val masterName = "source" // something to name the source

    val joinedDataset = sourceDataset.as(masterName) // joining source
      .joinWith(
      baseDataset.as(typeName), // with a base A,B, etc
      col(s"$typeName.id") === col(s"$masterName.ids.$typeName"), // join on source.ids.[typeName]
      "left_outer"
    )
      .map {
        case (source, base) => {
          val newSources = if (source.sources == null) Map(typeName -> base) else source.sources + (typeName -> base) // append or create map of sources          
          source.copy(sources = newSources)
        }
      }
      .as[LinkReformatted]
    recursiveJoinBases(joinedDataset, remainingDatasets)
  }
}
  1. 现在您将得到一个Dataset of LinkReformatted记录,其中对于ids中的每个对应的typeName -> id字段是字段中的对应typeName -> Base。对我来说就够了。我可以在这个最终的数据集上使用一些映射函数来提取我所需要的一切。

我希望这能有所帮助。我知道这不是我想要的解决方案,也不是很简单。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44309128

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档