首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何展平结构数组类型的列(由Spark ML API返回)?

如何展平结构数组类型的列(由Spark ML API返回)?
EN

Stack Overflow用户
提问于 2017-10-14 02:32:26
回答 2查看 9.8K关注 0票数 6

也许只是因为我对API比较陌生,但我觉得Spark ML方法经常返回不必要的难以使用的DFs。

这一次,是肌萎缩侧索硬化症模型把我给绊倒了。具体地说,是recommendForAllUsers方法。让我们重新构造它将返回的DF类型:

代码语言:javascript
复制
scala> val arrayType = ArrayType(new StructType().add("itemId", IntegerType).add("rating", FloatType))

scala> val recs = Seq((1, Array((1, .7), (2, .5))), (2, Array((0, .9), (4, .1)))).
  toDF("userId", "recommendations").
  select($"userId", $"recommendations".cast(arrayType))

scala> recs.show()
代码语言:javascript
复制
+------+------------------+
|userId|   recommendations|
+------+------------------+
|     1|[[1,0.7], [2,0.5]]|
|     2|[[0,0.9], [4,0.1]]|
+------+------------------+
代码语言:javascript
复制
scala> recs.printSchema
代码语言:javascript
复制
root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

现在,我只关心recommendations列中的itemId。毕竟,这个方法是recommendForAllUsers而不是recommendAndScoreForAllUsers (好吧,好吧,我就不再那么无礼了……)

我该怎么做??

当我创建一个UDF时,我想我已经拥有它了:

代码语言:javascript
复制
scala> val itemIds = udf((arr: Array[(Int, Float)]) => arr.map(_._1))

但这会产生一个错误:

代码语言:javascript
复制
scala> recs.withColumn("items", items($"recommendations"))
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(recommendations)' due to data type mismatch: argument 1 requires array<struct<_1:int,_2:float>> type, however, '`recommendations`' is of array<struct<itemId:int,rating:float>> type.;;
'Project [userId#87, recommendations#92, UDF(recommendations#92) AS items#238]
+- Project [userId#87, cast(recommendations#88 as array<struct<itemId:int,rating:float>>) AS recommendations#92]
   +- Project [_1#84 AS userId#87, _2#85 AS recommendations#88]
      +- LocalRelation [_1#84, _2#85]

有什么想法吗?谢谢!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-10-14 04:47:17

哇,我的同事想出了一个非常优雅的解决方案:

代码语言:javascript
复制
scala> recs.select($"userId", $"recommendations.itemId").show
+------+------+
|userId|itemId|
+------+------+
|     1|[1, 2]|
|     2|[0, 4]|
+------+------+

因此,也许Spark ML API毕竟并不那么难:)

票数 7
EN

Stack Overflow用户

发布于 2017-10-14 03:02:43

使用数组作为列的类型,例如recommendations,使用explode函数(或更高级的flatMap运算符)会非常有效率。

列分解(e:):

为给定数组或映射列中的每个元素创建一个新行。

这为您提供了可以使用的简单结构。

代码语言:javascript
复制
import org.apache.spark.sql.types._
val structType = new StructType().
  add($"itemId".int).
  add($"rating".float)
val arrayType = ArrayType(structType)
val recs = Seq((1, Array((1, .7), (2, .5))), (2, Array((0, .9), (4, .1)))).
  toDF("userId", "recommendations").
  select($"userId", $"recommendations" cast arrayType)

val exploded = recs.withColumn("recs", explode($"recommendations"))
scala> exploded.show
+------+------------------+-------+
|userId|   recommendations|   recs|
+------+------------------+-------+
|     1|[[1,0.7], [2,0.5]]|[1,0.7]|
|     1|[[1,0.7], [2,0.5]]|[2,0.5]|
|     2|[[0,0.9], [4,0.1]]|[0,0.9]|
|     2|[[0,0.9], [4,0.1]]|[4,0.1]|
+------+------------------+-------+

在带有* (星号)的select运算符中,结构很好,可以将它们展平为每个结构字段的列。

你可以做select($"element.*")

代码语言:javascript
复制
scala> exploded.select("userId", "recs.*").show
+------+------+------+
|userId|itemId|rating|
+------+------+------+
|     1|     1|   0.7|
|     1|     2|   0.5|
|     2|     0|   0.9|
|     2|     4|   0.1|
+------+------+------+

我想这能做你想要的。

附注:尽量远离UDF,因为它们会“触发”从内部格式(InternalRow)到JVM对象的行转换,这可能会导致过多的GC。

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46736063

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档