首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在迭代星火数据集记录时添加多个列

在迭代星火数据集记录时添加多个列
EN

Stack Overflow用户
提问于 2017-07-27 13:44:28
回答 2查看 1K关注 0票数 0

火花2.1.x在这里。我有一堆JSON文件(有相同的模式),我正在读到一个星火Dataset,如下所示:

代码语言:javascript
复制
val ds = spark.read.json("some/path/to/lots/of/json/*.json")

然后,我可以打印ds模式,并看到所有内容都已正确读取:

代码语言:javascript
复制
ds.printSchema()

// Outputs:
root
 |-- fizz: boolean (nullable = true)
 |-- moniker: string (nullable = true)
 |-- buzz: string (nullable = true)
 |-- foo: string (nullable = true)
 |-- bar: string (nullable = true)

请注意moniker字符串列。我现在想:

  1. 向此数据集和/或其模式中添加三个新列;(a)名为special_date的日期/时间列;(b)名为special_uuid的UUID列;(c)名为special_phrase的字符串列;然后
  2. 我需要迭代ds中的所有记录,对于每个记录,将其moniker值传递给三个后续函数:(a) deriveSpecialDate(val moniker : String) : Date、(b) deriveSpecialUuid(val moniker : String) : UUID和(c) deriveSpecialPhrase(val moniker : String) : String。然后,每个函数的输出都需要成为相应列的记录的值。

我最好的尝试:

代码语言:javascript
复制
val ds = spark.read.json("some/path/to/lots/of/json/*.json")

ds.foreach(record => {
  val moniker : String = record.select("moniker")
  val specialDate : Date = deriveSpecialDate(moniker)
  val specialUuid : UUID = deriveSpecialUuid(moniker)
  val specialPhrase : String = deriveSpecialPhrase(moniker)

  // This doesn't work because special_* fields don't exist in the original
  // schema dervied from the JSON files. We're ADDING these columns after the
  // JSON read and then populating their values dynamically.
  record.special_date = specialDate
  record.special_uuid = specialUuid
  record.special_phrase = specialPhrase
})

知道如何做到这一点吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-07-27 14:25:48

我将使用spark中的udf (用户定义函数)来增强原始数据集的3列。

代码语言:javascript
复制
val deriveSpecialDate = udf((moniker: String) => // implement here)
val deriveSpecialUuid= udf((moniker: String) => // implement here)
val deriveSpecialPhrase = udf((moniker: String) => // implement here)

在那之后,你可以做这样的事情:

代码语言:javascript
复制
ds.withColumn("special_date", deriveSpecialDate(col("moniker)))
.withColumn("special_uuid", deriveSpecialUuid(col("moniker)))
.withColumn("special_phrase", deriveSpecialPhrase (col("moniker)))

它将为您带来三列的新数据格式。如果需要,还可以使用map函数将其转换为数据集。

票数 1
EN

Stack Overflow用户

发布于 2017-07-27 14:48:45

若要创建新列,可以使用withColumn。如果您已经有了一个函数,则需要将该函数注册为UDF (用户定义函数)。

代码语言:javascript
复制
val sd = sqlContext.udf.register("deriveSpecialDate",deriveSpecialDate _ )
val su = sqlContext.udf.register("deriveSpecialUuid",deriveSpecialUuid _ )
val sp = sqlContext.udf.register("deriveSpecialPhrase", deriveSpecialPhrase _)

要使用这个udf,您需要在列中创建一个新列,如

代码语言:javascript
复制
ds.withColumn("special_date", sd($"moniker))
 .withColumn("special_uuid", su($"moniker))
 .withColumn("special_phrase", sp($"moniker))

这样,您就可以获得包含三个新添加列的原始数据集。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45352646

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档