Error handling for a split-apply-combine strategy in sparklyr

Stack Overflow user
Asked on 2019-06-04 01:40:47
1 answer · 142 views · 0 followers · Score: 0

I have a Spark DataFrame with a column called "userid" that I am manipulating using sparklyr. Each userid can have anywhere from one row to hundreds of rows of data. I apply a function to each userid group that condenses the rows it contains based on certain event conditions, something like this:

sdf %>%
  group_by(userid) %>%
  ... %>%   # using dplyr::filter and dplyr::mutate
  ungroup()

I would like to wrap this function in an error handler such as purrr::possibly, so that if an error occurs in one group, the computation for the other groups is not interrupted.
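For reference, a minimal sketch (not from the original question) of how purrr::possibly behaves: the wrapped function returns the otherwise value instead of raising an error.

library(purrr)

# possibly() wraps a function so that, when the call errors, it returns
# `otherwise` instead of stopping the computation.
safe_log <- possibly(log, otherwise = NA_real_)

safe_log(10)    # 2.302585
safe_log("a")   # NA, because log("a") errors and possibly() swallows it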

So far I have had the most success using the replyr package. Specifically, replyr::gapply "partitions from by values in grouping column, applies a generic transform to each group and then binds the groups back together". There are two methods for partitioning the data: "group_by" and "extract". The author recommends using "extract" only when the number of groups is 100 or less, but the "group_by" method does not work as I would expect:

library(sparklyr)
library(dplyr) 
library(replyr)   # replyr::gapply
library(purrr)    # purrr::possibly

sc <- spark_connect(master = "local")

# Create a test data frame to use gapply on.
test_spark <- tibble(
  userid = c(1, 1, 2, 2, 3, 3),
  occurred_at = seq(1, 6)
) %>%
  sdf_copy_to(sc, ., "test_spark")

# Create a data frame that purrr::possibly should return in case of error.
default_spark <- tibble(userid = -1, max = -1, min = -1) %>%
  sdf_copy_to(sc, ., "default_spark")

#####################################################
# Method 1: gapply with partitionMethod = "group_by".
#####################################################

# Create a function which may throw an error. The group column, userid, is not 
# included since gapply( , partitionMethod = "group_by") creates it.
# - A print statement is included to show that when gapply uses "group_by", the 
# function is only called once.

fun_for_groups <- function(sdf) {
  temp <- sample(c(1,2), 1)
  print(temp)
  if (temp == 2) {
    log("a")
  } else {
    sdf %>%
      summarise(max = max(occurred_at),
                min = min(occurred_at))
  }
}

# Wrap the risky function to try and handle the error gracefully.

safe_for_groups <- purrr::possibly(fun_for_groups, otherwise = default_spark)

# Apply the safe function to each userid using gapply and "group_by".
# - The result is either a) only the default_spark data frame.
#                        b) the result expected if no error occurs in fun_for_groups.
#   I would expect the answer to have a mixture of default_spark rows and correct rows.

replyr::gapply(
  test_spark, 
  gcolumn = "userid", 
  f = safe_for_groups, 
  partitionMethod = "group_by"
)

#####################################################
# Method 2: gapply with partitionMethod = "extract".
#####################################################

# Create a function which may throw an error. The group column, userid, is 
# included since gapply( , partitionMethod = "extract") doesn't create it.
# - Include a print statement to show that when gapply uses partitionMethod 
#   "extract", the function is called for each userid.

fun_for_extract <- function(df) {
  temp <- sample(c(1,2), 1)
  print(temp)
  if (temp == 2) {
    log("a")
  } else {
    df %>%
      summarise(max = max(occurred_at), 
                min = min(occurred_at),
                userid = min(userid))
  }
}

safe_for_extract <- purrr::possibly(fun_for_extract, otherwise = default_spark)

# Apply that function to each userid using gapply and "extract".
# - The result dataframe has a mixture of "otherwise" rows and correct rows.

replyr::gapply(
  test_spark, 
  gcolumn = "userid", 
  f = safe_for_extract, 
  partitionMethod = "extract"
)

How bad an idea is it to use gapply like this when the grouping column has millions of values? And is there an alternative to the error-handling strategies presented above?


1 Answer

Stack Overflow user

Accepted answer

Posted on 2019-06-04 05:49:45

replyr::gapply() is just a thin wrapper on top of dplyr (in this case sparklyr).

For the grouped mode: the results are only going to be correct if no group errors out, because the computation is issued all at once. It is the most efficient mode, but it cannot really achieve any sort of error handling.
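An illustrative sketch (not part of the original answer) of why that is: in "group_by" mode the call reduces to a single grouped dplyr query against the lazy Spark table, so fun_for_groups is evaluated only once, and either every group succeeds or the whole query fails.

# Roughly what gapply(..., partitionMethod = "group_by") issues: one lazy,
# set-wide query, not one call per userid.
test_spark %>%
  group_by(userid) %>%
  summarise(max = max(occurred_at), min = min(occurred_at)) %>%
  ungroup()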

For the extract mode: it might be possible to add error handling, but the current code does not have it.

As the author of replyr, I would actually suggest looking into sparklyr's spark_apply() method. replyr's gapply was designed back when spark_apply() was not yet available in sparklyr (and when binding lists of data was also not available in sparklyr).
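A minimal sketch of what the suggested spark_apply() route might look like; the per-group tryCatch and the default row are assumptions added here for illustration, not part of the original answer.

library(sparklyr)

# With `group_by`, spark_apply() runs the function once per userid on the
# workers, so a per-group tryCatch keeps one bad group from failing the job.
spark_apply(
  test_spark,
  function(df) {
    tryCatch(
      data.frame(max = max(df$occurred_at), min = min(df$occurred_at)),
      error = function(e) data.frame(max = -1, min = -1)  # assumed default row
    )
  },
  group_by = "userid"
)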

Also, replyr is mostly in "maintenance mode" (patching issues for clients who use it in large projects), and is probably not a good choice for new projects.

Score: 0
Original page content provided by Stack Overflow.
Source: https://stackoverflow.com/questions/56432145