首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花增量加载覆盖旧记录

火花增量加载覆盖旧记录
EN

Stack Overflow用户
提问于 2018-12-03 03:43:23
回答 3查看 14K关注 0票数 4

我需要使用Spark (PySpark)对表进行增量加载

下面是一个例子:

第一天

代码语言:javascript
复制
id | value
-----------
1  | abc
2  | def

第2天

代码语言:javascript
复制
id | value
-----------
2  | cde
3  | xyz

预期结果

代码语言:javascript
复制
id | value
-----------
1  | abc
2  | cde
3  | xyz

这可以很容易地在关系数据库中完成,

想知道这是否可以在火花或其他转型工具,例如普雷斯托?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-12-03 09:18:12

这就是你要的!第一个Dataframe:

代码语言:javascript
复制
 >>> list1 = [(1, 'abc'),(2,'def')]
 >>> olddf = spark.createDataFrame(list1, ['id', 'value'])
 >>> olddf.show();
 +---+-----+
 | id|value|
 +---+-----+
 |  1|  abc|
 |  2|  def|
 +---+-----+

第二个Dataframe:

代码语言:javascript
复制
>>> list2 = [(2, 'cde'),(3,'xyz')]
>>> newdf = spark.createDataFrame(list2, ['id', 'value'])
>>> newdf.show();
+---+-----+
| id|value|
+---+-----+
|  2|  cde|
|  3|  xyz|
+---+-----+

现在,使用完全外部连接和合并来连接和合并这两个数据文件,同时使用合并函数,同时使用用户定义的值来选择和替换空值。

代码语言:javascript
复制
from pyspark.sql.functions import *

>>> df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value"))
>>> df.show();
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  3|  xyz|
|  2|  cde|
+---+-----+

我希望这能解决你的问题。:-)

票数 13
EN

Stack Overflow用户

发布于 2018-12-03 04:37:46

数据附加是通过union函数实现的。我将演示一个例子,并创建2个数据,正如您在问题中提到的。

代码语言:javascript
复制
from pyspark.sql.types import Row
df1 = sqlContext.createDataFrame([Row(id=1,value="abc"),Row(id=2,value="def")])

df1.show()
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

df2 = sqlContext.createDataFrame([Row(id=2,value="cde"),Row(id=3,value="xyz")])
df2.show()
+---+-----+
| id|value|
+---+-----+
|  2|  cde|
|  3|  xyz|
+---+-----+

让我们在这两个数据文件之间执行一个union,您将得到所需的结果。

代码语言:javascript
复制
df2.union(df1).dropDuplicates(["id"]).show()
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  3|  xyz|
|  2|  cde|
+---+-----+

可以使用ascpyspark.sql.functions对输出进行排序。

代码语言:javascript
复制
from pyspark.sql.functions import asc


df2.union(df1).dropDuplicates(["id"]).sort(asc("id")).show()
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  cde|
|  3|  xyz|
+---+-----+
票数 2
EN

Stack Overflow用户

发布于 2018-12-03 05:50:01

解决方法是,在dataframe中添加一个日期列,然后根据id排序,然后在降序中按日期顺序排序,然后取== 1,它将始终根据id给出最新的记录。

代码语言:javascript
复制
df.("rank", rank().over(Window.partitionBy($"id").orderBy($"date".desc)))
  .filter($"rank" === 1)
  .drop($"rank")
  .orderBy($"id")
  .show
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53587175

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档