
Custom deduplication strategy with Spark joins

Stack Overflow user
Asked on 2018-02-14 12:58:24
2 answers · 1.1K views · 0 following · 0 votes

I have more than two tables that I want to join into a single table that is faster to query.

Table 1

---------------
user  | activityId
---------------
user1 | 123
user2 | 123
user3 | 123
user4 | 123
user5 | 123
---------------

Table 2

---------------------------------
user  | activityId | event-1-time
---------------------------------
user2 | 123        | 1001
user2 | 123        | 1002
user3 | 123        | 1003
user5 | 123        | 1004
---------------------------------

Table 3

---------------------------------
user  | activityId | event-2-time
---------------------------------
user2 | 123        | 10001
user5 | 123        | 10002
---------------------------------

A left join of Table 1 with Table 2 and Table 3 on (user, activityId) produces the following:

Joined data

--------------------------------------------------------------------
user  | activityId | event-1 | event-1-time | event-2 | event-2-time
--------------------------------------------------------------------
user1 | 123        | 0       | null         | 0       | null
user2 | 123        | 1       | 1001         | 1       | 10001
user2 | 123        | 1       | 1002         | 1       | 10001
user3 | 123        | 1       | 1003         | 0       | null
user4 | 123        | 0       | null         | 0       | null
user5 | 123        | 1       | 1004         | 1       | 10002
--------------------------------------------------------------------

I want to remove the redundancy introduced for event-2: event-2 happened only once, but it is reported twice because event-1 occurred twice.

In other words, the rows grouped by user and activityId should be distinct at the level of each individual table.

I want the output below. I don't care how event-1 and event-2 rows are paired. Is there anything that allows customizing the join to achieve this behavior?

user  | activityId | event-1 | event-1-time | event-2 | event-2-time
--------------------------------------------------------------------
user1 | 123        | 0       | null         | 0       | null
user2 | 123        | 1       | 1001         | 1       | 10001
user2 | 123        | 1       | 1002         | 0       | null
user3 | 123        | 1       | 1003         | 0       | null
user4 | 123        | 0       | null         | 0       | null
user5 | 123        | 1       | 1004         | 1       | 10002
--------------------------------------------------------------------
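The pairing rule asked for here can be sketched in plain Python (a hypothetical helper, not part of the question): for each (user, activityId) key, zip the event-1 and event-2 time lists together, padding the shorter side with nulls.

```python
from collections import defaultdict
from itertools import zip_longest

def pair_events(users, events1, events2):
    """Pair the nth event-1 with the nth event-2 per (user, activityId) key,
    padding the shorter side with None."""
    e1, e2 = defaultdict(list), defaultdict(list)
    for user, act, t in events1:
        e1[(user, act)].append(t)
    for user, act, t in events2:
        e2[(user, act)].append(t)
    rows = []
    for user, act in users:
        # zip_longest pairs events positionally; users with no events get one all-null row
        pairs = list(zip_longest(e1[(user, act)], e2[(user, act)])) or [(None, None)]
        for t1, t2 in pairs:
            rows.append((user, act,
                         int(t1 is not None), t1,
                         int(t2 is not None), t2))
    return rows

table1 = [("user1", 123), ("user2", 123), ("user3", 123), ("user4", 123), ("user5", 123)]
table2 = [("user2", 123, 1001), ("user2", 123, 1002), ("user3", 123, 1003), ("user5", 123, 1004)]
table3 = [("user2", 123, 10001), ("user5", 123, 10002)]
for row in pair_events(table1, table2, table3):
    print(row)
```

This reproduces the desired table above: user2's second event-1 row carries `0 / null` for event-2 instead of repeating the single event-2 time.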

Edit:

I am joining these tables using Scala. The query used:

val joined = table1
  .join(table2, Seq("user", "activityId"), "left")
  .join(table3, Seq("user", "activityId"), "left")

joined.select(
  table1("user"),
  table1("activityId"),
  when(table2("activityId").isNull, 0).otherwise(1) as "event-1",
  table2("timestamp") as "event-1-time",
  when(table3("activityId").isNull, 0).otherwise(1) as "event-2",
  table3("timestamp") as "event-2-time"
).show

2 Answers

Stack Overflow user

Accepted answer

Posted on 2018-02-16 01:55:30

You should create an additional row-number column for each group of user and activityId, and use that added column in the outer join.

import org.apache.spark.sql.expressions._
// Number rows within each user group so duplicate rows can be aligned across tables
def windowSpec = Window.partitionBy("user").orderBy("activityId")

import org.apache.spark.sql.functions._
val tempTable1 = table1.withColumn("rowNumber", row_number().over(windowSpec))
val tempTable2 = table2.withColumn("rowNumber", row_number().over(windowSpec)).withColumn("event-1", lit(1))
val tempTable3 = table3.withColumn("rowNumber", row_number().over(windowSpec)).withColumn("event-2", lit(1))

// Joining on rowNumber as well pairs the nth event-1 row with the nth event-2 row,
// so a single event-2 is no longer duplicated against every matching event-1 row
tempTable1
    .join(tempTable2, Seq("user", "activityId", "rowNumber"), "outer")
    .join(tempTable3, Seq("user", "activityId", "rowNumber"), "outer")
    .drop("rowNumber")
    .na.fill(0)

This should give you the required output dataframe:

+-----+----------+------------+-------+------------+-------+
|user |activityId|event-1-time|event-1|event-2-time|event-2|
+-----+----------+------------+-------+------------+-------+
|user1|123       |null        |0      |null        |0      |
|user2|123       |1002        |1      |null        |0      |
|user2|123       |1001        |1      |10001       |1      |
|user3|123       |1003        |1      |null        |0      |
|user4|123       |null        |0      |null        |0      |
|user5|123       |1004        |1      |10002       |1      |
+-----+----------+------------+-------+------------+-------+
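The row-number alignment above can be emulated in plain Python to see why it works (a sketch with hypothetical helpers over in-memory lists, not the Spark API):

```python
from collections import defaultdict

def with_row_numbers(rows):
    """Assign a 1-based row number per user group, like row_number().over(windowSpec);
    returns a dict keyed by (user, activityId, rowNumber)."""
    counts = defaultdict(int)
    numbered = {}
    for user, act, *rest in rows:
        counts[user] += 1
        numbered[(user, act, counts[user])] = rest
    return numbered

def outer_join_on_row_number(t1, t2, t3):
    """Full outer join of the three numbered tables on (user, activityId, rowNumber)."""
    keys = set(t1) | set(t2) | set(t3)
    out = []
    for user, act, rn in sorted(keys):
        e1, e2 = t2.get((user, act, rn)), t3.get((user, act, rn))
        out.append((user, act,
                    1 if e1 else 0, e1[0] if e1 else None,
                    1 if e2 else 0, e2[0] if e2 else None))
    return out

t1 = with_row_numbers([("user1", 123), ("user2", 123), ("user3", 123), ("user4", 123), ("user5", 123)])
t2 = with_row_numbers([("user2", 123, 1001), ("user2", 123, 1002), ("user3", 123, 1003), ("user5", 123, 1004)])
t3 = with_row_numbers([("user2", 123, 10001), ("user5", 123, 10002)])
for row in outer_join_on_row_number(t1, t2, t3):
    print(row)
```

Because (user2, 123, 2) exists only in the event-1 table, that key falls out of the join with nulls on the event-2 side, which is exactly the deduplication the question asks for.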
Votes: 1

Stack Overflow user

Posted on 2018-02-15 08:53:37

Below is a code implementation of the requirement:

from pyspark.sql import Row

# Table 1: (user, activityId) pairs
ll = [('test',123),('test',123),('test',123),('test',123)]
rdd = sc.parallelize(ll)
test1 = rdd.map(lambda x: Row(user=x[0], activityid=int(x[1])))
test1_df = sqlContext.createDataFrame(test1)

# Table 2: event-1 times
mm = [('test',123,1001),('test',123,1002),('test',123,1003),('test',123,1004)]
rdd1 = sc.parallelize(mm)
test2 = rdd1.map(lambda x: Row(user=x[0],
                               activityid=int(x[1]), event_time_1=int(x[2])))
test2_df = sqlContext.createDataFrame(test2)

# Table 3: event-2 times
nn = [('test',123,10001),('test',123,10002)]
rdd2 = sc.parallelize(nn)
test3 = rdd2.map(lambda x: Row(user=x[0],
                               activityid=int(x[1]), event_time_2=int(x[2])))
test3_df = sqlContext.createDataFrame(test3)

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import rank

# Rank events within each (user, activityid) group and keep only the first one
n = Window.partitionBy(test2_df.user, test2_df.activityid).orderBy(test2_df.event_time_1)
int2_df = test2_df.select("user", "activityid", "event_time_1",
                          rank().over(n).alias("col_rank")).filter('col_rank = 1')

o = Window.partitionBy(test3_df.user, test3_df.activityid).orderBy(test3_df.event_time_2)
int3_df = test3_df.select("user", "activityid", "event_time_2",
                          rank().over(o).alias("col_rank")).filter('col_rank = 1')

test1_df.distinct().join(int2_df, ["user", "activityid"], "leftouter") \
        .join(int3_df, ["user", "activityid"], "leftouter").show(10)

+----+----------+------------+--------+------------+--------+
|user|activityid|event_time_1|col_rank|event_time_2|col_rank|
+----+----------+------------+--------+------------+--------+
|test|       123|        1001|       1|       10001|       1|
+----+----------+------------+--------+------------+--------+
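The rank-and-filter step used here can be sketched in plain Python (a hypothetical helper): within each (user, activityid) group, only the row with the smallest event time survives.

```python
from collections import defaultdict

def keep_first_event(rows):
    """Mirror rank().over(partitionBy(user, activityid).orderBy(time))
    followed by filter('col_rank = 1'): keep only the earliest event per group."""
    earliest = {}
    for user, act, t in rows:
        key = (user, act)
        if key not in earliest or t < earliest[key]:
            earliest[key] = t
    return [(user, act, t) for (user, act), t in earliest.items()]

mm = [("test", 123, 1001), ("test", 123, 1002), ("test", 123, 1003), ("test", 123, 1004)]
print(keep_first_event(mm))  # only the 1001 row remains
```

Two caveats: `rank()` would retain all tied rows, whereas this sketch collapses ties to one; and, as the output table above shows, this approach keeps a single row per group, dropping the later event-1 times that the question's desired output preserves.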
Votes: 0
Original content provided by Stack Overflow.
Original question: https://stackoverflow.com/questions/48787831