
Spark Dataframe - Write a new record for a change in value for a particular KEY group

Stack Overflow user
Asked on 2017-11-19 12:44:24
Answers: 2 · Views: 407 · Followers: 0 · Votes: 0

A new record needs to be written whenever the "AMT" column changes within a given "KEY" group.

For example:

Scenario 1: For KEY=2, the first change is 90 to 20, so a record with value (20 - 90) needs to be written.
Similarly, the next change for the same key group is 20 to 30.5, so another record with value (30.5 - 20) needs to be written.

Scenario 2: For KEY=1, there is only one record for this KEY group, so it is written as is.

Scenario 3: For KEY=3, the same AMT value exists twice, so it is written only once.

How can this be achieved? Using a window function, or via groupBy/agg functions? (See the sketch after the expected output below.)

Sample input data:

val DF1 = List((1,34.6),(2,90.0),(2,90.0),(2,20.0),(2,30.5),(3,89.0),(3,89.0)).toDF("KEY", "AMT")

DF1.show(false)
+-----+-------------------+
|KEY  |AMT                |
+-----+-------------------+
|1    |34.6               |
|2    |90.0               |
|2    |90.0               |
|2    |20.0               |----->[ 20.0 - 90.0 = -70.0 ]
|2    |30.5               |----->[ 30.5 - 20.0 =  10.5 ]
|3    |89.0               |
|3    |89.0               |
+-----+-------------------+

Expected output:

scala> df2.show()
+----+--------------------+
|KEY | AMT                |
+----+--------------------+
|  1 |       34.6         |-----> As Is 
|  2 |       -70.0        |----->[ 20.0 - 90.0 = -70.0 ]
|  2 |       10.5         |----->[ 30.5 - 20.0 =  10.5 ]
|  3 |       89.0         |-----> As Is, with one record only
+----+--------------------+
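The following is an editor's sketch of the window-function route (not from the original thread); it assumes Spark 2.x with spark.implicits._ in scope and uses lag plus a per-key distinct count to reproduce the three scenarios above:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Preserve the original row order inside each KEY group
val ordered = DF1.withColumn("rowNo", monotonically_increasing_id())
val byOrder = Window.partitionBy("KEY").orderBy("rowNo")
val byKey   = Window.partitionBy("KEY")

val result = ordered
  .withColumn("prevAMT", lag("AMT", 1).over(byOrder))
  // count(distinct) is not supported over windows, so use collect_set + size
  .withColumn("nDistinct", size(collect_set("AMT").over(byKey)))
  // single-valued groups: keep exactly one row as is;
  // other groups: keep only rows where AMT actually changed
  .filter(($"nDistinct" === 1 && $"prevAMT".isNull) ||
          ($"nDistinct" > 1 && $"prevAMT".isNotNull && $"prevAMT" =!= $"AMT"))
  .select($"KEY",
    when($"prevAMT".isNull, $"AMT").otherwise($"AMT" - $"prevAMT").as("AMT"))

result.show(false)

On the sample data this yields the four expected rows. Note that monotonically_increasing_id() only reflects the source order if the DataFrame has not been shuffled, so an explicit ordering column is preferable when one exists.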

2 Answers

Stack Overflow user

Posted on 2017-11-20 05:50:40

I tried solving this with pyspark instead of scala.

from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# Window over each KEY group (ordering by "key" inside a "key" partition
# does not give a deterministic row order, but mirrors the original code)
w1 = Window.partitionBy("key").orderBy("key")

DF4 = spark.createDataFrame([(1,34.6),(2,90.0),(2,90.0),(2,20.0),(2,30.5),(3,89.0),(3,89.0)], ["KEY", "AMT"])
DF4.createOrReplaceTempView('keyamt')

# Keys with only one distinct AMT value are written out as-is
DF7 = spark.sql('select distinct key, amt from keyamt where key in (select key from (select key, count(distinct(amt)) dist from keyamt group by key) where dist = 1)')

# For the remaining keys, pull the previous AMT within the group...
DF8 = DF4.join(DF7, DF4['KEY'] == DF7['KEY'], 'leftanti').withColumn('new_col', lag('AMT', 1).over(w1).cast('double'))

# ...and compute the difference to the current AMT
DF9 = DF8.withColumn('new_col1', DF8['AMT'] - DF8['new_col'].cast('double')).na.fill(0)

# Keep only the non-zero changes and union the single-value keys back in
DF9.filter(DF9['new_col1'] != 0).select(DF9['KEY'], DF9['new_col1']).union(DF7).orderBy(DF9['KEY']).show()

Output:

+---+--------+
|KEY|new_col1|
+---+--------+
|  1|    34.6|
|  2|   -70.0|
|  2|    10.5|
|  3|    89.0|
+---+--------+
Votes: 0

Stack Overflow user

Posted on 2018-01-19 14:34:37

You can implement your logic with a window function combined with when and lead, monotonically_increasing_id() for ordering, and the withColumn API, as shown below:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("KEY").orderBy("rowNo")
val tempdf = DF1.withColumn("rowNo", monotonically_increasing_id())
// keep AMT when there is no next row or the next AMT is unchanged; otherwise emit the difference
val nextAmt = lead("AMT", 1).over(windowSpec)
tempdf.select($"KEY", when(nextAmt.isNull || nextAmt === $"AMT", $"AMT").otherwise(nextAmt - $"AMT").as("AMT")).show(false)
Votes: 0
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/47373754
