所以我一直试图对一个数据集执行累加运算。我想强调的是,我希望我的累积量发生在我的数据集中的分区上。feature1随时间累积的累积量( personA)。
我知道如何做到这一点,而且它“完全独立”地工作--我稍后会解释这部分。下面是一段代码:
// it's admitted that this DF contains all data I need
// with one column/possible value, with only 1/0 in each line
// 1 <-> feature has the value
// 0 <-> feature doesn't contain the value
// this DF is the one I get after the one-hot operation
// this operation is performed to apply ML algorithms on features
// having simultaneously multiple values
df_after_onehot.createOrReplaceTempView("test_table")
// @param DataFrame containing all possibles values eg. A, B, C
def cumSumForFeatures(values: DataFrame) = {
values
.map(value => "CAST(sum(" + value(0) + ") OVER (PARTITION BY person ORDER BY date) as Integer) as sum_" + value(0))
.reduce(_+ ", " +_)
}
val req = "SELECT *, " + cumSumForFeatures(possible_segments) + " FROM test_table"
// val req = "SELECT * FROM test_table"
println("executing: " + req)
val data_after_cumsum = sqLContext.sql(req).orderBy("person", "date")
data_after_cumsum.show(10, false)这个问题发生在我尝试执行相同的操作之前,一些预处理(比如一个热的操作,或添加之前的计算特性)。我尝试了一个非常小的数据集,但它不起作用。
下面是打印的堆栈跟踪(至少应该访问您的部分):
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
[Executor task launch worker-3] ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded那么,它似乎与GC问题/JVM堆大小有关吗?我只是不明白这和我的预处理有什么关系?
我的pom.xml的摘录(适用于Java、Spark、Scala的版本):
<spark.version>2.1.0</spark.version>
<scala.version>2.10.4</scala.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>你知道我该怎么解决我的问题吗?谢谢
发布于 2017-08-24 08:41:40
据我所知,我认为我们可以有两个理由:
考虑到这一点,我决定“不坚持”不再使用的DataFrames。这似乎并没有多大变化。
然后,我决定删除所有没有必要的显示/打印操作。这大大提高了步数。
我将代码更改为更多功能,但保留了3个单独的值以帮助调试。这并没有多大变化,但我的代码更简洁。
最后,有件事帮助我解决了这个问题。与其让我的请求在一次传递中遍历dataset,我还将特性列表划分为片:
def listOfSlices[T](list: List[T], sizeOfSlices: Int): List[List[T]] =
(for (i <- 0 until list.length by sizeOfSlices) yield list.slice(i, i+sizeOfSlices)).toList我通过一个map操作来执行对每个片段的请求。然后我和他们一起完成我的最后一次DataFrame。这样的话,我就可以分配计算量了,而且这种方式似乎更有效。
val possible_features_slices = listOfSlices[String](possible_features, 5)
val df_cum_sum = possible_features_slices
.map(possible_features_slice =>
dfWithCumSum(sqLContext, my_df, possible_segments_slice, "feature", "time")) // operation described in the original post
.foldLeft[DataFrame](null)((a, b) => if (a == null) b else if (b == null) a else a.join(b, Seq("person", "list_features", "time")))我只想强调,我仍然没有理解问题背后的原因,而且我仍然希望在这个层次上得到答案。
https://stackoverflow.com/questions/45833762
复制相似问题