My Spark application currently has executors dying because YARN kills them for exceeding memory limits. I can't seem to find, in the documentation or the O'Reilly book I bought, how creating an RDD allocates memory on the executors. Can someone tell me what is happening in the snippet below?
import numpy
N = 10
# note: numpy.random has no random_float; random(N) returns N float64 values
array = numpy.random.random(N)
# Is the array actually partitioned and serialized out when this is executed?
# Or when an action using this rdd is called? At this point,
# I would expect 1 float, or 4 bytes on each executor.
rdd1 = sc.parallelize(array, 10)
# Transformations return new rdd's, so now I would expect each executor
# to have 2 floats on it, one from rdd, and one from rdd2, so 8 bytes.
rdd2 = rdd1.map(lambda x: x + 2)
# Here is where things get murky. Would this cause 8 bytes of memory to be used
# to account for the intermediate product of rdd1.map(lambda x: x - 2)?
# So in Spark's calculations, if we would now require space for
# 4 floats, one for each rdd1/2/3 and one for the intermediate?
rdd3 = rdd1.map(lambda x: x - 2).map(lambda x: x * 2)
# Is this the point where each executor actually has the memory allocated?
# And since I only call collect() on the first rdd,
# would only 4 bytes be sent out to each executor?
rdd1.collect()
#How about now?
rdd2.collect()

Posted on 2016-02-03 23:44:26
I created a unit test that uses sc.parallelize() and put a breakpoint right after that line executes. I did not see any memory allocation logged until the subsequent collect() was called.
Java code...
JavaRDD<String> fooBars = sparkCtx.parallelize(Lists.newArrayList("foo", "bar"));
JavaRDD<String> abcs = fooBars.map(f -> "abc");
abcs.collect(); // break point here

From the logs when collect() executes...
2016-02-03 15:49:14 INFO DAGScheduler:59 - Got job 0 (collect at MyTest.java:40) with 1 output partitions (allowLocal=false)
2016-02-03 15:49:14 INFO DAGScheduler:59 - Final stage: Stage 0(collect at MyTest.java:40)
2016-02-03 15:49:14 INFO DAGScheduler:59 - Parents of final stage: List()
2016-02-03 15:49:14 INFO DAGScheduler:59 - Missing parents: List()
2016-02-03 15:49:14 INFO DAGScheduler:59 - Submitting Stage 0 (ParallelCollectionRDD[0] at parallelize at RddFactory.java:42), which has no missing parents
2016-02-03 15:49:14 INFO MemoryStore:59 - ensureFreeSpace(1416) called with curMem=0, maxMem=991753666
2016-02-03 15:49:14 INFO MemoryStore:59 - Block broadcast_0 stored as values in memory (estimated size 1416.0 B, free 945.8 MB)

This is consistent with the Spark documentation, which explains that non-terminating functions (e.g. map()) are lazily evaluated and not executed until a terminating function (e.g. collect()) is called. This example also suggests that parallelize() is treated as non-terminating too, since the ensureFreeSpace... message is not logged until the collect() statement.
To address your other question about the two collect() statements: Spark's RDD objects are immutable, so when you call rdd1.collect(), the computation that materializes that RDD runs then. If you subsequently call rdd2.collect(), it must run its own computation as well (recomputing from the source unless the lineage has been cached).
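The laziness shown in the logs above can be mirrored without Spark at all. Below is a toy sketch in plain Python (an analogy, not Spark's actual implementation): a chain of map() calls only records a plan, nothing runs or allocates result memory until collect(), and each collect() replays the whole chain independently.

```python
class ToyRDD:
    """A minimal stand-in for an RDD: records transformations, runs them on collect()."""

    def __init__(self, data, ops=None):
        self._data = list(data)
        self._ops = ops or []   # pending transformations, applied lazily
        self.runs = 0           # how many times collect() actually computed

    def map(self, fn):
        # Like rdd.map(): returns a NEW ToyRDD; no computation happens here.
        return ToyRDD(self._data, self._ops + [fn])

    def collect(self):
        # Only here does work happen, replaying every recorded op.
        self.runs += 1
        out = self._data
        for fn in self._ops:
            out = [fn(x) for x in out]
        return out

rdd1 = ToyRDD(range(3))
rdd2 = rdd1.map(lambda x: x + 2)
print(rdd2.runs)        # 0 -- map() computed nothing yet
print(rdd2.collect())   # [2, 3, 4]
print(rdd1.collect())   # [0, 1, 2] -- each collect() is an independent run
```

This mirrors why the ensureFreeSpace log line only appears at collect(): the transformations before it are bookkeeping, not work.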
Posted on 2016-02-03 23:51:52
Well, this is a fairly tricky question.
First, you have to account for memory that is completely independent of the work you perform. That means the worker process itself (around 200MB) and the memory required to run each Python executor (roughly 30-40MB per interpreter, without any additional imports).
Moreover, every piece of data passed between stages is effectively duplicated: it must first be passed to the worker (JVM), and then through a socket to the Python interpreter.
Finally, there are Spark implementation details. By default, Spark reuses Python interpreters between tasks, which means temporary objects created in each task must be garbage-collected before their memory is released. Usually this shouldn't be a problem, but it is definitely something to keep in mind. Additionally, data may spill to disk if necessary, which can complicate the picture further.
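Two of the behaviors mentioned above are exposed as Spark configuration; a sketch of a spark-defaults.conf fragment (the values here are illustrative, not recommendations):

```
# spark-defaults.conf fragment (illustrative values)

# Disable interpreter reuse so each task gets a fresh Python worker
# (trades startup cost for a clean heap between tasks):
spark.python.worker.reuse   false

# Memory a Python worker may use during aggregation before spilling to disk:
spark.python.worker.memory  512m
```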
So while your calculations are more or less correct (assuming you have executed an action and used float32), they are only a small part of the picture.
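To put numbers on "a small part of the picture": the payload in the question is tiny compared with the fixed overheads listed above. A back-of-the-envelope sketch (the ~200MB worker and ~30-40MB interpreter figures are the estimates quoted in this answer, not measured values):

```python
import numpy

N = 10
data = numpy.random.random(N).astype(numpy.float32)

payload = data.nbytes           # raw float32 payload: 4 bytes per element
per_partition = payload // 10   # the question splits it into 10 partitions
print(payload, per_partition)   # 40 4

# Fixed per-executor overhead dwarfs the data (figures from the answer above):
worker_mb = 200        # JVM worker process, ~200MB
interpreter_mb = 35    # each Python interpreter, ~30-40MB
overhead = (worker_mb + interpreter_mb) * 1024 * 1024
print(overhead // payload)  # overhead is millions of times the payload size
```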
https://stackoverflow.com/questions/35179799