首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark streaming 1.6.0 - Executors弹跳

Spark streaming 1.6.0 - Executors弹跳
EN

Stack Overflow用户
提问于 2016-03-08 05:10:37
回答 1查看 2.2K关注 0票数 7

我们使用在AWS EMR 4.3.x上运行的Spark Streaming 1.6.0,使用Kinesis流中的数据。用于迁移后在Spark 1.3.1中正常工作,我们无法长期承受负载。Ganglia显示,集群的已用内存一直在增长,直到在没有GC的情况下达到某个限制。在那之后,有几个非常长的微批次(以几十分钟而不是几秒为单位)。然后Spark开始杀死和弹跳执行者(一遍又一遍),

基本上,集群变得不可用。这个问题可以在负载下一次又一次地重现。Spark在不杀死执行者的情况下GC失败的原因是什么?如何让集群运行几周(目前不能运行几个小时)

欢迎任何意见。

在定义作业时,我们使用以下定义:

代码语言:javascript
复制
sparkConf.set("spark.shuffle.consolidateFiles", "true");
sparkConf.set("spark.storage.memoryFraction", "0.5");
sparkConf.set("spark.streaming.backpressure.enabled", "true");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

做一个联合

代码语言:javascript
复制
KinesisUtils.createStream(streamingContext, appName,
                        kinesisStreamName, kinesisEndpoint, awsRegionName, initialPositionInStream, checkpointInterval,
                        StorageLevel.MEMORY_AND_DISK_2());

为了进行测试,我已经将我们的应用程序简化为一个简单的框架。保持字节流到字符串流的映射,然后转换为对象,过滤掉不相关的事件,然后持久化并存储到S3中。

eventStream = eventStream.persist(StorageLevel.MEMORY_AND_DISK_SER_2());

eventStream =新建eventStream.foreachRDD( eventStream.repartition(configuration.getSparkOutputPartitions());OutputToS3Function());

使用以下配置提交Spark作业(从默认Spark配置中复制内存大小修改):

代码语言:javascript
复制
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=256M -XX:MaxPermSize=256M 
spark.driver.extraJavaOptions -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=512M -XX:MaxPermSize=512M 

添加异常。第1个集群

代码语言:javascript
复制
16/03/06 13:54:52 WARN BlockManagerMaster: Failed to remove broadcast 1327 with removeFromMaster = true - Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
        at scala.util.Try$.apply(Try.scala:161)
        at scala.util.Failure.recover(Try.scala:185)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
        at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at scala.concurrent.Promise$class.complete(Promise.scala:55)
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643)
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658)
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634)
        at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
        at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242)
        ... 7 more
16/03/06 13:54:52 ERROR ContextCleaner: Error cleaning broadcast 1327
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
        at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
        at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
        at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67)
        at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
        at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        ... 12 more

16/03/06 13:55:04 ERROR YarnClusterScheduler: Lost executor 6 on ip-***-194.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
1
16/03/06 13:55:10 ERROR YarnClusterScheduler: Lost executor 1 on ip-***-193.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

第二次集群尝试:

代码语言:javascript
复制
16/03/07 14:24:38 ERROR server.TransportChannelHandler: Connection to ip-***-22.ec2.internal/N.N.N.22:40791 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
16/03/07 14:24:38 ERROR client.TransportResponseHandler: Still have 12 requests outstanding when connection from ip-***-22.ec2.internal/N.N.N.22:40791 is closed
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-47-1457357970366
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
        at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
        at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-15-1457357969730
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
        at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        a.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
        at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
        at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)
t io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
        at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)

先谢谢你...

EN

回答 1

Stack Overflow用户

发布于 2016-03-15 20:05:20

我确实得到了同样的东西..当一个微批处理花费超过120秒完成时,它会触发:

代码语言:javascript
复制
16/03/14 22:57:30 INFO SparkStreaming$: Batch size: 2500, total read: 4287800
16/03/14 22:57:35 INFO SparkStreaming$: Batch size: 2500, total read: 4290300
16/03/14 22:57:42 INFO SparkStreaming$: Batch size: 2500, total read: 4292800
16/03/14 22:57:45 INFO SparkStreaming$: Batch size: 2500, total read: 4295300
16/03/14 22:59:45 ERROR ContextCleaner: Error cleaning broadcast 11251
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
        at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
        at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
        at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67)
        at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
        at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        ... 12 more

我在本地模式下运行,并使用Kinesis。我也没有使用任何广播变量。

票数 -2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35854082

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档