我有一个包含异构StructType的数据集,可以按类型分组并对其应用JSON。例如,RDD[(Type, JSON)]和Set[Type],包含原始RDD中的所有类型。
现在,我想将这些JSON写入到一个类型化的Parquet文件中,并按类型进行分区。
我现在做的是:
val getSchema: Type => StructType = ???
val buildRow: StructType => JSON => Row = ???
types.foreach { jsonType =>
val sparkSchema: StructType = getSchema(jsonType)
val rows: RDD[Row] = rdd
.filter(k => k == jsonType)
.map { case (_, json) => buildRow(sparkSchema)(json) }
spark.createDataFrame(rows, sparkSchema).write.parquet(path)
}它可以工作,但效率非常低,因为它需要多次遍历原始RDD -我通常有几十甚至数百种类型。
有没有更好的解决方案?我尝试联合数据帧,但失败了,因为不能合并具有不同属性的行。或者至少我可以通过取消持久化数据帧来释放它所占用的资源?
发布于 2021-10-05 06:36:32
我会想到一些选项(有些可能适合您的需求,也可能不适合您的需求)。
JSON如果不同的类型不需要是单独的文件,你可以使用像{type1col : StructType1, type2 : StructType2, etc}这样的模式,写出只有一个结构列是populated.
发布于 2021-10-07 08:03:49
这将有助于加速拼图文件的写入。由于types的类型为Set[Type],因此foreach循环将按顺序发生。这意味着拼图文件是一个接一个地写的(不是并行的)。由于每次拼接文件写入都是相互独立的,因此如果一次写入多个文件应该没有问题。
使用scala并行集合(参见:https://docs.scala-lang.org/overviews/parallel-collections/overview.html)将是实现这一目标的最简单方法。尝试编辑第4行以:
types.par.foreach { jsonType =>如果原始RDD被多次访问,也要缓存它。
https://stackoverflow.com/questions/69361566
复制相似问题