
Unable to run my Spark Streaming program

Stack Overflow user
Asked on 2014-12-21 18:22:04
Answers: 2 · Views: 722 · Followers: 0 · Votes: 0

I have written the following Scala code; my platform is Cloudera CDH 5.2.1 on CentOS 6.5.

Tutorial.scala

import org.apache.spark
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._

object Tutorial {
    def main(args: Array[String]) {
        val checkpointDir = TutorialHelper.getCheckPointDirectory()
        val consumerKey = "..."
        val consumerSecret = "..."
        val accessToken = "..."
        val accessTokenSecret = "..."
        try {
            TutorialHelper.configureTwitterCredentials(consumerKey, consumerSecret, accessToken, accessTokenSecret)
            val ssc = new StreamingContext(new SparkContext(), Seconds(1))
            val tweets = TwitterUtils.createStream(ssc, None)
            val tweetText = tweets.map(tweet => tweet.getText())
            tweetText.print()
            ssc.checkpoint(checkpointDir)
            ssc.start()
            ssc.awaitTermination()
        } finally {
            //ssc.stop()
        }
    }
}

My build.sbt file looks like this:

import AssemblyKeys._ // put this at the top of the file

name := "Tutorial"

scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resourceDirectory in Compile := baseDirectory.value / "resources"

assemblySettings

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                   => MergeStrategy.first
}

I have also created a file named projects/plugin.sbt with the following contents:

addSbtPlugin("net.virtual-void" % "sbt-cross-building" % "0.8.1")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.1")

and project/Build.scala:

import sbt._

object Plugins extends Build {
  lazy val root = Project("root", file(".")) dependsOn(
    uri("git://github.com/sbt/sbt-assembly.git#0.9.1")
  )
}

After this, I can build the assembly jar using

sbt assembly

I then run my code with

sudo -u hdfs spark-submit --class Tutorial --master local /tmp/Tutorial-assembly-0.1-SNAPSHOT.jar

I get the following error:

Configuring Twitter OAuth
        Property twitter4j.oauth.accessToken set as [...]
        Property twitter4j.oauth.consumerSecret set as [...]
        Property twitter4j.oauth.accessTokenSecret set as [...]
        Property twitter4j.oauth.consumerKey set as [...]

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/jars/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/12/21 16:04:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1419199472000 ms
-------------------------------------------

-------------------------------------------
Time: 1419199473000 ms
-------------------------------------------

14/12/21 16:04:33 ERROR ReceiverSupervisorImpl: Error stopping receiver 0
org.apache.spark.Logging$class.log(Logging.scala:52)
org.apache.spark.streaming.twitter.TwitterReceiver.log(TwitterInputDStream.scala:60)
org.apache.spark.Logging$class.logInfo(Logging.scala:59)
org.apache.spark.streaming.twitter.TwitterReceiver.logInfo(TwitterInputDStream.scala:60)
org.apache.spark.streaming.twitter.TwitterReceiver.onStop(TwitterInputDStream.scala:101)
org.apache.spark.streaming.receiver.ReceiverSupervisor.stopReceiver(ReceiverSupervisor.scala:136)
org.apache.spark.streaming.receiver.ReceiverSupervisor.stop(ReceiverSupervisor.scala:112)
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:127)
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)

2 Answers

Stack Overflow user

Answered on 2014-12-21 20:32:15

You need to use the sbt-assembly plugin to build an "assembly" (fat) jar containing all of your dependencies. It should include all of the Twitter util classes (a minimal sketch follows the links below).

Links:

  1. https://github.com/sbt/sbt-assembly
  2. http://prabstechblog.blogspot.com/2014/04/creating-single-jar-for-spark-project.html
  3. http://eugenezhulenev.com/blog/2014/10/18/run-tests-in-standalone-spark-cluster/

Alternatively, you can take a look at my Spark project, which already has the sbt-assembly plugin configured: http://eugenezhulenev.com/blog/2014/11/20/twitter-analytics-with-spark/
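
For illustration, here is a minimal sketch of the dependency scoping this answer relies on (not the answer's own file; the 1.1.0 version is an assumption, taken from the next answer to match the CDH 5.2.1 cluster):

// build.sbt (sketch): spark-streaming already ships with the cluster, so it
// is marked "provided" and excluded from the assembly; spark-streaming-twitter
// is NOT on the cluster classpath, so leaving it unscoped lets sbt-assembly
// bundle it (together with its twitter4j dependencies) into the fat jar that
// spark-submit distributes to the executors.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"         % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.1.0"
)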

Votes: 1

Stack Overflow user

Answered on 2015-02-10 06:54:40

CDH 5.2 packages Spark 1.1.0, but your build.sbt is using 1.0.0. Updating the versions as below and rebuilding should solve your problem.

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.1.0"
)
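
If in doubt about which Spark version the cluster actually provides, a quick sanity check (a sketch, not part of the answer) is to print it from spark-shell, where the pre-built SparkContext is bound to sc:

// run inside spark-shell on the cluster; on CDH 5.2.1 this should print
// 1.1.0, the version the build.sbt dependencies must match
println(sc.version)
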
Votes: 0
The original page content is provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/27592363
