首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark :在创建RDD时,如何在executors上管理内存?

Spark :在创建RDD时,如何在executors上管理内存?
EN

Stack Overflow用户
提问于 2016-02-03 22:30:52
回答 2查看 140关注 0票数 0

我的Spark应用程序当前由于YARN试图超过内存限制而导致执行器死亡。在我购买的文档或O‘’Reilly书中,我似乎找不到创建RDD是如何在executors上分配内存的。有人能告诉我下面的代码片段中发生了什么吗?

代码语言:javascript
复制
N = 10
array = numpy.random.random_float(N)

# Is the array actually partitioned and serialized out when this is executed?
# Or when an action using this rdd is called?  At this point,
# I would expect 1 float, or 4 bytes on each executor.
rdd1 = sc.parallelize(array, 10)

# Transformations return new rdd's, so now I would expect each executor 
# to have 2 floats on it, one from rdd, and one from rdd2, so 8 bytes.
rdd2 = rdd1.map(lambda x: x + 2)

# Here is where things get murky.  Would this cause 8 bytes of memory to be used
# to account for the intermediate product of rdd1.map(lambda x: x -2)?  
# So in Spark's calculations, if we would now require space for 
# 4 floats, one for each rdd1/2/3 and one for the intermediate?
rdd3 = rdd1.map(lambda x: x - 2).map(lambda x: x * 2)

# Is this the point where each executor actually has the memory allocated? 
# And since I only call collect() on the first rdd,
# would only 4 bytes be sent out to each executor?
rdd1.collect()

#How about now?
rdd2.collect()
EN

回答 2

Stack Overflow用户

发布于 2016-02-03 23:44:26

我创建了一个使用sc.parallelize()的单元测试,并在执行此行后放置一个断点。在调用后续的collect()之前,我没有看到任何内存分配被记录下来。

Java代码...

代码语言:javascript
复制
JavaRDD<String> fooBars = sparkCtx.parallelize(Lists.newArrayList("foo", "bar"));
JavaRDD<String> abcs = fooBars.map(f -> "abc");
abcs.collect();   // break point here

在执行collect()时从日志中...

代码语言:javascript
复制
2016-02-03 15:49:14 INFO  DAGScheduler:59 - Got job 0 (collect at MyTest.java:40) with 1 output partitions (allowLocal=false)
2016-02-03 15:49:14 INFO  DAGScheduler:59 - Final stage: Stage 0(collect at MyTest.java:40)
2016-02-03 15:49:14 INFO  DAGScheduler:59 - Parents of final stage: List()
2016-02-03 15:49:14 INFO  DAGScheduler:59 - Missing parents: List()
2016-02-03 15:49:14 INFO  DAGScheduler:59 - Submitting Stage 0 (ParallelCollectionRDD[0] at parallelize at RddFactory.java:42), which has no missing parents
2016-02-03 15:49:14 INFO  MemoryStore:59 - ensureFreeSpace(1416) called with curMem=0, maxMem=991753666
2016-02-03 15:49:14 INFO  MemoryStore:59 - Block broadcast_0 stored as values in memory (estimated size 1416.0 B, free 945.8 MB)

这与Spark文档一致,该文档解释了在使用终止函数(例如collect())之前,不会执行对非终止函数(例如map())的惰性评估。这个示例还表明,parallelize()也被认为是一个非终止函数,因为直到collect()语句才记录ensureFreeSpace...

为了解释您关于两个collect()语句的另一个问题,Sparks RDD对象是不可变的,因此当您调用rdd1.collect()时,它会创建此RDD。如果您随后调用rdd2.collect(),它也必须创建一个。

票数 1
EN

Stack Overflow用户

发布于 2016-02-03 23:51:52

嗯,这是一个相当棘手的问题。

首先,您必须包含与您所执行的工作完全相互依赖的内存。这意味着工作进程( 200MB左右)和运行每个Python执行器所需的内存(每个解释器大约30-40MB,不需要任何额外的导入)。

此外,在阶段之间传递的每个数据实际上都是重复的。首先,它必须传递给工作者(JVM),然后通过套接字传递到Python解释器。

最后是Spark实现的细节。默认情况下,Spark在任务之间重用Python解释器。这意味着在释放内存之前,必须对每个任务中创建的临时对象进行垃圾回收。通常这不应该是一个问题,但这绝对是需要牢记的事情。此外,如果有必要,数据可能会溢出到磁盘,这可能会使情况变得更加复杂。

因此,尽管您的计算或多或少是正确的(假设您已经执行了一个操作并使用了float32),但它只是图片的一小部分。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35179799

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档