I have a DataFrame that looks like this:
+-----+-----+------+-----+
|col1 |col2 |col3 |col4 |
+-----+-----+------+-----+
|1.1 |2.3 |10.0 |1 |
|2.2 |1.5 |5.0 |1 |
|3.3 |1.3 |1.5 |1 |
|4.4 |0.5 |7.0 |1 |
|5.5 |1.2 |8.1 |2 |
|6.6 |2.3 |8.2 |2 |
|7.7 |4.5 |10.3 |2 |
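+-----+-----+------+-----+

I want to subtract from each row the row above it, but only when both rows have the same entry in col4, so 2-1, 3-2, but not 5-4. In addition, col4 should not be changed, so the result would be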
+-----+-----+------+------+
|col1 |col2 |col3 |col4 |
+-----+-----+------+------+
|1.1 |-0.8 |-5.0 |1 |
|1.1 |-0.2 |-3.5 |1 |
|1.1 |-0.8 |5.5 |1 |
|1.1 |1.1 |0.1 |2 |
|1.1 |2.2 |2.1 |2 |
+-----+-----+------+------+

This sounds simple, but I can't seem to figure it out.

Posted on 2021-07-05 01:29:50
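You can do this with Spark SQL: create a temporary view from the DataFrame and apply the SQL below. It uses the window function LAG to subtract the previous row's value, with rows partitioned by col4 and ordered by col1. ROW_NUMBER is used to identify and filter out the first row of each col4 group, since that row has no predecessor to subtract.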
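To make the window logic concrete before the query itself, here is a minimal plain-Python sketch (not Spark code) of what LAG with PARTITION BY col4 ORDER BY col1, followed by dropping the first row of each group, computes. The function name `lagged_differences` and its parameters are hypothetical, chosen only for illustration; the data is the table from the question.

```python
from itertools import groupby
from operator import itemgetter


def lagged_differences(rows, value_cols=("col1", "col2", "col3"),
                       group_col="col4", order_col="col1"):
    """Subtract the previous row from each row within every group.

    Hypothetical helper: mimics PARTITION BY group_col ORDER BY order_col
    with LAG(col, 1), then drops the first row of each group (rn <> 1).
    """
    result = []
    # Sorting by (group, order) lets groupby see each partition in order.
    ordered = sorted(rows, key=itemgetter(group_col, order_col))
    for _, group in groupby(ordered, key=itemgetter(group_col)):
        group = list(group)
        # Pair every row with its predecessor; the first row has none,
        # so it never appears as "cur" and is dropped implicitly.
        for prev, cur in zip(group, group[1:]):
            # Rounding only hides floating-point noise such as 1.0999999999999996.
            diff = {c: round(cur[c] - prev[c], 10) for c in value_cols}
            diff[group_col] = cur[group_col]  # the group column is left unchanged
            result.append(diff)
    return result


rows = [
    {"col1": 1.1, "col2": 2.3, "col3": 10.0, "col4": 1},
    {"col1": 2.2, "col2": 1.5, "col3": 5.0, "col4": 1},
    {"col1": 3.3, "col2": 1.3, "col3": 1.5, "col4": 1},
    {"col1": 4.4, "col2": 0.5, "col3": 7.0, "col4": 1},
    {"col1": 5.5, "col2": 1.2, "col3": 8.1, "col4": 2},
    {"col1": 6.6, "col2": 2.3, "col3": 8.2, "col4": 2},
    {"col1": 7.7, "col2": 4.5, "col3": 10.3, "col4": 2},
]

for row in lagged_differences(rows):
    print(row)
```

Seven input rows in two groups yield five output rows, because the first row of each col4 group is dropped, just as the `rn <> 1` filter does in the SQL.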
df.createOrReplaceTempView('my_temp_view')
results = sparkSession.sql('<insert sql below here>')

SELECT
col1,
col2,
col3,
col4
FROM (
SELECT
(col1 - (LAG(col1,1,0) OVER (PARTITION BY col4 ORDER BY col1) )) as col1,
(col2 - (LAG(col2,1,0) OVER (PARTITION BY col4 ORDER BY col1) )) as col2,
(col3 - (LAG(col3,1,0) OVER (PARTITION BY col4 ORDER BY col1) )) as col3,
col4,
ROW_NUMBER() OVER (PARTITION BY col4 ORDER BY col1) rn
FROM
my_temp_view
) t
WHERE rn <> 1

Posted on 2021-07-05 03:12:55
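Here is just an idea based on zipWithIndex and a self-join of the DataFrame. There is some overhead, which you can trim; z is your col4.
At scale, I am not sure how well this will perform after the Catalyst Optimizer has processed it. I looked at .explain(true) and am not entirely convinced, but I sometimes find that output hard to interpret. The ordering of the data is guaranteed.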
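The idea can be sketched in plain Python (not Spark code) to show what the index plus self-join does: every row gets a positional id, and each row is matched with the row whose id is one higher, provided both share the same z. The function name `adjacent_pairs_by_index` and the output keys are hypothetical; the sample rows are the ones used in this answer.

```python
def adjacent_pairs_by_index(rows, group_key="z"):
    """Pair each row with the next row when both are in the same group.

    Hypothetical helper: a plain-Python analogue of zipWithIndex followed
    by a self-join on equal group key and left.rowid == right.rowid - 1.
    """
    # zipWithIndex analogue: attach a 0-based position to every row.
    indexed = [dict(row, rowid=i) for i, row in enumerate(rows)]
    by_id = {r["rowid"]: r for r in indexed}

    joined = []
    for left in indexed:
        right = by_id.get(left["rowid"] + 1)  # the row directly below, if any
        if right is not None and right[group_key] == left[group_key]:
            joined.append({
                "prev": left,
                "cur": right,
                "newx": right["x"] - left["x"],  # lower row minus upper row
            })
    return joined


rows = [
    {"x": 1.0, "y": 2.0, "z": 1},
    {"x": 0.0, "y": -1.0, "z": 1},
    {"x": 3.0, "y": 4.0, "z": 1},
    {"x": 6.0, "y": -2.3, "z": 4},
]

for pair in adjacent_pairs_by_index(rows):
    print(pair["prev"]["rowid"], pair["cur"]["rowid"], pair["newx"])
```

The last row has z = 4 while the row above it has z = 1, so that pair is excluded by the join condition and only two pairs remain.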
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}

// Sample data; z plays the role of col4
val df = sc.parallelize(Seq( (1.0, 2.0, 1), (0.0, -1.0, 1), (3.0, 4.0, 1), (6.0, -2.3, 4))).toDF("x", "y", "z")

// Extend the schema with a rowid column and fill it via zipWithIndex
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val rddWithId = df.rdd.zipWithIndex
val dfZippedWithId = spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
dfZippedWithId.show(false)
dfZippedWithId.printSchema()

// Self-join: match each row (dfZ1) with the next row (dfZ2) in the same z group
val res = dfZippedWithId.as("dfZ1").join(dfZippedWithId.as("dfZ2"),
    $"dfZ1.z" === $"dfZ2.z" && $"dfZ1.rowid" === $"dfZ2.rowid" - 1,
    "inner")
  .withColumn("newx", $"dfZ2.x" - $"dfZ1.x") //.explain(true)
res.show(false)

This returns the input:
+---+----+---+-----+
|x |y |z |rowid|
+---+----+---+-----+
|1.0|2.0 |1 |0 |
|0.0|-1.0|1 |1 |
|3.0|4.0 |1 |2 |
|6.0|-2.3|4 |3 |
+---+----+---+-----+

and the result, which you can tailor by selecting columns and adding extra calculations:
+---+----+---+-----+---+----+---+-----+----+
|x |y |z |rowid|x |y |z |rowid|newx|
+---+----+---+-----+---+----+---+-----+----+
|1.0|2.0 |1 |0 |0.0|-1.0|1 |1 |-1.0|
|0.0|-1.0|1 |1 |3.0|4.0 |1 |2 |3.0 |
+---+----+---+-----+---+----+---+-----+----+

https://stackoverflow.com/questions/68236048