我的设置是:星火2.1在一个3节点纱线集群,160 GB,48 vcore。启动动态分配。spark.executor.memory=6G,spark.executor.cores=6
首先,我正在读取蜂窝表: orders (329 am )和lineitems (1.43GB),并执行左外部联接。接下来,我根据连接的数据集(比如var line1 = joinedDf.filter("linenumber=1")、var line2 = joinedDf.filter("l_linenumber=2")等)应用了7种不同的过滤条件。因为我对已连接的数据集进行了多次筛选,所以我认为在这里执行持久化(MEMORY_ONLY)会有所帮助,因为合并的数据集将完全适合内存。
java.lang.OutOfMemoryError: Java heap space。我试着增加/减少执行器内核,只保留磁盘,增加分区,修改存储比,但是似乎没有什么能解决执行器内存问题。如果有人能提到持久化是如何工作的,在什么情况下持久化更快--持久化,更重要的是如何排除内存问题,我将不胜感激。
发布于 2017-09-08 19:14:07
我建议阅读变换和行为在spark中的不同之处。我必须承认,我自己也曾多次被这个咬过。
火花中的数据被延迟地进行评估,这实际上意味着在执行“操作”之前什么都不会发生。.filter()函数是一个转换,所以当您的代码到达那个点时,除了向转换管道添加一个节外,实际上不会发生任何事情。对.persist()的调用以同样的方式进行。
如果.persist()调用的下游代码有多个可以同时触发的操作,那么很可能您实际上是在单独“保存”每个操作的数据,并消耗内存( Spark中的“存储”选项卡将告诉您数据集的缓存%,如果数据集缓存超过100%,那么您将看到我在这里描述的内容)。更糟糕的是,您可能从未真正使用缓存的数据。
通常,如果代码中有数据集分叉为两个单独的转换管道(在示例中是每个单独的.filter())的点,那么.persist()是一个好主意,可以防止对数据源进行多次读取,并/或在分叉之前保存昂贵的转换管道的结果。
很多时候,最好在.persist()调用之后(在数据分叉之前)触发单个操作,以确保以后的操作(可能同时运行)从持久化缓存中读取,而不是独立地评估(和无用地缓存)数据。
TL;DR:
在您的joinedDF.count()之后,但在您的.filter()之前执行一个.filter()。
https://stackoverflow.com/questions/46101585
复制相似问题