I have a DataFrame containing event details, and I am trying to get the top 5 most recently reported events per date and user id. This is the code I have tried so far.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df = sc.parallelize(Seq( ("20180515114049", "user001","e001","cross-over","some data related to even"),
("20180515114049", "user004","e002","cross-limit","some data related to event"),
("20180515114049", "user001","e001","cross-over","some data related to event"),
("20180615114049", "user001","e001","cross-over","some data related to event"),
("20180715114049", "user003","e004","cross-cl","some data related to event"),
("20180715114049", "user005","e001","cross-over","some data related to event"),
("20180715114049", "user005","e002","cross-limit","some data related to event"),
("20180715114049", "user005","e003","no-cross","some data related to event"),
("20180715114049", "user005","e004","cross-over","some data related to event"),
("20180715114049", "user005","e005","dl-over","some data related to event"),
("20180715114049", "user005","e003","no-cross","some data related to event"),
("20180815114049", "user006","e001","cross-over","some data related to event"),
("20180915114049", "user001","e001","cross-over","some data related to event"),
("20180105114049", "user001","e006","straight","some data related to event")
)).toDF("eventtime", "userid","eventid","event_title","eventdata")
df.show()
+--------------+-------+-------+-----------+--------------------+
| eventtime| userid|eventid|event_title| eventdata|
+--------------+-------+-------+-----------+--------------------+
|20180515114049|user001| e001| cross-over|some data related...|
|20180515114049|user004| e002|cross-limit|some data related...|
|20180515114049|user001| e001| cross-over|some data related...|
|20180615114049|user001| e001| cross-over|some data related...|
|20180715114049|user003| e004| cross-cl|some data related...|
|20180715114049|user005| e001| cross-over|some data related...|
|20180715114049|user005| e002|cross-limit|some data related...|
|20180715114049|user005| e003| no-cross|some data related...|
|20180715114049|user005| e004| cross-over|some data related...|
|20180715114049|user005| e005| dl-over|some data related...|
|20180715114049|user005| e003| no-cross|some data related...|
|20180815114049|user006| e001| cross-over|some data related...|
|20180915114049|user001| e001| cross-over|some data related...|
|20180105114049|user001| e006| straight|some data related...|
+--------------+-------+-------+-----------+--------------------+
val df2= df.groupBy($"userid",$"eventid").agg(last($"eventtime") as "lasteventtime")
df2.show(false)
+-------+-------+--------------+
|userid |eventid|lasteventtime |
+-------+-------+--------------+
|user005|e004 |20180715114049|
|user005|e001 |20180715114049|
|user001|e006 |20180105114049|
|user001|e001 |20180915114049|
|user005|e002 |20180715114049|
|user006|e001 |20180815114049|
|user004|e002 |20180515114049|
|user005|e005 |20180715114049|
|user005|e003 |20180715114049|
|user003|e004 |20180715114049|
+-------+-------+--------------+
Here is the part I am struggling with: how do I join and aggregate on the last-reported group, rank the results, and get the top 5 most recently reported events?
val w = Window.partitionBy($"userid",$"event_title",$"eventid").orderBy($"eventtime".desc)
val contentByRank = df.withColumn("rank", dense_rank().over(w)).filter($"rank" <= 5)
contentByRank.show(20,false)
Also, how do I filter to the top 5 ranks when, as in this case, multiple events can share the same rank?
+--------------+-------+-------+-----------+--------------------------+----+
|eventtime |userid |eventid|event_title|eventdata |rank|
+--------------+-------+-------+-----------+--------------------------+----+
|20180515114049|user004|e002 |cross-limit|some data related to event|1 |
|20180715114049|user005|e004 |cross-over |some data related to event|1 |
|20180815114049|user006|e001 |cross-over |some data related to event|1 |
|20180715114049|user005|e003 |no-cross |some data related to event|1 |
|20180715114049|user005|e003 |no-cross |some data related to event|1 |
|20180715114049|user005|e005 |dl-over |some data related to event|1 |
|20180715114049|user003|e004 |cross-cl |some data related to event|1 |
|20180715114049|user005|e001 |cross-over |some data related to event|1 |
|20180105114049|user001|e006 |straight |some data related to event|1 |
|20180715114049|user005|e002 |cross-limit|some data related to event|1 |
|20180915114049|user001|e001 |cross-over |some data related to event|1 |
|20180615114049|user001|e001 |cross-over |some data related to event|2 |
|20180515114049|user001|e001 |cross-over |some data related to even |3 |
|20180515114049|user001|e001 |cross-over |some data related to event|3 |
+--------------+-------+-------+-----------+--------------------------+----+
Posted on 2018-07-04 14:19:15
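On the tie question above: `dense_rank` assigns equal ranks to rows with the same `eventtime`, so `rank <= 5` can return more than 5 rows per partition, while `row_number` assigns distinct numbers and caps the result at exactly 5. A minimal plain-Scala sketch of the two semantics over one hypothetical user partition (the data below is illustrative, not from the post):

```scala
// Plain-Scala stand-in for the two Spark window functions, applied to one
// partition ordered by eventtime descending, as in the window in the post.
object RankDemo {
  // hypothetical (eventtime, eventid) pairs for a single user partition
  val events = Seq(
    ("20180915114049", "e001"),
    ("20180715114049", "e002"),
    ("20180715114049", "e003"), // tie with e002 on eventtime
    ("20180615114049", "e004")
  )

  // order descending by eventtime (stable sort keeps ties in input order)
  val sorted = events.sortBy(_._1)(Ordering[String].reverse)

  // row_number semantics: every row gets a distinct number -> 1, 2, 3, 4
  val rowNumbers = sorted.zipWithIndex.map { case (e, i) => (e, i + 1) }

  // dense_rank semantics: tied eventtimes share a rank -> 1, 2, 2, 3
  val denseRanks = {
    val distinctTimes = sorted.map(_._1).distinct
    sorted.map(e => (e, distinctTimes.indexOf(e._1) + 1))
  }
}
```

In Spark terms this corresponds to swapping `dense_rank()` for `row_number()` over the same window when you need at most 5 rows per partition; keep `dense_rank()` if tied events should all survive the filter.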
I settled on this solution: first aggregate the data on the last reported time, then join that back to the original DF to eliminate all the unneeded rows, and rank the resulting data.
val df2= df.groupBy($"userid",$"eventid").agg(last($"eventtime") as "eventtime")
val lasteventdf=df.join(df2,Seq("eventtime", "userid","eventid"))
val w = Window.partitionBy($"userid",$"event_title",$"eventid").orderBy($"eventtime".desc)
val contentByRank = lasteventdf.withColumn("rank", dense_rank().over(w)).filter($"rank" <= 5)
contentByRank.show(20,false)
+--------------+-------+-------+-----------+--------------------------+----+
|eventtime     |userid |eventid|event_title|eventdata                 |rank|
+--------------+-------+-------+-----------+--------------------------+----+
|20180515114049|user004|e002   |cross-limit|some data related to event|1   |
|20180715114049|user005|e004   |cross-over |some data related to event|1   |
|20180815114049|user006|e001   |cross-over |some data related to event|1   |
|20180715114049|user005|e003   |no-cross   |some data related to event|1   |
|20180715114049|user005|e003   |no-cross   |some data related to event|1   |
|20180715114049|user005|e005   |dl-over    |some data related to event|1   |
|20180715114049|user003|e004   |cross-cl   |some data related to event|1   |
|20180715114049|user005|e001   |cross-over |some data related to event|1   |
|20180105114049|user001|e006   |straight   |some data related to event|1   |
|20180715114049|user005|e002   |cross-limit|some data related to event|1   |
|20180915114049|user001|e001   |cross-over |some data related to event|1   |
+--------------+-------+-------+-----------+--------------------------+----+
https://stackoverflow.com/questions/51163325
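One pitfall worth noting: Spark's `last()` inside a `groupBy` carries no ordering guarantee, so the "last" value depends on how partitions happen to be read. `max()` is deterministic here, because the `eventtime` strings are fixed-width `yyyyMMddHHmmss` and lexicographic order equals chronological order. A plain-Scala sketch of the "latest event per (userid, eventid)" step using max (the rows below are hypothetical, trimmed from the post's data):

```scala
// Plain-Scala stand-in for groupBy + max(eventtime): for each
// (userid, eventid) group, keep the row with the largest eventtime.
object LatestDemo {
  // hypothetical rows: (eventtime, userid, eventid)
  val rows = Seq(
    ("20180515114049", "user001", "e001"),
    ("20180615114049", "user001", "e001"),
    ("20180915114049", "user001", "e001"),
    ("20180105114049", "user001", "e006")
  )

  // group on the key, keep the row with the max eventtime per group,
  // then sort for a deterministic output order
  val latest = rows
    .groupBy(r => (r._2, r._3))
    .map { case (_, grp) => grp.maxBy(_._1) }
    .toSeq
    .sortBy(_._1)
}
```

The Spark equivalent would be `df.groupBy($"userid", $"eventid").agg(max($"eventtime") as "eventtime")` followed by the same join back to `df`.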