首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark (Scala)成对减去数据帧中的所有行

Spark (Scala)成对减去数据帧中的所有行
EN

Stack Overflow用户
提问于 2021-07-03 19:56:54
回答 2查看 99关注 0票数 0

我有一个数据框,它看起来像是这样:

代码语言:javascript
复制
+-----+-----+------+-----+
|col1 |col2 |col3  |col4 |
+-----+-----+------+-----+
|1.1  |2.3  |10.0  |1    |
|2.2  |1.5  |5.0   |1    |
|3.3  |1.3  |1.5   |1    |
|4.4  |0.5  |7.0   |1    |
|5.5  |1.2  |8.1   |2    |
|6.6  |2.3  |8.2   |2    |
|7.7  |4.5  |10.3  |2    |
+-----+-----+------+-----+

我想从上面的行中减去每一行,但前提是它们在col4中有相同的条目,所以2-1,3-2,而不是5-4。此外,不应该更改col4,因此结果将是

代码语言:javascript
复制
+-----+-----+------+------+
|col1 |col2 |col3  |col4  |
+-----+-----+------+------+
|1.1  |-0.8 |-5.0  |1     |
|1.1  |-0.2 |-3.5  |1     |
|1.1  |-0.8 |5.5   |1     |
|1.1  |1.1  |0.1   |2     |
|1.1  |2.2  |2.1   |2     |
+-----+-----+------+------+

这听起来很简单,但我似乎想不通

EN

回答 2

Stack Overflow用户

发布于 2021-07-05 01:29:50

您可以使用spark-sql来实现这一点,即使用数据帧创建一个临时视图并应用以下sql。它使用窗口函数LAG减去按col1排序并按col4分区的前一个行值。使用row_number标识并过滤由col4划分的每个组中的第一个行值。

代码语言:javascript
复制
df.createOrReplaceTempView('my_temp_view')

results = sparkSession.sql('<insert sql below here>')
代码语言:javascript
复制
SELECT
     col1,
     col2,
     col3,
     col4
FROM (
SELECT
    (col1 - (LAG(col1,1,0) OVER (PARTITION BY col4 ORDER BY col1) )) as col1,
    (col2 - (LAG(col2,1,0) OVER (PARTITION BY col4 ORDER BY col1) )) as col2,
    (col3 - (LAG(col3,1,0) OVER (PARTITION BY col4 ORDER BY col1) )) as col3,
    col4,
    ROW_NUMBER() OVER (PARTITION BY col4 ORDER BY col1) rn
FROM
    my_temp_view
) t 
WHERE rn <> 1

db-fiddle

票数 0
EN

Stack Overflow用户

发布于 2021-07-05 03:12:55

这里只是一个基于zipWithIndex和DF的自连接的想法-一些开销,你可以裁剪,z是你的col4。

在规模上,我不确定Catalyst Optimizer将应用的性能,我查看了.explain(真的);我并不完全相信,但我发现有时很难解释输出。保证了数据的排序。

代码语言:javascript
复制
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}

val df = sc.parallelize(Seq( (1.0, 2.0, 1), (0.0, -1.0, 1), (3.0, 4.0, 1), (6.0, -2.3, 4))).toDF("x", "y", "z")
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

val rddWithId = df.rdd.zipWithIndex
val dfZippedWithId =  spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)

dfZippedWithId.show(false)
dfZippedWithId.printSchema()

val res = dfZippedWithId.as("dfZ1").join(dfZippedWithId.as("dfZ2"), $"dfZ1.z" ===  $"dfZ2.z" &&
                                                                    $"dfZ1.rowid" ===  $"dfZ2.rowid" -1
                                                                   ,"inner")
                        .withColumn("newx", $"dfZ2.x" - $"dfZ1.x")//.explain(true)

res.show(false)

返回输入:

代码语言:javascript
复制
+---+----+---+-----+
|x  |y   |z  |rowid|
+---+----+---+-----+
|1.0|2.0 |1  |0    |
|0.0|-1.0|1  |1    |
|3.0|4.0 |1  |2    |
|6.0|-2.3|4  |3    |
+---+----+---+-----+

以及您可以通过选择和添加额外计算来定制的结果:

代码语言:javascript
复制
+---+----+---+-----+---+----+---+-----+----+
|x  |y   |z  |rowid|x  |y   |z  |rowid|newx|
+---+----+---+-----+---+----+---+-----+----+
|1.0|2.0 |1  |0    |0.0|-1.0|1  |1    |-1.0|
|0.0|-1.0|1  |1    |3.0|4.0 |1  |2    |3.0 |
+---+----+---+-----+---+----+---+-----+----+
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68236048

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档