首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark:来自异构数据的编写包

Spark:来自异构数据的编写包
EN

Stack Overflow用户
提问于 2021-09-28 12:14:13
回答 2查看 112关注 0票数 2

我有一个包含异构StructType的数据集,可以按类型分组并对其应用JSON。例如,RDD[(Type, JSON)]Set[Type],包含原始RDD中的所有类型。

现在,我想将这些JSON写入到一个类型化的Parquet文件中,并按类型进行分区。

我现在做的是:

代码语言:javascript
复制
val getSchema: Type => StructType = ???
val buildRow: StructType => JSON => Row = ???

types.foreach { jsonType =>
  val sparkSchema: StructType = getSchema(jsonType)
  val rows: RDD[Row] = rdd
    .filter(k => k == jsonType)
    .map { case (_, json) => buildRow(sparkSchema)(json) }
  spark.createDataFrame(rows, sparkSchema).write.parquet(path)
}

它可以工作,但效率非常低,因为它需要多次遍历原始RDD -我通常有几十甚至数百种类型。

有没有更好的解决方案?我尝试联合数据帧,但失败了,因为不能合并具有不同属性的行。或者至少我可以通过取消持久化数据帧来释放它所占用的资源?

EN

回答 2

Stack Overflow用户

发布于 2021-10-05 06:36:32

我会想到一些选项(有些可能适合您的需求,也可能不适合您的需求)。

JSON如果不同的类型不需要是单独的文件,你可以使用像{type1col : StructType1, type2 : StructType2, etc}这样的模式,写出只有一个结构列是populated.

  • Repartition数据的文件,以获得同一分区(+某种次键)中相同类型的所有
  1. 对象,并写出分区的数据。然后读入它并按类型过滤(如果它是分区写出来的,你只需要加载数据一次)。这将只需要读取数据2倍(但混洗可以是expensive).
  2. You可以创建一个自定义的WriteBatch,操作在( type,JSON)的行上,在写入之前应用转换,并为每种类型保留一个打开的文件句柄。如果您需要单独的文件,这可能是最有效的,但需要相当多的代码来维护和调试。
票数 2
EN

Stack Overflow用户

发布于 2021-10-07 08:03:49

这将有助于加速拼图文件的写入。由于types的类型为Set[Type],因此foreach循环将按顺序发生。这意味着拼图文件是一个接一个地写的(不是并行的)。由于每次拼接文件写入都是相互独立的,因此如果一次写入多个文件应该没有问题。

使用scala并行集合(参见:https://docs.scala-lang.org/overviews/parallel-collections/overview.html)将是实现这一目标的最简单方法。尝试编辑第4行以:

代码语言:javascript
复制
types.par.foreach { jsonType =>

如果原始RDD被多次访问,也要缓存它。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69361566

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档