首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >SparkR前馈回路

SparkR前馈回路
EN

Stack Overflow用户
提问于 2017-01-23 21:50:45
回答 1查看 1.5K关注 0票数 2

在Spark的Java/Scala/Python实现中,可以简单地调用RDDDataFrame类型的DataFrame方法,以便并行化数据集上的迭代。

在SparkR中,我找不到这样的指令。对DataFrame行进行迭代的正确方法是什么?

我只能找到gapplydapply函数,但是我不想计算新的列值,我只是想通过从列表中并行地获取一个元素来做一些事情。

我之前的尝试是用lapply

代码语言:javascript
复制
inputDF <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")
createOrReplaceTempView(inputDF,'inputData')

distinctM <- sql('SELECT DISTINCT(ID_M) FROM inputData')

collected <- collect(distinctM)[[1]]

problemSolver <- function(idM) {
  filteredDF <- filter(inputDF, inputDF$ID_M == idM)
}

spark.lapply(c(collected), problemSolver)

但我发现了一个错误:

代码语言:javascript
复制
Error in handleErrors(returnStatus, conn) : 
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 207, localhost, executor driver): org.apache.spark.SparkException: R computation failed with
 Error in callJMethod(x@sdf, "col", c) : 
  Invalid jobj 3. If SparkR was restarted, Spark operations need to be re-executed.
Calls: compute ... filter -> $ -> $ -> getColumn -> column -> callJMethod

R为解决这些问题提供了什么解决方案?

EN

回答 1

Stack Overflow用户

发布于 2017-05-16 21:24:49

我也有类似的问题。收集一个dataframe会将它作为一个数据文件放入R中。从那里,你可以像通常的老R一样在每一行得到,在我看来,这对于处理数据来说是一个可怕的主题,因为你失去了并行处理火花提供的。不要收集数据,然后进行过滤,而是使用内置的SparkR函数、selectfilter等。如果您希望执行逐行操作,内置的SparkR函数通常会为您做到这一点,否则,我发现当最初的selectExpr函数设计为处理单个值时,selectExprexpr非常有用(想想看: from_unix_timestamp)。

因此,为了得到您想要的,我会尝试这样的方法(我在SparkR 2.0+上):

如你所做的那样,第一次阅读这些数据:

代码语言:javascript
复制
inputDF<- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")

然后使其成为RDD:inputSparkDF<- SparkR::createDataFrame(inputDF)

接下来,只分离不同的/唯一的值(我使用magrittr作为管道(在SparkR中工作)):

代码语言:javascript
复制
distinctSparkDF<- SparkR::select(inputSparkDF) %>% SparkR::distinct()

从这里开始,您可以在仍然生活在星火世界中的时候应用过滤:

filteredSparkDF<- SparkR::filter(distinctSparkDF, distinctSparkDF$variable == "value")

在Spark为您筛选了这些数据之后,将子集收集到基本R中是有意义的,这是工作流中的最后一步:

myRegularRDataframe<- SparkR::collect(filteredSparkDF)

我希望这能帮到你。祝你好运。-内特

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41816328

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档