首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如果在数据处理之前进行预处理,则处理时间太长。

如果在数据处理之前进行预处理,则处理时间太长。
EN

Stack Overflow用户
提问于 2017-08-23 07:51:16
回答 1查看 202关注 0票数 1

所以我一直试图对一个数据集执行累加运算。我想强调的是,我希望我的累积量发生在我的数据集中的分区上。feature1随时间累积的累积量( personA)。

我知道如何做到这一点,而且它“完全独立”地工作--我稍后会解释这部分。下面是一段代码:

代码语言:javascript
复制
// it's admitted that this DF contains all data I need
// with one column/possible value, with only 1/0 in each line
// 1 <-> feature has the value
// 0 <-> feature doesn't contain the value
// this DF is the one I get after the one-hot operation
// this operation is performed to apply ML algorithms on features
// having simultaneously multiple values
df_after_onehot.createOrReplaceTempView("test_table")

// @param DataFrame containing all possibles values eg. A, B, C
def cumSumForFeatures(values: DataFrame) = {
  values
    .map(value => "CAST(sum(" + value(0) + ") OVER (PARTITION BY person ORDER BY date) as Integer) as sum_" + value(0))
    .reduce(_+ ", " +_)
}

val req = "SELECT *, " + cumSumForFeatures(possible_segments) + " FROM test_table"
// val req = "SELECT * FROM test_table"
println("executing: " + req)

val data_after_cumsum = sqLContext.sql(req).orderBy("person", "date")
data_after_cumsum.show(10, false)

这个问题发生在我尝试执行相同的操作之前,一些预处理(比如一个热的操作,或添加之前的计算特性)。我尝试了一个非常小的数据集,但它不起作用。

下面是打印的堆栈跟踪(至少应该访问您的部分):

代码语言:javascript
复制
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

[Executor task launch worker-3] ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded

那么,它似乎与GC问题/JVM堆大小有关吗?我只是不明白这和我的预处理有什么关系?

  • 我试过对不再使用的DFs进行非持久化手术。
  • 我试着修改我机器上的选项。-Xmx2048m)。
  • 一旦我在AWS上部署,问题也是一样的。

我的pom.xml的摘录(适用于Java、Spark、Scala的版本):

代码语言:javascript
复制
<spark.version>2.1.0</spark.version>
<scala.version>2.10.4</scala.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

你知道我该怎么解决我的问题吗?谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-08-24 08:41:40

据我所知,我认为我们可以有两个理由:

  • JVM的堆溢出是因为内存中保留但没有更长时间使用的DataFrames。
  • cum-sum请求可能太大,无法用所剩的少量RAM进行处理。
  • 显示/打印操作增加了作业所需的步骤数,并可能与星火的内部优化有关

考虑到这一点,我决定“不坚持”不再使用的DataFrames。这似乎并没有多大变化。

然后,我决定删除所有没有必要的显示/打印操作。这大大提高了步数。

我将代码更改为更多功能,但保留了3个单独的值以帮助调试。这并没有多大变化,但我的代码更简洁。

最后,有件事帮助我解决了这个问题。与其让我的请求在一次传递中遍历dataset,我还将特性列表划分为片:

代码语言:javascript
复制
def listOfSlices[T](list: List[T], sizeOfSlices: Int): List[List[T]] =
    (for (i <- 0 until list.length by sizeOfSlices) yield list.slice(i, i+sizeOfSlices)).toList

我通过一个map操作来执行对每个片段的请求。然后我和他们一起完成我的最后一次DataFrame。这样的话,我就可以分配计算量了,而且这种方式似乎更有效。

代码语言:javascript
复制
val possible_features_slices = listOfSlices[String](possible_features, 5)

val df_cum_sum = possible_features_slices
  .map(possible_features_slice =>
    dfWithCumSum(sqLContext, my_df, possible_segments_slice, "feature", "time")) // operation described in the original post
  .foldLeft[DataFrame](null)((a, b) => if (a == null) b else if (b == null) a else a.join(b, Seq("person", "list_features", "time")))

我只想强调,我仍然没有理解问题背后的原因,而且我仍然希望在这个层次上得到答案。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45833762

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档