我有一个作业运行在spark上,它是用scala编写的,使用spark。由于“按操作分组”的开销很大,我得到了以下错误:Container killed by YARN for exceeding memory limits. 22.4 GB of 22 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead。我增加了记忆,但我得到了同样的。我使用10台r4.xsize的机器。我尝试使用r4.2xlarge,甚至r4.4xlarge,但也使用了相同的错误。该数据正在is 5GB压缩数据上进行测试(将近50个解压缩数据和近600万条记录)。
一些配置:
spark.executor.memory:20480 M spark.driver.memory:21295M spark.yarn.executor.memoryOverhead:2g spark.executor.instances:10
代码是这样的:
val groupedEntitiesRDD = datasetRDD
.groupBy(_.entityId)
.map({ case (key, valueIterator) => key -> valueIterator.toList })
.persist(StorageLevel.MEMORY_AND_DISK)
val deduplicatedRDD = groupedEntitiesRDD
.flatMap({ case (_, entities) => deduplication(entities) })
def deduplication(entities: List[StreamObject[JsValue]]): List[StreamObject[JsValue]] = {
entities
.groupBy(_.deduplicationKey)
.values
.map(duplicates => duplicates.maxBy(_.processingTimestamp.toEpochSecond))
.toList
}发布于 2017-08-03 09:42:05
根据我的经验和我在Spark2.x发布说明中所读到的内容,我们需要分配比在Spark1.x中更多的堆外内存(spark.yarn.executor.memoryOverhead)。
您只将2G分配给memoryOverhead和20 to内存。我相信,如果您将其更改为8G memoryOverhead和14 8G执行器内存,您会得到更好的结果。
如果您仍然遇到内存问题(如抛出实际OOMs ),则需要查看数据倾斜。特别是groupBy操作经常会导致严重的数据偏差。
最后一件事是,您使用RDDs --我希望您的意思是DataFrames或DataSets?RDDs在groupBy方面的性能非常低(例如,请参阅这博客文章中的原因),所以如果您在RDDs上,则应该使用reduceByKey。但实际上,您应该使用DataFrames (或DataSets),在这里groupBy确实是正确的选择。
编辑!
您在评论中询问了如何将groupBy转换为reduceByKey。你可以这样做:
datasetRDD
.map{case(entityID, streamObject) => (entityID, List(streamObject))}
.reduceByKey(_++_)
.flatMap{case(_, entities) => deduplication(entities)您还没有指定这些实体的数据结构,但是看起来您正在寻找一些最大值,实际上是丢弃了不想要的数据。这应该内置到reduceByKey-operation中,以便在减少不必要的数据的同时过滤掉不必要的数据。
https://stackoverflow.com/questions/45479813
复制相似问题