
spark streaming + cassandra
Stack Overflow user
Asked on 2015-11-07 00:30:46
2 answers · 894 views · 0 followers · Score: 2

I want to add Cassandra to a Spark Streaming application. Dependencies:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
  "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.8",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2")

Preparing the jar (sbt-assembly merge strategy):

assemblyMergeStrategy in assembly := {
  case PathList("com", "esotericsoftware", xs@_*) => MergeStrategy.last
  case PathList("com", "google", xs@_*) => MergeStrategy.first
  case PathList("org", "apache", xs@_*) => MergeStrategy.last
  case PathList("io", "netty", xs@_*) => MergeStrategy.last
  case PathList("com", "codahale", xs@_*) => MergeStrategy.last
  case PathList("io.netty", "netty-all", xs@_*) => MergeStrategy.last
  case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.first

  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

Code:

val sparkConf = new SparkConf(true)
      .setMaster("local[2]")
      .setAppName(getClass.getSimpleName)
      .set("spark.executor.memory", "1g")
      .set("spark.cores.max", "1")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    /** Creates the keyspace and table in Cassandra. */
    CassandraConnector(sparkConf).withSessionDo { session =>
      session.execute(s"DROP KEYSPACE IF EXISTS kafka_streaming")
      session.execute(s"CREATE KEYSPACE IF NOT EXISTS kafka_streaming WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"CREATE TABLE IF NOT EXISTS kafka_streaming.wordcount (word TEXT PRIMARY KEY, count COUNTER)")
      session.execute(s"TRUNCATE kafka_streaming.wordcount")
    }

I get this exception:

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.reflect.TypeToken.isPrimitive()Z
        at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:142)
        at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:136)
        at com.datastax.driver.core.TypeCodec$BlobCodec.<init>(TypeCodec.java:609)
        at com.datastax.driver.core.TypeCodec$BlobCodec.<clinit>(TypeCodec.java:606)
        at com.datastax.driver.core.CodecRegistry.<clinit>(CodecRegistry.java:147)
        at com.datastax.driver.core.Configuration$Builder.build(Configuration.java:259)
        at com.datastax.driver.core.Cluster$Builder.getConfiguration(Cluster.java:1135)
        at com.datastax.driver.core.Cluster.<init>(Cluster.java:111)
        at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:178)
        at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1152)
        at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:85)
        at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
        at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
        at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
        at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
        at com.lowcosttravelgroup.PostCostsStreamingApp$.cassandraTest(PostCostsStreamingApp.scala:71)
        at com.lowcosttravelgroup.PostCostsStreamingApp$.main(PostCostsStreamingApp.scala:46)
        at com.lowcosttravelgroup.PostCostsStreamingApp.main(PostCostsStreamingApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}

If I exclude the Cassandra parts, Spark starts without any exception.
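An aside on the trace above: the failing call is Guava's `TypeToken.isPrimitive()`, which can also indicate that an older Guava on the classpath (Spark and Hadoop bundle one) is shadowing the newer Guava that Cassandra driver 2.1.8 expects. If that turns out to be the cause, one common workaround is to shade Guava inside the fat jar; a sketch for `build.sbt`, assuming an sbt-assembly version (0.14.x or later) that supports `assemblyShadeRules` — not part of the original post:

```scala
// build.sbt (sketch): rename Guava's packages inside the assembly so the
// Cassandra driver uses the bundled copy instead of Spark's older Guava.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.guava.@1").inAll
)
```

With this in place the `MergeStrategy` cases for `com.google` above become less critical, because the driver's Guava classes no longer collide with Spark's.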

Any ideas? Anything that works? A GitHub example of Spark Streaming + Cassandra?

BR!


2 Answers

Stack Overflow user

Answered on 2015-11-07 01:22:57

A NoSuchMethodError usually indicates a Scala version mismatch. For example, if Spark was built with Scala 2.10, replace the connector dependency with:

"com.datastax.spark" % "spark-cassandra-connector-java_2.10" % "1.5.0-M2"

You can find the other versions here.
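As an aside on why the Scala suffix matters: in sbt, `%%` appends the project's Scala binary version to the artifact name, so with `scalaVersion := "2.10.x"` the two forms below resolve to the same artifact; a mismatch arises when an explicitly suffixed `_2.10` artifact is mixed with a build compiled for a different Scala version (sbt syntax sketch):

```scala
// Equivalent when scalaVersion is 2.10.x:
"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2"       // %% appends _2.10
"com.datastax.spark" %  "spark-cassandra-connector_2.10" % "1.5.0-M2"  // suffix written out
```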

Score: 0

Stack Overflow user

Answered on 2017-02-10 14:31:56

You should pass the packages argument to your spark-submit or spark-shell so that the Spark Cassandra connector is downloaded:

--packages datastax:spark-cassandra-connector:1.5.0-M2-s_2.10
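A full invocation might look like the following sketch; the main class and jar name are placeholders, not from the original post:

```shell
# Sketch: --packages resolves the connector (and its transitive dependencies)
# from Spark Packages / Maven Central at submit time, so it does not need to
# be bundled into the application's fat jar.
spark-submit \
  --class com.example.StreamingApp \
  --master local[2] \
  --packages datastax:spark-cassandra-connector:1.5.0-M2-s_2.10 \
  streaming-app.jar
```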

Pass the correct connector version for your code to work, and declare the dependency in your sbt file as "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2".

Hope this helps.

Score: 0
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/33571358
