想要将cassandra添加到spark streaming
// sbt dependencies: Spark Streaming, its Kafka receiver, the DataStax Java
// driver, and the Spark Cassandra connector.
// NOTE(review): pinning cassandra-driver-core 2.1.8 alongside connector
// 1.5.0-M2 is suspect — the NoSuchMethodError on TypeToken.isPrimitive in the
// stack trace below is the classic Guava version conflict between the driver
// and the Guava that Spark ships; consider dropping the explicit driver pin
// and letting the connector pull in its matching driver — TODO confirm.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
  "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.8",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2")
准备jar
// sbt-assembly merge strategy for the fat jar: resolve duplicate class/file
// conflicts between Spark's and the Cassandra driver's transitive deps.
assemblyMergeStrategy in assembly := {
  case PathList("com", "esotericsoftware", xs@_*) => MergeStrategy.last
  case PathList("com", "google", xs@_*) => MergeStrategy.first
  case PathList("org", "apache", xs@_*) => MergeStrategy.last
  case PathList("io", "netty", xs@_*) => MergeStrategy.last
  case PathList("com", "codahale", xs@_*) => MergeStrategy.last
  // NOTE(review): the original also had a case for
  // PathList("io.netty", "netty-all", xs@_*) — it was unreachable, because
  // PathList matches individual path segments ("io", "netty"), never a dotted
  // artifact id, so it has been removed.
  case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.first
  case x =>
    // Fall back to the plugin's default strategy for everything else.
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
代码
// Local two-core Spark config pointing at a Cassandra node on localhost.
val sparkConf = new SparkConf(true)
  .setMaster("local[2]")
  .setAppName(getClass.getSimpleName)
  .set("spark.executor.memory", "1g")
  .set("spark.cores.max", "1")
  .set("spark.cassandra.connection.host", "127.0.0.1")
// 2-second micro-batch interval for the streaming context.
val ssc = new StreamingContext(sparkConf, Seconds(2))

/** Creates the keyspace and table in Cassandra. */
CassandraConnector(sparkConf).withSessionDo { session =>
  // Drop and recreate the keyspace so each run starts from a clean slate.
  // (Plain string literals here — the original used s-interpolators with
  // nothing to interpolate.)
  session.execute("DROP KEYSPACE IF EXISTS kafka_streaming")
  session.execute("CREATE KEYSPACE IF NOT EXISTS kafka_streaming WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
  session.execute("CREATE TABLE IF NOT EXISTS kafka_streaming.wordcount (word TEXT PRIMARY KEY, count COUNTER)")
  session.execute("TRUNCATE kafka_streaming.wordcount")
}
获取异常:
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.reflect.TypeToken.isPrimitive()Z
at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:142)
at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:136)
at com.datastax.driver.core.TypeCodec$BlobCodec.<init>(TypeCodec.java:609)
at com.datastax.driver.core.TypeCodec$BlobCodec.<clinit>(TypeCodec.java:606)
at com.datastax.driver.core.CodecRegistry.<clinit>(CodecRegistry.java:147)
at com.datastax.driver.core.Configuration$Builder.build(Configuration.java:259)
at com.datastax.driver.core.Cluster$Builder.getConfiguration(Cluster.java:1135)
at com.datastax.driver.core.Cluster.<init>(Cluster.java:111)
at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:178)
at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1152)
at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:85)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at com.lowcosttravelgroup.PostCostsStreamingApp$.cassandraTest(PostCostsStreamingApp.scala:71)
at com.lowcosttravelgroup.PostCostsStreamingApp$.main(PostCostsStreamingApp.scala:46)
at com.lowcosttravelgroup.PostCostsStreamingApp.main(PostCostsStreamingApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/api,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job/json,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/json,null}
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs,null}
如果使用排除操作将cassandra相关依赖排除掉,spark就能正常启动,没有任何异常。
有什么想法吗?有可行的解决方案吗?有没有spark streaming + cassandra的github示例?
BR!
发布于 2015-11-07 01:22:57
NoSuchMethodError通常表示scala版本不匹配。例如,如果Spark是使用scala 2.10构建的,则将连接器依赖替换为
"com.datastax.spark" % "spark-cassandra-connector-java_2.10" % "1.5.0-M2"
您可以在此处(原链接)找到其他版本。
发布于 2017-02-10 14:31:56
您应该在使用spark-submit或spark-shell时传递packages参数,以便自动下载spark cassandra连接器:
--packages datastax:spark-cassandra-connector:1.5.0-M2-s_2.10
请传递正确的连接器版本以便代码正常工作;或者在sbt文件中提供依赖项 "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2" 之后再编写代码。
希望这能有所帮助
https://stackoverflow.com/questions/33571358
复制相似问题