首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >执行火花流时的"java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext“

执行火花流时的"java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext“
EN

Stack Overflow用户
提问于 2015-05-07 03:24:15
回答 1查看 3.7K关注 0票数 4

当我在纱线上执行火花流应用程序时,我继续收到以下错误

为什么会发生错误,如何解决?任何建议都会有帮助的,谢谢~

代码语言:javascript
复制
15/05/07 11:11:50 INFO dstream.StateDStream: Marking RDD 2364 for time 1430968310000 ms for checkpointing
    15/05/07 11:11:50 INFO scheduler.JobScheduler: Added jobs for time 1430968310000 ms
    15/05/07 11:11:50 INFO scheduler.JobGenerator: Checkpointing graph for time 1430968310000 ms
    15/05/07 11:11:50 INFO streaming.DStreamGraph: Updating checkpoint data for time 1430968310000 ms
    15/05/07 11:11:50 INFO streaming.DStreamGraph: Updated checkpoint data for time 1430968310000 ms
    15/05/07 11:11:50 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
    java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

星星之流应用程序代码如下所示,我将在火花壳中执行它。

代码语言:javascript
复制
    import kafka.cluster.Cluster
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext._

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  Some(0)
}

val ssc = new StreamingContext(sc,
  new Duration(5000))
ssc.checkpoint(".")

val lines = KafkaUtils.createStream(ssc, "10.1.10.21:2181", "kafka_spark_streaming", Map("hello_test" -> 3))

val uuidDstream = lines.transform(rdd => rdd.map(_._2)).map(x => (x, 1)).updateStateByKey[Int](updateFunc)
uuidDstream.count().print()

ssc.start()
ssc.awaitTermination()
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-05-07 07:45:09

val updateFunc闭包中使用的对updateStateByKey的引用是将该实例的其余部分拖到闭包中,并将StreamingContext与其一起使用。

有两种选择:

  • 快速修复:声明流上下文瞬态=> @transient val ssc= ...也是注释dstream声明以及@transient的好主意。
  • 更好的解决方法:将函数放在单独的对象中。

如下所示:

代码语言:javascript
复制
case object TransformFunctions {
    val updateFunc = ???
}
票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30091371

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档