我正试图在星火中开发一个很重的数学计算,无论是在时间上还是在内存方面(对于两者都是O(n^2) )。我已经发现,包含Iterator的分区实际上并不适合大演算,因为它强制每一行实例化一个对象(尽管它是Iterator,所以很懒惰)。实际上,在一个最简单的场景中,例如,每一行都有一个向量。但这对内存是有害的,因为我们知道对象的JVM开销和GC所承受的所有压力,以及速度,因为我可以真正提高性能,将线性代数操作提高到BLAS级别-3(矩阵由矩阵代替矩阵,而不是我在这个范例中一直使用的矩阵)。在一个非常简单的例子中,我想要实现的是:
while (???) { // loop over some condition, doesn't really matter what
val matrix = ??? // an instance of a matrix
val broadMatrix = sparkContext.broadcast(matrix)
// rdd is an instance of RDD[Vector] that is already cached
rdd.mapPartition {
iter =>
val matrixValue = broadMatrix.value()
iter.map (vector => matrixValue * vec)
}
// a bunch of other things relying on that result
}以下是我的想法:
rdd是缓存的,那么拥有一个Iterator是无用的,不是吗?因为它的唯一优点是不在内存中同时保存所有行:但是在这里,它被计算和缓存,所以所有的行都保存在内存中.是的,当然,人们可能会认为Spark可能有一个可以序列化和压缩数据的智能缓存(但是,当存储级别为MEMORY_ONLY时,我对此表示怀疑.)。rdd中有很多JVM对象,但是我可以将其降低到每个分区的一个JVM对象。我甚至可以将其降低到每个Executor都有一个scala object,它将作为生活在同一个执行器上的所有分区的共享内存(我担心这可能很难处理,因为我希望保持Spark的恢复能力,因此,如果一个分区出于任何原因被移除并重新出现在另一个执行器上,我不想自己处理它,而是让Spark自己移动所有相关的对象.)。因此,我的想法是将这个rdd of vector转换为一个包含矩阵的矩阵,如下所示:
while (???) { // loop over some condition, doesn't really matter what
val matrix = ??? // an instance of a matrix
val broadMatrix = sparkContext.broadcast(matrix)
// rdd is an instance of RDD[Vector] that is already cached
rdd.mapPartition {
iter =>
val matrixValue = broadMatrix.value()
// iter actually contains one single element which is the matrix containing all vectors stacked
// here we have a BLAS-3 operation
iter.map (matrix => matrixValue * matrix)
}
// a bunch of other things relying on that result
}有人已经面对过这种困境了吗?您是否经历过预先使用内存管理作为这一?
发布于 2017-08-10 09:45:38
因为我确实可以提高性能,把我的线性代数运算提高到BLAS级别-3(用矩阵代替向量矩阵,这是我在这个范例中所坚持的)。
使用Iterators并不强制您以任何方式使用Vectors,甚至每个分区都不需要使用一个以上的元素。如果需要,可以为每个拆分轻松地创建一个Matrix对象。
这对内存是有害的,因为我们知道对象的JVM开销和GC所承受的所有压力。
我认为这比这要复杂得多。使用Iterators的原因是能够处理大于内存的分区。使用懒惰的Iterators和小对象,Spark可以将部分结果泄漏到磁盘,并使它们可用于垃圾收集。当您使用单个大型对象时,这种情况不会发生。根据我的经验,星火更容易受到大对象的GC问题的影响。
基于我怀疑的描述,避免显式存储数据和使用堆外内存显式初始化对象是有意义的。这将使GC不受影响,并允许您处理大型对象。但它远高于可能支付的等级。
https://stackoverflow.com/questions/45607805
复制相似问题