首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在使用sparklyr调用collect_list时根据另一个变量保留顺序

在使用sparklyr调用collect_list时根据另一个变量保留顺序
EN

Stack Overflow用户
提问于 2019-05-10 18:43:27
回答 2查看 368关注 0票数 0

这个问题本质上是对this question的重复,除了我在R中工作之外,这个问题的解决方案看起来很可靠,但我还没有找到如何在窗口函数上以同样的方式在sparklyr中应用collect_list

我有一个星火DataFrame,其结构如下:

代码语言:javascript
复制
------------------------------
userid |     date     | city
------------------------------
   1   |  2018-08-02  |   A
   1   |  2018-08-03  |   B
   1   |  2018-08-04  |   C
   2   |  2018-08-17  |   G
   2   |  2018-08-20  |   E
   2   |  2018-08-23  |   F

我试图将DataFrame按userid分组,按date对每个组进行排序,并将city列折叠为其值的连接。期望产出:

代码语言:javascript
复制
------------------
userid | cities
------------------
   1   |  A, B, C
   2   |  G, E, F

问题是,我尝试使用的每一种方法都会产生一些用户(appx )。对5000名用户的测试中,3%的人没有按正确的顺序排列“城市”栏。

尝试1:使用dplyrcollect_list

代码语言:javascript
复制
my_sdf %>%
  dplyr::group_by(userid) %>%
  dplyr::arrange(date) %>%
  dplyr::summarise(cities = paste(collect_list(city), sep = ", ")))

尝试2:使用replyr::gapply,因为该操作符合“分组顺序应用”的描述。

代码语言:javascript
复制
get_cities <- . %>%
   summarise(cities = paste(collect_list(city), sep = ", "))

my_sdf %>%
  replyr::gapply(gcolumn = "userid",
                 f = get_cities,
                 ocolumn = "date",
                 partitionMethod = "group_by")

尝试3:写为SQL窗口函数。

代码语言:javascript
复制
my_sdf %>% 
  spark_session(sc) %>%
  sparklyr::invoke("sql", 
                   "SELECT userid, CONCAT_WS(', ', collect_list(city)) AS cities
                   OVER (PARTITION BY userid
                         ORDER BY date)
                   FROM my_sdf") %>%
  sparklyr::sdf_register() %>%
  sparklyr::sdf_copy_to(sc, ., "my_sdf", overwrite = T)

^引发以下错误:

代码语言:javascript
复制
Error: org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'OVER' expecting <EOF>(line 2, pos 19)

== SQL ==
SELECT userid, conversion_location, CONCAT_WS(' > ', collect_list(channel)) AS path
                   OVER (PARTITION BY userid, conversion_location
-------------------^^^
                         ORDER BY occurred_at)
                   FROM paths_model
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-05-13 21:54:06

解决了!我误解了collect_list()和Spark如何协同工作。我没有意识到列表可以返回,我认为连接必须在查询中进行。以下内容产生了所需的结果:

代码语言:javascript
复制
spark_output <- spark_session(sc) %>%
  sparklyr::invoke("sql", 
                   "SELECT userid, collect_list(city)
                   OVER (PARTITION BY userid
                         ORDER BY date
                         ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
                   AS cities
                   FROM my_sdf") %>%
  sdf_register() %>%
  group_by(userid) %>%
  filter(row_number(userid) == 1) %>%
  ungroup() %>%
  mutate(cities = paste(cities, sep = " > ")) %>%
  sdf_register()
票数 0
EN

Stack Overflow用户

发布于 2019-05-10 19:15:16

好的:所以我承认下面的解决方案根本没有效率(它使用了for循环,并且实际上是很多代码用于看起来可能是一个简单的任务),但我认为这应该是有效的:

代码语言:javascript
复制
#install.packages("tidyverse") # if needed
library(tidyverse)

df <- tribble(
  ~userid, ~date, ~city,
  1   ,  "2018-08-02"  ,   "A",
  1   ,  "2018-08-03"  ,   "B",
  1   ,  "2018-08-04"  ,   "C",
  2   ,  "2018-08-17"  ,   "G",
  2   ,  "2018-08-20"  ,   "E",
  2   ,  "2018-08-23"  ,   "F"
)

cityPerId <- df %>% 
  spread(key = date, value = city) 

toMutate <- NA
for (i in 1:nrow(cityPerId)) {
  cities <- cityPerId[i,][2:ncol(cityPerId)] %>% t() %>%
    as.vector() %>% 
    na.omit()
  collapsedCities <- paste(cities, collapse = ",")
  toMutate <- c(toMutate, collapsedCities)
}
toMutate <- toMutate[2:length(toMutate)]

final <- cityPerId %>% 
  mutate(cities = toMutate) %>% 
  select(userid, cities)
票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56083243

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档