首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将在星火数据集中/Dataframe中第一次排序的行合并

将在星火数据集中/Dataframe中第一次排序的行合并
EN

Stack Overflow用户
提问于 2017-06-06 09:13:34
回答 2查看 686关注 0票数 1

假设我们在星火中有一个dataset/dataframe,其中有3列IDWordTimestamp

我想编写一个UDAF函数,在这里我可以这样做

代码语言:javascript
复制
df.show()

ID | Word | Timestamp
1  | I    | "2017-1-1 00:01"
1  | am   | "2017-1-1 00:02"
1  | Chris | "2017-1-1 00:03"
2  | I    | "2017-1-1 00:01"
2  | am   | "2017-1-1 00:02"
2  | Jessica | "2017-1-1 00:03"

val df_merged = df.groupBy("ID")
  .sort("ID", "Timestamp")
  .agg(custom_agg("ID", "Word", "Timestamp")

df_merged.show

ID | Words         | StartTime        |      EndTime     |
1  | "I am Chris"  | "2017-1-1 00:01" | "2017-1-1 00:03" |
1  | "I am Jessica"  | "2017-1-1 00:01" | "2017-1-1 00:03" |

问题是如何确保在我的Words中按正确的顺序合并列UDAF

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-06-07 05:37:34

这里有一个使用Spark2的groupByKey (与未键入的Dataset一起使用)的解决方案,groupByKey的.The优势是您可以访问这个组(您可以在mapGroups中获得一个Iterator[Row] ):

代码语言:javascript
复制
 df.groupByKey(r => r.getAs[Int]("ID"))
      .mapGroups{case(id,rows) => {
        val sorted = rows
          .toVector
          .map(r => (r.getAs[String]("Word"),r.getAs[java.sql.Timestamp]("Timestamp")))
          .sortBy(_._2.getTime)

        (id, 
         sorted.map(_._1).mkString(" "),
         sorted.map(_._2).head,
         sorted.map(_._2).last
         )  
        }
      }.toDF("ID","Words","StartTime","EndTime")
票数 1
EN

Stack Overflow用户

发布于 2017-06-06 09:54:02

对不起,我不使用Scala,希望你能读一读。

Window函数可以做您想做的事情:

代码语言:javascript
复制
df = df.withColumn('Words', f.collect_list(df['Word']).over(
    Window().partitionBy(df['ID']).orderBy('Timestamp').rowsBetween(start=Window.unboundedPreceding,
                                                                    end=Window.unboundedFollowing)))

输出:

代码语言:javascript
复制
+---+-------+-----------------+----------------+                                
| ID|   Word|        Timestamp|           Words|
+---+-------+-----------------+----------------+
|  1|      I|2017-1-1 00:01:00|  [I, am, Chris]|
|  1|     am|2017-1-1 00:02:00|  [I, am, Chris]|
|  1|  Chris|2017-1-1 00:03:00|  [I, am, Chris]|
|  2|      I|2017-1-1 00:01:00|[I, am, Jessica]|
|  2|     am|2017-1-1 00:02:00|[I, am, Jessica]|
|  2|Jessica|2017-1-1 00:03:00|[I, am, Jessica]|
+---+-------+-----------------+----------------+

然后groupBy上面的数据:

代码语言:javascript
复制
df = df.groupBy(df['ID'], df['Words']).agg(
    f.min(df['Timestamp']).alias('StartTime'), f.max(df['Timestamp']).alias('EndTime'))
df = df.withColumn('Words', f.concat_ws(' ', df['Words']))

输出:

代码语言:javascript
复制
+---+------------+-----------------+-----------------+                          
| ID|       Words|        StartTime|          EndTime|
+---+------------+-----------------+-----------------+
|  1|  I am Chris|2017-1-1 00:01:00|2017-1-1 00:03:00|
|  2|I am Jessica|2017-1-1 00:01:00|2017-1-1 00:03:00|
+---+------------+-----------------+-----------------+
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44385934

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档