I have a PySpark function that I need to convert to Scala.
PySpark
from pyspark.sql import functions as F

for i in [c for c in r.columns if c.startswith("_")]:
    r = r.withColumn(i, F.col(i)["id"])

Since Scala data types are immutable, is there a better way in Scala to create multiple new columns, instead of chaining reassignments the way I do in PySpark, i.e. val df1 = df.withColumn(...), val df2 = df1.withColumn(...)?
The table r looks like this:
+-----------+-------------+-------------+-------------+-------------+
| _0| _1| _2| _3| _4|
+-----------+-------------+-------------+-------------+-------------+
|[1, Carter]| [5, Banks]|[11, Derrick]| [4, Hood]| [12, Jef]|
|[1, Carter]| [12, Jef]| [4, Hood]| [5, Banks]|[11, Derrick]|
|[1, Carter]| [4, Hood]| [12, Jef]|[11, Derrick]| [5, Banks]|
|[1, Carter]| [12, Jef]| [5, Banks]|[11, Derrick]| [4, Hood]|
|[1, Carter]| [4, Hood]| [12, Jef]| [5, Banks]|[11, Derrick]|
|[1, Carter]|[11, Derrick]| [12, Jef]| [4, Hood]| [5, Banks]|
|[1, Carter]| [12, Jef]|[11, Derrick]| [5, Banks]| [4, Hood]|
|[1, Carter]| [5, Banks]| [4, Hood]|[11, Derrick]| [12, Jef]|
|[1, Carter]|[11, Derrick]| [5, Banks]| [4, Hood]| [12, Jef]|
|[1, Carter]| [5, Banks]|[11, Derrick]| [12, Jef]| [4, Hood]|
|[1, Carter]| [5, Banks]| [12, Jef]|[11, Derrick]| [4, Hood]|
|[1, Carter]| [5, Banks]| [12, Jef]| [4, Hood]|[11, Derrick]|
|[1, Carter]|[11, Derrick]| [5, Banks]| [12, Jef]| [4, Hood]|
|[1, Carter]| [4, Hood]|[11, Derrick]| [5, Banks]| [12, Jef]|
|[1, Carter]|[11, Derrick]| [4, Hood]| [5, Banks]| [12, Jef]|
|[1, Carter]| [12, Jef]| [5, Banks]| [4, Hood]|[11, Derrick]|
|[1, Carter]| [12, Jef]|[11, Derrick]| [4, Hood]| [5, Banks]|
|[1, Carter]| [4, Hood]|[11, Derrick]| [12, Jef]| [5, Banks]|
|[1, Carter]|[11, Derrick]| [4, Hood]| [12, Jef]| [5, Banks]|
|[1, Carter]| [12, Jef]| [4, Hood]|[11, Derrick]| [5, Banks]|
+-----------+-------------+-------------+-------------+-------------+

Posted on 2021-07-14 20:44:47
You can do this with a single select (each .withColumn creates a new dataset that has to be analyzed):
import org.apache.spark.sql.functions.col

// either replace each struct column with its inner id field, or take the column as is
val updates = r.columns.map(c => if (c.startsWith("_")) col(s"$c.id") as c else col(c))
val newDf = r.select(updates: _*) // _* expands the sequence into a varargs parameter list
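With the sample r above, each struct is replaced by its id field, so (assuming the data shown in the question) a quick check would look roughly like:

newDf.show(3)
// +---+---+---+---+---+
// | _0| _1| _2| _3| _4|
// +---+---+---+---+---+
// |  1|  5| 11|  4| 12|
// |  1| 12|  4|  5| 11|
// |  1|  4| 12| 11|  5|
// +---+---+---+---+---+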
Posted on 2021-07-14 20:48:01

You can use foldLeft:
import org.apache.spark.sql.functions.col

// fold over the matching column names, threading the DataFrame through each step
val updDf = df
  .columns
  .filter(_.startsWith("_"))
  .foldLeft(df)((acc, c) => acc.withColumn(s"new_$c", col(c).getItem("id")))
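Note that this version adds new columns named new_<c> rather than replacing the originals. If the goal is to mirror the PySpark loop exactly and overwrite each column in place, a minimal sketch (assuming the question's DataFrame r; the name replaced is illustrative) would be:

import org.apache.spark.sql.functions.col

// overwrite each "_"-prefixed struct column with its inner id field,
// just like the PySpark loop in the question
val replaced = r.columns
  .filter(_.startsWith("_"))
  .foldLeft(r)((acc, c) => acc.withColumn(c, col(c).getItem("id")))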
https://stackoverflow.com/questions/68375211