
SnappyData: java.lang.OutOfMemoryError: GC overhead limit exceeded

Stack Overflow user
Asked on 2017-10-04 09:57:09
1 answer · 767 views · 0 following · score 1

I have 1.2 GB of ORC data on S3, and I am trying to do the following with it:

1) Cache the data on a SnappyData 0.9 cluster

2) Run a group-by query on the cached dataset

3) Compare the performance with Spark 2.0.0

I am using a 64 GB / 8-core machine, and the SnappyData cluster is configured as follows:

$ cat locators
localhost

$ cat leads
localhost -heap-size=4096m -spark.executor.cores=1

$ cat servers
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m

Now, I wrote a small python script that caches the ORC data from S3 and runs a simple group-by query, as follows:

from pyspark.sql.snappy import SnappyContext
from pyspark import SparkContext,SparkConf
conf = SparkConf().setAppName('snappy_sample')
sc = SparkContext(conf=conf)
sqlContext = SnappyContext(sc)

sqlContext.sql("CREATE EXTERNAL TABLE if not exists my_schema.my_table using orc options(path 's3a://access_key:secret_key@bucket_name/path')")
sqlContext.cacheTable("my_schema.my_table")

out = sqlContext.sql("select *  from my_schema.my_table where (WeekId = '1') order by sum_viewcount desc limit 25")
out.collect()

The above script is executed with the following command:

spark-submit --master local[*] snappy_sample.py

And I got the following error:

17/10/04 02:50:32 WARN memory.MemoryStore: Not enough space to cache rdd_2_5 in memory! (computed 21.2 MB so far)
17/10/04 02:50:32 WARN memory.MemoryStore: Not enough space to cache rdd_2_0 in memory! (computed 21.2 MB so far)
17/10/04 02:50:32 WARN storage.BlockManager: Persisting block rdd_2_5 to disk instead.
17/10/04 02:50:32 WARN storage.BlockManager: Persisting block rdd_2_0 to disk instead.
17/10/04 02:50:47 WARN storage.BlockManager: Putting block rdd_2_2 failed due to an exception
17/10/04 02:50:47 WARN storage.BlockManager: Block rdd_2_2 could not be removed as it was not found on disk or in memory
17/10/04 02:50:47 ERROR executor.Executor: Exception in task 2.0 in stage 0.0 (TID 2)
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:96)
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:97)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:135)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:134)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:134)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:98)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:232)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
    at org.apache.spark.sql.execution.WholeStageCodegenRDD.compute(WholeStageCodegenExec.scala:496)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
17/10/04 02:50:47 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-2,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:96)
    at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:97)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:135)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:134)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:134)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:98)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:232)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
    at org.apache.spark.sql.execution.WholeStageCodegenRDD.compute(WholeStageCodegenExec.scala:496)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
17/10/04 02:50:48 INFO snappystore: VM is exiting - shutting down distributed system

Apart from the above error, how do I check whether the data has actually been cached in the SnappyData cluster?

1 Answer

Stack Overflow user

Accepted answer

Posted on 2017-10-04 11:44:52

1) Firstly, it does not look like your python script is connecting to the SnappyData cluster at all; rather, it is running in local mode. In that case the JVM launched by the python script fails with OOM as would be expected. When using python, connect to the SnappyData cluster in "smart connector" mode:

spark-submit --master local[*] --conf snappydata.connection=locator:1527 snappy_sample.py

The host:port above is the locator host and the port on which its Thrift server is running (1527 by default).
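To quickly check that the locator's Thrift endpoint is reachable (and, later, which tables exist), you can connect with the bundled snappy shell. This is a sketch: the localhost:1527 endpoint follows the default setup above, and the schema name is carried over from the question.

```shell
# Connect to the cluster through the locator's Thrift port
# (localhost:1527 is the default from the configuration above).
$ ./bin/snappy
snappy> connect client 'localhost:1527';
snappy> show tables in my_schema;
```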

2) Secondly, the example you have will just use Spark caching. If you want to use SnappyData, load the data into a column table:

from pyspark.sql.snappy import SnappySession
from pyspark import SparkContext,SparkConf
conf = SparkConf().setAppName('snappy_sample')
sc = SparkContext(conf=conf)
session = SnappySession(sc)

session.sql("CREATE EXTERNAL TABLE if not exists my_table using orc options(path 's3a://access_key:secret_key@bucket_name/path')")
session.table("my_table").write.format("column").saveAsTable("my_column_table")

out = session.sql("select *  from my_column_table where (WeekId = '1') order by sum_viewcount desc limit 25")
out.collect()

Also note the use of "SnappySession" rather than the contexts, which are deprecated since Spark 2.0.x. When comparing against Spark caching, you can use "cacheTable" in a separate script and run it against upstream Spark. Note that "cacheTable" caches lazily, meaning the first query will do the actual caching, so the first query run will be very slow, but subsequent ones should be faster.
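The Spark-caching side of the comparison might look like the following separate script, to be run with plain upstream Spark's spark-submit. This is a sketch: table and column names are carried over from the question, and SparkSession is used since the contexts are deprecated in Spark 2.0.x.

```python
# Sketch of the comparison script for upstream Spark (not SnappyData).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('spark_cache_sample').getOrCreate()

spark.sql("CREATE EXTERNAL TABLE if not exists my_table using orc options(path 's3a://access_key:secret_key@bucket_name/path')")

# cacheTable is lazy: the first query below performs the actual caching,
# so time the second run for a fair comparison with SnappyData.
spark.catalog.cacheTable("my_table")

query = "select * from my_table where (WeekId = '1') order by sum_viewcount desc limit 25"
spark.sql(query).collect()  # first run: slow, does the caching
spark.sql(query).collect()  # second run: served from the in-memory cache
```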

3) Update to the 1.0 release, which has many improvements, rather than using 0.9. Before launching the cluster, you will also need to add hadoop-aws-2.7.3 and aws-java-sdk-1.7.4 to the "-classpath" in conf/leads and conf/servers (or put them into the jars directory of the product).
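Extending the conf files from the question, the classpath additions might look like the following sketch; the jar locations under /path/to are assumptions for your installation.

```shell
$ cat conf/leads
localhost -heap-size=4096m -spark.executor.cores=1 -classpath=/path/to/hadoop-aws-2.7.3.jar:/path/to/aws-java-sdk-1.7.4.jar

$ cat conf/servers
localhost -heap-size=6144m -classpath=/path/to/hadoop-aws-2.7.3.jar:/path/to/aws-java-sdk-1.7.4.jar
```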

Score 2
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/46561989
