我对使用updateStateByKey()函数有问题。我有以下简单的代码(根据书编写:“学习火花-快速数据分析”):
object hello {
def updateStateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
Some(runningCount.getOrElse(0) + newValues.size)
}
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[5]").setAppName("AndrzejApp")
val ssc = new StreamingContext(conf, Seconds(4))
ssc.checkpoint("/")
val lines7 = ssc.socketTextStream("localhost", 9997)
val keyValueLine7 = lines7.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
val statefullStream = keyValueLine7.updateStateByKey(updateStateFunction _)
ssc.start()
ssc.awaitTermination()
}
}我的build.sbt是:
name := "stream-correlator-spark"
version := "1.0"
scalaVersion := "2.11.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.3.1" % "provided"
)当我使用sbt assembly命令构建它时,一切都很好。当我在星火集群上以本地模式运行时,我得到了错误:
线程"main“java.lang.NoClassDefFoundError中的异常:org/apache/spark/DStream/DStream/DStream$ at hello$.main(helo.scala:25)
第25行是:
val statefullStream = keyValueLine7.updateStateByKey(updateStateFunction _)我觉得这可能是一些兼容性版本的问题,但我不知道原因是什么,以及如何解决这个问题。
我真的很感激你的帮助!
发布于 2015-05-25 12:37:58
当您在SBT中编写"provided"时,这意味着您的库是由环境提供的,不需要包含在包中。尝试从"provided"库中删除"spark-streaming"标记。
发布于 2015-05-26 07:42:29
当你需要将你的应用提交到星火集群运行时,你可以添加“提供的”返回。“提供”的好处是,结果fat jar将不包括来自所提供的依赖项的类,与没有“提供”相比,这将产生一个小得多的胖jar。在我的例子中,结果jar将在9000万左右没有“提供”,然后缩小到30+M与“提供”。
https://stackoverflow.com/questions/30436268
复制相似问题