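I'm just getting started with Spark Streaming, and I'm trying to build a sample application that counts words from a Kafka stream. It compiles with sbt package, but when I run it I get a NoClassDefFoundError. This post seems to describe the same problem, but its solution is for Maven and I haven't been able to reproduce it with sbt.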
KafkaApp.scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "zookeeper.connection.timeout.ms" -> "10000",
      "group.id" -> "sparkGroup"
    )
    val topics = Map(
      "test" -> 1
    )
    // stream of (topic, ImpressionLog)
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK)
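    // count() returns a DStream of per-batch record counts, so print it as an output operation
    messages.count().print()
    // no data is received until the streaming context is started
    ssc.start()
    ssc.awaitTermination()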
  }
}
build.sbt
name := "Simple Project"
version := "1.1"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.1",
  "org.apache.spark" %% "spark-streaming" % "1.1.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
When I submit it:
bin/spark-submit \
  --class "KafkaApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.1.jar
Error:
14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@192.168.5.252:65077/user/HeartbeatReceiver
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
at KafkaApp$.main(KafkaApp.scala:28)
at KafkaApp.main(KafkaApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
Answered on 2014-12-31 02:09:05
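spark-submit does not automatically put the package containing KafkaUtils on the classpath. It has to be inside your project jar. To get it there, use sbt assembly to build an uber-jar that contains everything. Here is an example build.sbt.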
https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt
Obviously, you also need to add the assembly plugin to sbt.
https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project
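Applied to the build.sbt from the question, the uber-jar approach might look roughly like the sketch below. It is not the linked build file. It assumes sbt 0.13.x and that sbt-assembly 0.14.1 (the version another answer in this thread uses) is declared in project/plugins.sbt. The "provided" scoping, the merge strategy, and the jar name kafka-app-assembly.jar are illustrative choices, not something the question or this answer specifies.

```scala
// build.sbt (sketch under the assumptions stated above)
// Separate file project/plugins.sbt is assumed to contain:
//   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // spark-submit already supplies these at run time, so keep them out of the uber-jar
  "org.apache.spark" %% "spark-core" % "1.1.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.1.1" % "provided",
  // this is the artifact that contains KafkaUtils, so it must be bundled
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

// Hypothetical jar name, chosen only to keep the submit command short
assemblyJarName in assembly := "kafka-app-assembly.jar"

// Assumption: a coarse merge strategy for duplicate files; adjust it for your dependencies
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
```

With a build like this, you would run sbt assembly instead of sbt package and pass the resulting assembly jar (by default written under target/scala-2.10/) to spark-submit in place of simple-project_2.10-1.1.jar.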
Answered on 2015-10-19 05:03:48
Try including all the dependency jars when you submit the application:
spark-submit --name "SampleApp" --deploy-mode client --master spark://host:7077 --class com.stackexchange.SampleApp --jars $KAFKA_INSTALL_DIR/libs/zkclient-0.3.jar spark-example-1.0-SNAPSHOT.jar
Answered on 2016-02-28 01:51:38
The following build.sbt worked for me. It also requires you to put the sbt-assembly plugin in a file under the project/ directory.
build.sbt
name := "NetworkStreaming" // https://github.com/sbt/sbt-assembly/blob/master/Migration.md#upgrading-with-bare-buildsbt
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.4.1",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1", // kafka
  "org.apache.hbase" % "hbase" % "0.92.1",
  "org.apache.hadoop" % "hadoop-core" % "1.0.2",
  "org.apache.spark" % "spark-mllib_2.10" % "1.3.0"
)
mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "log4j.properties" => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}
project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
https://stackoverflow.com/questions/27710887