首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark中的增量更新

Spark中的增量更新
EN

Stack Overflow用户
提问于 2018-07-16 20:45:06
回答 1查看 1.5K关注 0票数 2

在Scala中,在Spark中添加增量更新的最有效方法是什么?

我有一个雇员数据格式E1,它是用主键empId归档的。

我还有一个最新的员工数据,我只想将更新的、新的和删除的数据写回存档数据。

例如:

雇员档案:

代码语言:javascript
复制
EmpId, EmpName
1      Tom
2      Harry

雇员最近:

代码语言:javascript
复制
EmpId, EmpName
2      Harry Lewis
3      Hermoine

差额应返回:

代码语言:javascript
复制
EmpId, EmpName, deleted
1      Tom         yes
2      Harry Lewis no
3      Hermoine    no
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-07-17 02:36:40

但是,如果您只想找到更新的或新的行,就可以使用except,因为删除的行应该存在,所以要复杂一些。假设E1是归档的employee数据格式,而E2是最近的,那么您可以在Scala中使用完全连接,如下所示:

代码语言:javascript
复制
E1.withColumnRenamed("EmpName", "EmpNameOld")
  .join(E2, Seq("EmpId"), "fullouter")
  .where($"EmpName".isNull || $"EmpNameOld".isNull || $"EmpName" =!= $"EmpNameOld")
  .withColumn("deleted", when($"EmpName".isNull, "yes").otherwise("no"))
  .withColumn("EmpName", coalesce($"EmpName", $"EmpNameOld"))
  .drop("EmpNameOld")

这将为您提供所需的结果,包括更新的行、新行和已删除的行。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51369855

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档