火花2.1.x在这里。我有一堆JSON文件(有相同的模式),我正在读到一个星火Dataset,如下所示:
val ds = spark.read.json("some/path/to/lots/of/json/*.json")然后,我可以打印ds模式,并看到所有内容都已正确读取:
ds.printSchema()
// Outputs:
root
|-- fizz: boolean (nullable = true)
|-- moniker: string (nullable = true)
|-- buzz: string (nullable = true)
|-- foo: string (nullable = true)
|-- bar: string (nullable = true)请注意moniker字符串列。我现在想:
special_date的日期/时间列;(b)名为special_uuid的UUID列;(c)名为special_phrase的字符串列;然后ds中的所有记录,对于每个记录,将其moniker值传递给三个后续函数:(a) deriveSpecialDate(val moniker : String) : Date、(b) deriveSpecialUuid(val moniker : String) : UUID和(c) deriveSpecialPhrase(val moniker : String) : String。然后,每个函数的输出都需要成为相应列的记录的值。我最好的尝试:
val ds = spark.read.json("some/path/to/lots/of/json/*.json")
ds.foreach(record => {
val moniker : String = record.select("moniker")
val specialDate : Date = deriveSpecialDate(moniker)
val specialUuid : UUID = deriveSpecialUuid(moniker)
val specialPhrase : String = deriveSpecialPhrase(moniker)
// This doesn't work because special_* fields don't exist in the original
// schema dervied from the JSON files. We're ADDING these columns after the
// JSON read and then populating their values dynamically.
record.special_date = specialDate
record.special_uuid = specialUuid
record.special_phrase = specialPhrase
})知道如何做到这一点吗?
发布于 2017-07-27 14:25:48
我将使用spark中的udf (用户定义函数)来增强原始数据集的3列。
val deriveSpecialDate = udf((moniker: String) => // implement here)
val deriveSpecialUuid= udf((moniker: String) => // implement here)
val deriveSpecialPhrase = udf((moniker: String) => // implement here)在那之后,你可以做这样的事情:
ds.withColumn("special_date", deriveSpecialDate(col("moniker)))
.withColumn("special_uuid", deriveSpecialUuid(col("moniker)))
.withColumn("special_phrase", deriveSpecialPhrase (col("moniker)))它将为您带来三列的新数据格式。如果需要,还可以使用map函数将其转换为数据集。
发布于 2017-07-27 14:48:45
若要创建新列,可以使用withColumn。如果您已经有了一个函数,则需要将该函数注册为UDF (用户定义函数)。
val sd = sqlContext.udf.register("deriveSpecialDate",deriveSpecialDate _ )
val su = sqlContext.udf.register("deriveSpecialUuid",deriveSpecialUuid _ )
val sp = sqlContext.udf.register("deriveSpecialPhrase", deriveSpecialPhrase _)要使用这个udf,您需要在列中创建一个新列,如
ds.withColumn("special_date", sd($"moniker))
.withColumn("special_uuid", su($"moniker))
.withColumn("special_phrase", sp($"moniker))这样,您就可以获得包含三个新添加列的原始数据集。
https://stackoverflow.com/questions/45352646
复制相似问题