我在试着把管道平行化。管道中有一个tidyr命令("tidyr::complete")。这会分解并行运行的代码,因为对象类不被识别。
在dplyr中是否有可供选择的方法来完成?
library(dplyr)
library(tidyr)
library(zoo)
test <- tibble(year=c(1,2,3,4,5,5,1,4,5),
var_1=c(1,1,1,1,1,1,2,2,2),
var_2=c(1,1,1,1,1,2,3,3,3),
var_3=c(0,5,NA,15,20,NA,1,NA,NA))
max_year <- max(test$year,na.rm = T)
min_year <- min(test$year,na.rm = T)系列
test_serial <- test %>%
group_by(var_1,var_2) %>%
complete(var_1, year = seq(min_year,max_year)) %>%
mutate(
var_3 = na.approx(var_3,na.rm = FALSE),
var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE))并行(失败)
devtools::install_github("hadley/multidplyr")
library(multidplyr)
cl <- new_cluster(2)
cluster_copy(cl, c("test","max_year","min_year"))
cluster_library(cl, c("dplyr","tidyr","zoo"))
test_parallel <- test %>% group_by(var_1,var_2) %>% partition(cl)
test_parallel <- test_parallel %>%
dplyr::group_by(var_1,var_2) %>%
tidyr::complete(var_1, year = seq(min_year,max_year)) %>%
dplyr::mutate(
var_3 = na.approx(var_3,na.rm = FALSE),
var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE)) %>%
collect()这是错误消息
Error in UseMethod("complete_") :
no applicable method for 'complete_' applied to an object of class "multidplyr_party_df"发布于 2020-06-29 15:35:55
Multidplyr允许您:
partition()拆分数据collect()结果所有数据处理任务都不适合以前的工作流。
特别是,complete需要知道输入数据中的所有可能值,以便创建缺少的行,这意味着整个操作不能被拆分,这就是为什么没有适用的方法可用的原因。
在您提供的示例中,每个节点将接收单个var_1, var_2对,而不知道其他节点得到了什么,这不允许并行地实现预期的结果。
但是,正如您已经知道的year = seq(min_year,max_year),您可以只为这个变量并行complete任务,通过var_1拆分任务,例如使用furrr包:
library(furrr)
plan(multiprocess)
test_parallel <- test %>%
group_by(var_1,var_2) %>%
complete(var_1) %>% split(.$var_1) %>%
furrr::future_map(~{
complete(.x, year = seq(min_year,max_year)) %>%
dplyr::mutate(
var_3 = na.approx(var_3,na.rm = FALSE),
var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE))
}) %>% bind_rows()
> identical(c(test_serial$var_1,test_serial$var_2,test_serial$var_3,test_serial$year),
+ c(test_parallel$var_1,test_parallel$var_2,test_parallel$var_3,test_parallel$year))
[1] TRUE要在更大的数据集上进行测试,以度量潜在的性能改进。
https://stackoverflow.com/questions/62553822
复制相似问题