首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >按键分组时火花耗尽内存

按键分组时火花耗尽内存
EN

Stack Overflow用户
提问于 2014-03-25 14:26:55
回答 2查看 20K关注 0票数 13

我试图在EC2上使用本指南对普通爬行数据进行简单转换,我的代码如下所示:

代码语言:javascript
复制
package ccminer

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ccminer {
  val english = "english|en|eng"
  val spanish = "es|esp|spa|spanish|espanol"
  val turkish = "turkish|tr|tur|turc"
  val greek = "greek|el|ell"
  val italian = "italian|it|ita|italien"
  val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")

  def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Bad command line")
      System.exit(-1)
    }

    val cluster = "spark://???"
    val sc = new SparkContext(cluster, "Common Crawl Miner",
      System.getenv("SPARK_HOME"), Seq("/root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar"))

    sc.sequenceFile[String, String](args(0)).map {
      case (k, v) => (langIndep(k), v)
    }
    .groupByKey(args(2).toInt)
    .filter {
      case (_, vs) => vs.size > 1
    }
    .saveAsTextFile(args(1))
  }
}

我使用以下命令运行它:

代码语言:javascript
复制
sbt/sbt "run-main ccminer.ccminer s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-* s3n://parallelcorpus/out/ 2000"

但是很快就失败了,错误如下所示

代码语言:javascript
复制
java.lang.OutOfMemoryError: Java heap space
at com.ning.compress.BufferRecycler.allocEncodingBuffer(BufferRecycler.java:59)
at com.ning.compress.lzf.ChunkEncoder.<init>(ChunkEncoder.java:93)
at com.ning.compress.lzf.impl.UnsafeChunkEncoder.<init>(UnsafeChunkEncoder.java:40)
at com.ning.compress.lzf.impl.UnsafeChunkEncoderLE.<init>(UnsafeChunkEncoderLE.java:13)
at com.ning.compress.lzf.impl.UnsafeChunkEncoders.createEncoder(UnsafeChunkEncoders.java:31)
at com.ning.compress.lzf.util.ChunkEncoderFactory.optimalInstance(ChunkEncoderFactory.java:44)
at com.ning.compress.lzf.LZFOutputStream.<init>(LZFOutputStream.java:61)
at org.apache.spark.io.LZFCompressionCodec.compressedOutputStream(CompressionCodec.scala:60)
at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:803)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

因此,我的基本问题是,编写一个可以按键分组的Spark任务需要什么?输入的数量几乎是无限的,而不会耗尽内存?

EN

回答 2

Stack Overflow用户

发布于 2014-04-18 10:27:51

洗牌任务中java.lang.OutOfMemoryError异常的最常见原因(如groupByKey、reduceByKey等)是低水平的平行性

可以通过在spark.default.parallelism中设置配置属性来增加默认值。

票数 16
EN

Stack Overflow用户

发布于 2015-07-14 11:35:35

因此,这说明您已经用完了分配的堆空间,的JVM。您可能会增加堆大小,但这仍然受到系统功能的限制(不能超过物理RAM的数量)。

另一方面,正如homutov所解释的,这种情况发生在大型收集操作中。例如groupByKey,reduceByKey,cartisien + mapToPair。这些操作将将RDD数据收集到一个地方,从而使JVM耗尽堆空间

你能做什么?

根据我的经验,当集群/系统资源有限时,可以使用火花调谐导轨spark.default.parallelism可以增加,直到您可以将任务伴随到集群/系统中--我曾经通过调整并行性在我的笔记本虚拟机上运行了一个针对14000实例,1024特性数据集的KNN实现。

代码语言:javascript
复制
Command line flag :   --conf spark.default.parallelism=4   ; 4 is the parallelism value

请记住,您需要这些特性调到最有效的和避免失败的设置(耗尽堆)设置,以获得最佳结果。

Additionally

记住使用原语数据类型,而不是使用包装器。并使用collections.代替数组

代码语言:javascript
复制
 ex :  List<Integers> vs int[] ; int[] is better than List 

在星火阵中,阵列可以节省许多宝贵的空间,提高性能。

还可以使用BroadCast变量,而不是笛卡尔积或任何大型组合任务。

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/22637518

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档