首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >不能将kafka.cluster.BrokerEndPoint转换为kafka.cluster.Broker

不能将kafka.cluster.BrokerEndPoint转换为kafka.cluster.Broker
EN

Stack Overflow用户
提问于 2016-07-09 15:52:11
回答 1查看 4.7K关注 0票数 1

我试着把数据从卡夫卡流出来

我用的是火花1.6.2和kafka 0.9.0.1和scala 2.11.8

当我使用基于接收器的方法(KafkaUtils.createStream())时,一切都很好,但是当我尝试这种没有接收器的直接方法时,一切都很好。

代码语言:javascript
复制
val kafkaStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc,
  Map("group.id" -> "blah",
    "auto.offset.reset" -> "smallest",
    "metadata.broker.list" -> "127.0.0.1:9092",
    "bootstrap.servers"-> "127.0.0.1:9092"),
  Set("tweets")
  )

我知道这个错误

代码语言:javascript
复制
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:146)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:155)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:213)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at SparkStreaming$.delayedEndpoint$SparkStreaming$1(SparkStreaming.scala:32)
at SparkStreaming$delayedInit$body.apply(SparkStreaming.scala:24)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at SparkStreaming$.main(SparkStreaming.scala:24)
at SparkStreaming.main(SparkStreaming.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

这些是我的依赖

代码语言:javascript
复制
"org.apache.spark" %% "spark-streaming-kafka" % "1.6.2",
"org.apache.spark" %% "spark-core" % "1.6.2",
"org.apache.spark" % "spark-streaming_2.11" % "1.6.2",
"org.apache.kafka" %% "kafka" % "0.9.0.1"

我不知道问题出在哪里?有人能帮我吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-07-09 17:43:49

根据星火流文档这里,SparkStreaming1.6.2与Kakfa 0.8.2.1兼容。

Kafka :火花流1.6.2与Kafka 0.8.2.1兼容

因此,要解决您的问题,请使用版本为0.8.2.1的kafka库,而不是0.9.0.1。

希望这能有所帮助!

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38283598

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档