在Spark的Java/Scala/Python实现中,可以简单地调用RDD或DataFrame类型的DataFrame方法,以便并行化数据集上的迭代。
在SparkR中,我找不到这样的指令。对DataFrame行进行迭代的正确方法是什么?
我只能找到gapply和dapply函数,但是我不想计算新的列值,我只是想通过从列表中并行地获取一个元素来做一些事情。
我之前的尝试是用lapply
inputDF <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")
createOrReplaceTempView(inputDF,'inputData')
distinctM <- sql('SELECT DISTINCT(ID_M) FROM inputData')
collected <- collect(distinctM)[[1]]
problemSolver <- function(idM) {
filteredDF <- filter(inputDF, inputDF$ID_M == idM)
}
spark.lapply(c(collected), problemSolver)但我发现了一个错误:
Error in handleErrors(returnStatus, conn) :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 207, localhost, executor driver): org.apache.spark.SparkException: R computation failed with
Error in callJMethod(x@sdf, "col", c) :
Invalid jobj 3. If SparkR was restarted, Spark operations need to be re-executed.
Calls: compute ... filter -> $ -> $ -> getColumn -> column -> callJMethodR为解决这些问题提供了什么解决方案?
发布于 2017-05-16 21:24:49
我也有类似的问题。收集一个dataframe会将它作为一个数据文件放入R中。从那里,你可以像通常的老R一样在每一行得到,在我看来,这对于处理数据来说是一个可怕的主题,因为你失去了并行处理火花提供的。不要收集数据,然后进行过滤,而是使用内置的SparkR函数、select、filter等。如果您希望执行逐行操作,内置的SparkR函数通常会为您做到这一点,否则,我发现当最初的selectExpr函数设计为处理单个值时,selectExpr或expr非常有用(想想看: from_unix_timestamp)。
因此,为了得到您想要的,我会尝试这样的方法(我在SparkR 2.0+上):
如你所做的那样,第一次阅读这些数据:
inputDF<- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")然后使其成为RDD:inputSparkDF<- SparkR::createDataFrame(inputDF)
接下来,只分离不同的/唯一的值(我使用magrittr作为管道(在SparkR中工作)):
distinctSparkDF<- SparkR::select(inputSparkDF) %>% SparkR::distinct()从这里开始,您可以在仍然生活在星火世界中的时候应用过滤:
filteredSparkDF<- SparkR::filter(distinctSparkDF, distinctSparkDF$variable == "value")
在Spark为您筛选了这些数据之后,将子集收集到基本R中是有意义的,这是工作流中的最后一步:
myRegularRDataframe<- SparkR::collect(filteredSparkDF)
我希望这能帮到你。祝你好运。-内特
https://stackoverflow.com/questions/41816328
复制相似问题