Spark Streaming receiver out of memory (OOM)

Stack Overflow user
Asked on 2016-07-14 07:10:05
1 answer · 801 views · 0 following · 0 votes

I have a problem where the receiver restarts again and again.

I am using Spark 1.6.1. I use Spark Streaming to receive data from a stream, then use a map to deserialize the protobuf (pb) data.

My test covers two cases:

  1. Just receive the data and print it directly: the application is stable.
  2. Receive and deserialize: this causes the problem. It happens at irregular intervals. The input rate is about 500 MB/min. I have set the executor memory to 8 GB. It looks as if something keeps allocating memory, but I don't know why.

My code:

val conf = new SparkConf().setAppName(args(8))
conf.set("spark.serializer",   "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.backpressure.enabled","true")
conf.set("spark.speculation","true")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(args(7).toInt))
val bigPipeStreams = (1 to args(3).toInt).map{
    i => ssc.networkStream(
    new MyBigpipeLogagentReceiver(args(0),args(1),args(2),i,args(4),args(5),args(6).toInt)
)
}
val lines = ssc.union(bigPipeStreams)   
def deserializePbData(value: String) : String = {

if (null == value || value.isEmpty) {
    return ""
}
var cuid = ""
var os = ""
var channel = ""
var sv = ""
var resid = ""
var appid = ""
var prod = ""
try { //if exception,useless data,just drop it
    val timeStrIndex = value.indexOf(",\"time_str\"")
    var strAfterTruncation = ""
    if (-1 != timeStrIndex) {
        strAfterTruncation = value.substring(0,timeStrIndex) + "}"
    } else {
        strAfterTruncation = value
    }
    val jsonData = JSONObject.fromObject(strAfterTruncation)
    //val jsonData = value.getAsJsonArray()
    val binBody = jsonData.getString("bin_body")
    val pbData = binBody.substring(1,binBody.length()-1).split(",").foldLeft(ArrayBuffer.empty[Byte])((b,a) => b +java.lang.Byte.parseByte(a)).drop(8).toArray
    Lighttpd.lighttpd_log.parseFrom(pbData).getRequest().getUrl().getUrlFields().getAutokvList().asScala.foreach(a => 
        a.getKey() match {
            case "cuid" => cuid += a.getValue()
            case "os" => os += a.getValue()
            case "channel" => channel += a.getValue()
            case "sv" => sv += a.getValue()
            case "resid" => resid += a.getValue()
            case "appid" => appid += a.getValue()
            case "prod" => prod += a.getValue()
            case _ => null
        }
    )
    val decodeCuid = URLDecoder.decode(cuid, "UTF-8")
    os = os.toLowerCase()
    if (os.matches("android(.*)")) {
        os = "android"
    } else if (os.matches("iphone(.*)")) {
        os = "iphone"
    } else if (os.matches("ipad(.*)")) {
        os = "ipad"
    } else if (os.matches("s60(.*)")) {
        os = "symbian"
    } else if (os.matches("wp7(.*)")) {
        os = "wp7"
    } else if (os.matches("wp(.*)")) {
        os = "wp"
    } else if (os.matches("tizen(.*)")) {
        os = "tizen"

    val ifHasLogid = Lighttpd.lighttpd_log.parseFrom(pbData).hasLogid()
    val time = Lighttpd.lighttpd_log.parseFrom(pbData).getTime()
    if (ifHasLogid) {
        val logid = Lighttpd.lighttpd_log.parseFrom(pbData).getLogid()
        if (logid.isEmpty || logid.toString().equals("-") || !resid.toString().equals("01") || channel.isEmpty |!appid.isEmpty || !prod.isEmpty) {
            ""
        } else {
            decodeCuid + "\001" + os + "\001" + channel + "\001" + sv + "\001" + "1" + "\001" + "1" + "\001" + time + "\n"
        }
    } else {
        ""
    }
} catch {
    case _:Throwable => ""
}
}
lines.map(parseData).print()
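As an aside, the `bin_body` decoding above builds the byte array with a `foldLeft` over a growing `ArrayBuffer`, which boxes every byte and copies repeatedly; it also calls `Lighttpd.lighttpd_log.parseFrom(pbData)` several times on the same payload. A hypothetical, self-contained sketch of a leaner decode (same assumed framing: strip the surrounding brackets, split on commas, drop an 8-byte header):

```scala
object BinBodyDecode {
  // Decode a "[b0,b1,...]" string into the protobuf payload bytes.
  // Assumes the same framing as the question's code.
  def decode(binBody: String): Array[Byte] =
    binBody
      .substring(1, binBody.length - 1) // strip the surrounding "[" and "]"
      .split(",")
      .map(_.trim.toByte)               // one pass, no intermediate ArrayBuffer copies
      .drop(8)                          // skip the 8-byte header, as in the original

  def main(args: Array[String]): Unit = {
    val pb = BinBodyDecode.decode("[0,0,0,0,0,0,0,0,10,20,30]")
    println(pb.mkString(","))
  }
}
```

Parsing the protobuf once into a `val` and reusing it would similarly avoid re-parsing the same bytes three or four times per record; whether that alone explains the OOM is not certain, but it reduces allocation pressure in the hot path.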

Error text:

2016-07-12T12:00:01.546+0800: 5096.643: [GC (Allocation Failure) 
Desired survivor size 442499072 bytes, new threshold 1 (max 15)
[PSYoungGen: 0K->0K(2356736K)] 5059009K->5059009K(7949312K), 0.0103342 secs] [Times: user=0.21 sys=0.00, real=0.01 secs] 
2016-07-12T12:00:01.556+0800: 5096.654: [Full GC (Allocation Failure) [PSYoungGen: 0K->0K(2356736K)] [ParOldGen:    5059009K->5057376K(5592576K)] 5059009K->5057376K(7949312K), [Metaspace: 44836K->44490K(1089536K)], 0.8769617 secs] [Times: user=17.88   sys=0.04, real=0.88 secs] 
2016-07-12T12:00:02.434+0800: 5097.531: Total time for which application threads were stopped: 1.2951974 seconds, Stopping threads  took: 0.0000662 seconds
java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid24310.hprof ...
2016-07-12T12:00:30.960+0800: 5126.057: Total time for which application threads were stopped: 28.5260812 seconds, Stopping threads     took: 0.0000995 seconds
Heap dump file created [5211252802 bytes in 28.526 secs]
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 24310"...
2016-07-12T12:00:31.589+0800: 5126.686: Total time for which application threads were stopped: 0.6289627 seconds, Stopping threads  took: 0.0001258 seconds
2016-07-12T12:00:31.595+0800: 5126.692: Total time for which application threads were stopped: 0.0004822 seconds, Stopping threads  took: 0.0001493 seconds
2016-07-12 12:00:31.597 [Thread-5] ERROR [Logging.scala:95] - Uncaught exception in thread Thread[Thread-5,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_51]
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) ~[na:1.8.0_51]
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.8.0_51]
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[na:1.8.0_51]
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[na:1.8.0_51]
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) ~[na:1.8.0_51]
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.io.Output.require(Output.java:135) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:420) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:326) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1196) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1202) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:858) ~[   spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:645) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:77) ~[   spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:157) ~[   spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:128) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$3.onPushBlock(ReceiverSupervisorImpl.scala:109) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:296) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(    BlockGenerator.scala:268) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:109) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
2016-07-12 12:00:31.600 [SIGTERM handler] ERROR [SignalLogger.scala:57] - RECEIVED SIGNAL 15: SIGTERM
2016-07-12T12:00:31.611+0800: 5126.708: Total time for which application threads were stopped: 0.0005602 seconds, Stopping threads  took: 0.0001765 seconds
2016-07-12T12:00:31.617+0800: 5126.714: Total time for which application threads were stopped: 0.0004800 seconds, Stopping threads  took: 0.0001412 seconds
2016-07-12 12:00:32.483 [Bigpipe Receiver-SendThread(cq01-bigpipe-proxy01.cq01.baidu.com:2181)] WARN  [ClientCnxnSocket.java:139] -     Connected to an old server; r-o mode will be unavailable
2016-07-12T12:00:32.507+0800: 5127.604: Total time for which application threads were stopped: 0.0004604 seconds, Stopping threads  took: 0.0001198 seconds
2016-07-12T12:00:32.509+0800: 5127.606: Total time for which application threads were stopped: 0.0002919 seconds, Stopping threads  took: 0.0001800 seconds
2016-07-12T12:00:32.509+0800: 5127.607: Total time for which application threads were stopped: 0.0002692 seconds, Stopping threads  took: 0.0001612 seconds
2016-07-12 12:00:32.549 [Bigpipe Receiver-SendThread(tc-bigpipe-proxy03.tc.baidu.com:2181)] WARN  [ClientCnxnSocket.java:139] -     Connected to an old server; r-o mode will be unavailable
2016-07-12T12:00:34.220+0800: 5129.317: [GC (Allocation Failure) 
Desired survivor size 424148992 bytes, new threshold 2 (max 15)
[PSYoungGen: 1931776K->188775K(2363904K)] 6989152K->5246152K(7956480K), 0.2569385 secs] [Times: user=0.00 sys=5.19, real=0.26 secs] 
2016-07-12T12:00:34.477+0800: 5129.575: Total time for which application threads were stopped: 0.2575019 seconds, Stopping threads  took: 0.0000384 seconds
2016-07-12T12:00:35.478+0800: 5130.575: Total time for which application threads were stopped: 0.0002786 seconds, Stopping threads  took: 0.0000424 seconds
2016-07-12T12:00:37.600+0800: 5132.697: [GC (Allocation Failure) 
Desired survivor size 482344960 bytes, new threshold 3 (max 15)
[PSYoungGen: 2120551K->387013K(2268160K)] 7177928K->5444389K(7860736K), 0.5153031 secs] [Times: user=0.00 sys=9.89, real=0.52 secs] 
2016-07-12T12:00:38.116+0800: 5133.213: Total time for which application threads were stopped: 0.5157529 seconds, Stopping threads  took: 0.0000427 seconds
2016-07-12T12:00:40.116+0800: 5135.213: Total time for which application threads were stopped: 0.0003171 seconds, Stopping threads  took: 0.0001000 seconds
2016-07-12T12:00:40.419+0800: 5135.516: [GC (Allocation Failure) 
Desired survivor size 599785472 bytes, new threshold 2 (max 15)
[PSYoungGen: 2240965K->471033K(2324992K)] 7298341K->5633517K(7917568K), 0.3621433 secs] [Times: user=0.12 sys=7.11, real=0.36 secs] 
2016-07-12T12:00:40.781+0800: 5135.878: Total time for which application threads were stopped: 0.3626080 seconds, Stopping threads  took: 0.0000429 seconds
2016-07-12T12:00:41.781+0800: 5136.879: Total time for which application threads were stopped: 0.0003301 seconds, Stopping threads  took: 0.0000947 seconds
2016-07-12T12:00:43.108+0800: 5138.205: [GC (Allocation Failure) 
Desired survivor size 620756992 bytes, new threshold 3 (max 15)
[PSYoungGen: 2324985K->378481K(2054656K)] 7487469K->5831048K(7647232K), 0.2593685 secs] [Times: user=0.66 sys=4.96, real=0.26 secs] 
2016-07-12T12:00:43.368+0800: 5138.465: [Full GC (Ergonomics) [PSYoungGen: 378481K->0K(2054656K)] [ParOldGen:   5452566K->4713601K(5592576K)] 5831048K->4713601K(7647232K), [Metaspace: 44635K->44635K(1089536K)], 4.3137405 secs] [Times: user=9.78    sys=74.53, real=4.31 secs] 
2016-07-12T12:00:47.682+0800: 5142.779: Total time for which application threads were stopped: 4.5736603 seconds, Stopping threads  took: 0.0000449 seconds
2016-07-12T12:00:47.682+0800: 5142.779: Total time for which application threads were stopped: 0.0002430 seconds, Stopping threads  took: 0.0000856 seconds
2016-07-12T12:00:49.954+0800: 5145.052: [GC (Allocation Failure) 
Desired survivor size 597688320 bytes, new threshold 4 (max 15)
[PSYoungGen: 1583616K->161266K(2189824K)] 6297217K->4874867K(7782400K), 0.0388138 secs] [Times: user=0.00 sys=0.84, real=0.04 secs] 
2016-07-12T12:00:49.993+0800: 5145.091: Total time for which application threads were stopped: 0.0392926 seconds, Stopping threads  took: 0.0000449 seconds
2016-07-12T12:00:51.903+0800: 5147.000: [GC (Allocation Failure) 
Desired survivor size 596115456 bytes, new threshold 5 (max 15)
[PSYoungGen: 1744882K->324587K(2213888K)] 6458483K->5038189K(7806464K), 0.0334029 secs] [Times: user=0.69 sys=0.03, real=0.04 secs] 
2016-07-12T12:00:51.936+0800: 5147.034: Total time for which application threads were stopped: 0.0338707 seconds, Stopping threads  took: 0.0000404 seconds
2016-07-12T12:00:53.942+0800: 5149.039: [GC (Allocation Failure) 
Desired survivor size 654835712 bytes, new threshold 6 (max 15)
[PSYoungGen: 1954795K->490438K(2120704K)] 6668397K->5204039K(7713280K), 0.0441762 secs] [Times: user=0.95 sys=0.02, real=0.05 secs] 
2016-07-12T12:00:53.986+0800: 5149.083: Total time for which application threads were stopped: 0.0446174 seconds, Stopping threads  took: 0.0000456 seconds
2016-07-12T12:00:56.102+0800: 5151.199: [GC (Allocation Failure) 
Desired survivor size 763887616 bytes, new threshold 5 (max 15)
[PSYoungGen: 2120646K->639467K(1943552K)] 6834247K->5370280K(7536128K), 0.1124828 secs] [Times: user=1.07 sys=1.30, real=0.11 secs] 
2016-07-12T12:00:56.214+0800: 5151.312: Total time for which application threads were stopped: 0.1129348 seconds, Stopping threads  took: 0.0000396 seconds
2016-07-12T12:00:57.784+0800: 5152.881: [GC (Allocation Failure) 
Desired survivor size 895483904 bytes, new threshold 4 (max 15)
[PSYoungGen: 1943531K->745977K(2050048K)] 6674344K->5504073K(7642624K), 0.0971717 secs] [Times: user=1.20 sys=0.67, real=0.10 secs] 
2016-07-12T12:00:57.881+0800: 5152.979: Total time for which application threads were stopped: 0.0977363 seconds, Stopping threads  took: 0.0000941 seconds
2016-07-12T12:00:59.406+0800: 5154.504: [GC (Allocation Failure) 
Desired survivor size 935329792 bytes, new threshold 5 (max 15)
[PSYoungGen: 2050041K->599188K(1715200K)] 6808137K->5647517K(7307776K), 0.3651465 secs] [Times: user=0.98 sys=5.88, real=0.37 secs] 
2016-07-12T12:00:59.772+0800: 5154.869: Total time for which application threads were stopped: 0.3656089 seconds, Stopping threads  took: 0.0000479 seconds
2016-07-12T12:01:00.968+0800: 5156.066: [GC (Allocation Failure) 
Desired survivor size 954204160 bytes, new threshold 4 (max 15)
[PSYoungGen: 1568404K->697830K(1667072K)] 6616733K->5746159K(7259648K), 0.0978955 secs] [Times: user=1.91 sys=0.04, real=0.09 secs] 
2016-07-12T12:01:01.066+0800: 5156.164: Total time for which application threads were stopped: 0.0983759 seconds, Stopping threads  took: 0.0000482 seconds
2016-07-12T12:01:02.189+0800: 5157.287: [GC (Allocation Failure) 
Desired survivor size 954204160 bytes, new threshold 3 (max 15)
[PSYoungGen: 1667046K->465454K(1864192K)] 6715375K->5855655K(7456768K), 0.1261993 secs] [Times: user=2.41 sys=0.29, real=0.12 secs] 
2016-07-12T12:01:02.316+0800: 5157.413: [Full GC (Ergonomics) [PSYoungGen: 465454K->65236K(1864192K)] [ParOldGen:   5390200K->5592328K(5592576K)] 5855655K->5657564K(7456768K), [Metaspace: 44635K->44635K(1089536K)], 3.2729437 secs] [Times: user=12.34   sys=57.11, real=3.28 secs] 
2016-07-12T12:01:05.589+0800: 5160.686: Total time for which application threads were stopped: 3.3998619 seconds, Stopping threads  took: 0.0000521 seconds
2016-07-12T12:01:05.589+0800: 5160.686: Total time for which application threads were stopped: 0.0002330 seconds, Stopping threads  took: 0.0000949 seconds
2016-07-12T12:01:05.688+0800: 5160.785: Total time for which application threads were stopped: 0.0002935 seconds, Stopping threads  took: 0.0000514 seconds
Heap
 PSYoungGen      total 1864192K, used 146620K [0x0000000715580000, 0x00000007c0000000, 0x00000007c0000000)
  eden space 932352K, 8% used [0x0000000715580000,0x000000071a4fa138,0x000000074e400000)
  from space 931840K, 7% used [0x0000000787200000,0x000000078b1b5290,0x00000007c0000000)
  to   space 931840K, 0% used [0x000000074e400000,0x000000074e400000,0x0000000787200000)
 ParOldGen       total 5592576K, used 5592328K [0x00000005c0000000, 0x0000000715580000, 0x0000000715580000)
  object space 5592576K, 99% used [0x00000005c0000000,0x00000007155420a8,0x0000000715580000)
 Metaspace       used 44654K, capacity 44990K, committed 45864K, reserved 1089536K
  class space    used 6212K, capacity 6324K, committed 6440K, reserved 1048576K 

New error: I suspect this block-upload error is what triggers the OOM. How can I fix this upload error?

    2016-07-15 11:41:47.307 [shuffle-client-0] ERROR [TransportChannelHandler.java:128] - Connection to     nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 has been quiet for 120000 ms while there are outstanding requests. Assuming  connection is dead; please adjust spark.network.timeout if this is wrong.
2016-07-15 11:41:47.309 [shuffle-client-0] ERROR [TransportResponseHandler.java:122] - Still have 1 requests outstanding when   connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 is closed
2016-07-15 11:41:47.314 [shuffle-client-0] ERROR [Logging.scala:95] - Error while uploading block input-0-1468553896200
java.io.IOException: Connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
2016-07-15T11:41:47.316+0800: 2176.487: Total time for which application threads were stopped: 0.0002632 seconds, Stopping threads  took: 0.0000521 seconds
2016-07-15 11:41:47.316 [Thread-5] WARN  [Logging.scala:91] - Failed to replicate input-0-1468553896200 to BlockManagerId(2,    nmg01-taihang-d10207.nmg01.baidu.com, 30456), failure #0
java.io.IOException: Connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_51]
2016-07-15T11:41:48.316+0800: 2177.487: Total time for which application threads were stopped: 0.0003391 seconds, Stopping threads  took: 0.0000979 seconds
2016-07-15T11:41:51.312+0800: 2180.483: [GC (Allocation Failure) --[PSYoungGen: 2894863K->2894863K(3007488K)]   8299519K->9550273K(9998336K), 0.7462118 secs] [Times: user=9.78 sys=0.02, real=0.74 secs] 
2016-07-15T11:41:52.059+0800: 2181.230: [Full GC (Ergonomics) [PSYoungGen: 2894863K->0K(3007488K)] [ParOldGen:  6655410K->6895736K(6990848K)] 9550273K->6895736K(9998336K), [Metaspace: 44409K->44409K(1087488K)], 0.4061892 secs] [Times: user=7.50    sys=0.01, real=0.41 secs] 
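The "Assuming connection is dead; please adjust spark.network.timeout if this is wrong" message in the log above points at a network timeout during block replication. An illustrative (not prescriptive) way to raise it at submit time; the 300s value is a guess to tune for your environment, and `your-app.jar` is a placeholder:

```shell
spark-submit \
  --conf spark.network.timeout=300s \
  --conf spark.executor.memory=8g \
  your-app.jar
```

Note that if the remote executor died of the OOM shown earlier, the connection really is dead, and raising the timeout only masks the symptom; the heap pressure itself still needs fixing.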

1 Answer

Stack Overflow user
Answered on 2016-07-14 14:54:16

Your code appears to be structurally broken. While reviewing it (re-indenting it to reflect the posted structure), I found that your last `else if` statement:

} else if (os.matches("tizen(.*)")) {
    os = "tizen"

opens a block but never closes it where it "should" be closed. Instead, that block actually ends with:

} catch {

The complete code as posted (re-indented) follows:

val conf = new SparkConf().setAppName(args(8))
conf.set("spark.serializer",   "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.backpressure.enabled","true")
conf.set("spark.speculation","true")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(args(7).toInt))
val bigPipeStreams = (1 to args(3).toInt).map{
    i => ssc.networkStream(
        new MyBigpipeLogagentReceiver(args(0),args(1),args(2),i,args(4),args(5),args(6).toInt)
    )
}
val lines = ssc.union(bigPipeStreams)   
def deserializePbData(value: String) : String = {

    if (null == value || value.isEmpty) {
        return ""
    }
    var cuid = ""
    var os = ""
    var channel = ""
    var sv = ""
    var resid = ""
    var appid = ""
    var prod = ""
    try { //if exception,useless data,just drop it
        val timeStrIndex = value.indexOf(",\"time_str\"")
        var strAfterTruncation = ""
        if (-1 != timeStrIndex) {
            strAfterTruncation = value.substring(0,timeStrIndex) + "}"
        } else {
            strAfterTruncation = value
        }
        val jsonData = JSONObject.fromObject(strAfterTruncation)
        //val jsonData = value.getAsJsonArray()
        val binBody = jsonData.getString("bin_body")
        val pbData = binBody.substring(1,binBody.length()-1).split(",").foldLeft(ArrayBuffer.empty[Byte])((b,a) => b +java.lang.Byte.parseByte(a)).drop(8).toArray
        Lighttpd.lighttpd_log.parseFrom(pbData).getRequest().getUrl().getUrlFields().getAutokvList().asScala.foreach(a => 
            a.getKey() match {
                case "cuid" => cuid += a.getValue()
                case "os" => os += a.getValue()
                case "channel" => channel += a.getValue()
                case "sv" => sv += a.getValue()
                case "resid" => resid += a.getValue()
                case "appid" => appid += a.getValue()
                case "prod" => prod += a.getValue()
                case _ => null
            }
        )
        val decodeCuid = URLDecoder.decode(cuid, "UTF-8")
        os = os.toLowerCase()
        if (os.matches("android(.*)")) {
            os = "android"
        } else if (os.matches("iphone(.*)")) {
            os = "iphone"
        } else if (os.matches("ipad(.*)")) {
            os = "ipad"
        } else if (os.matches("s60(.*)")) {
            os = "symbian"
        } else if (os.matches("wp7(.*)")) {
            os = "wp7"
        } else if (os.matches("wp(.*)")) {
            os = "wp"
        } else if (os.matches("tizen(.*)")) {
            os = "tizen"
        }

        val ifHasLogid = Lighttpd.lighttpd_log.parseFrom(pbData).hasLogid()
        val time = Lighttpd.lighttpd_log.parseFrom(pbData).getTime()
        if (ifHasLogid) {
            val logid = Lighttpd.lighttpd_log.parseFrom(pbData).getLogid()
            if (logid.isEmpty || logid.toString().equals("-") || !resid.toString().equals("01") || channel.isEmpty |!appid.isEmpty || !prod.isEmpty) {
                ""
            } else {
                decodeCuid + "\001" + os + "\001" + channel + "\001" + sv + "\001" + "1" + "\001" + "1" + "\001" + time + "\n"
            }
        } else {
            ""
        }
    } catch {
        case _:Throwable => ""
    }
}
lines.map(parseData).print()

I have not checked your code for functionality. This is purely a syntax/structure problem, but it stands out immediately when simply reading the code you posted.
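A hypothetical reduction of the effect described above: when the closing brace of the last branch is missing, the statements that follow are silently pulled inside that branch, so they only execute when the condition matched. With the brace restored, they run unconditionally:

```scala
object DanglingBrace {
  def classify(os: String): String = {
    var tag = ""
    if (os.startsWith("android")) {
      tag = "android"
    } else if (os.startsWith("tizen")) {
      tag = "tizen"
    }                 // <- this is the brace missing in the question's code;
    tag + "/done"     //    without it, this line would sit inside the tizen branch
  }

  def main(args: Array[String]): Unit = {
    println(DanglingBrace.classify("tizen-2.3"))
    println(DanglingBrace.classify("ios"))
  }
}
```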

Votes: 0
Original content provided by Stack Overflow: https://stackoverflow.com/questions/38367706