在Scala中,在Spark中添加增量更新的最有效方法是什么?
我有一个雇员数据格式E1,它是用主键empId归档的。
我还有一个最新的员工数据,我只想将更新的、新的和删除的数据写回存档数据。
例如:
雇员档案:
EmpId, EmpName
1 Tom
2 Harry雇员最近:
EmpId, EmpName
2 Harry Lewis
3 Hermoine差额应返回:
EmpId, EmpName, deleted
1 Tom yes
2 Harry Lewis no
3 Hermoine no发布于 2018-07-17 02:36:40
但是,如果您只想找到更新的或新的行,就可以使用except,因为删除的行应该存在,所以要复杂一些。假设E1是归档的employee数据格式,而E2是最近的,那么您可以在Scala中使用完全连接,如下所示:
E1.withColumnRenamed("EmpName", "EmpNameOld")
.join(E2, Seq("EmpId"), "fullouter")
.where($"EmpName".isNull || $"EmpNameOld".isNull || $"EmpName" =!= $"EmpNameOld")
.withColumn("deleted", when($"EmpName".isNull, "yes").otherwise("no"))
.withColumn("EmpName", coalesce($"EmpName", $"EmpNameOld"))
.drop("EmpNameOld")这将为您提供所需的结果,包括更新的行、新行和已删除的行。
https://stackoverflow.com/questions/51369855
复制相似问题