
Cannot cast kafka.cluster.BrokerEndPoint to kafka.cluster.Broker

Stack Overflow user
Asked on 2017-11-06 14:02:05
1 answer · 721 views · 0 votes

I am using Kafka 2.11-0.11.0.1, Scala 2.11, and Spark 2.2.0. I added the following jars to the Java build path in Eclipse:

kafka-streams-0.11.0.1,
kafka-tools-0.11.0.1,
spark-streaming_2.11-2.2.0,
spark-streaming-kafka_2.11-1.6.3,
spark-streaming-kafka-0-10_2.11-2.2.0,
kafka_2.11-0.11.0.1.

My code is as follows:

import kafka.serializer.StringDecoder
import kafka.api._
import kafka.api.ApiUtils._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.SparkContext._


object KafkaExample {

  def main(args: Array[String]) {

    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

    val kafkaParams = Map("bootstrap.servers" -> "kafkaIP:9092")

    val topics = List("logstash_log").toSet

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics).map(_._2)

    stream.print()

    ssc.checkpoint("C:/checkpoint/")
    ssc.start()
    ssc.awaitTermination()
  }
}

It is very simple code that just connects Spark to Kafka. However, I get the following error:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
    at scala.util.Either$RightProjection.flatMap(Either.scala:522)
    at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
    at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
    at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
    at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
    at scala.util.Either$RightProjection.flatMap(Either.scala:522)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
    at com.defne.KafkaExample$.main(KafkaExample.scala:28)
    at com.defne.KafkaExample.main(KafkaExample.scala)

What am I doing wrong?

Note: I tried "metadata.broker.list" instead of "bootstrap.servers", but nothing changed.


1 Answer

Stack Overflow user

Answered on 2017-11-06 15:23:04

Your problem is that you have loaded too many Kafka dependencies, and the one picked up at runtime is not the version Spark expects.

The actual clash is in the PartitionMetadata class. In 0.8.2 it looks like this (and that is what spark-streaming-kafka_2.11-1.6.3 expects):

case class PartitionMetadata(partitionId: Int, 
                             val leader: Option[Broker], 
                             replicas: Seq[Broker], 
                             isr: Seq[Broker] = Seq.empty,
                             errorCode: Short = ErrorMapping.NoError) extends Logging

In 0.10.0.0 and later it looks like this:

case class PartitionMetadata(partitionId: Int,
                             leader: Option[BrokerEndPoint],
                             replicas: Seq[BrokerEndPoint],
                             isr: Seq[BrokerEndPoint] = Seq.empty,
                             errorCode: Short = Errors.NONE.code) extends Logging

See how leader changed from Option[Broker] to Option[BrokerEndPoint]? That is what Spark is complaining about.

You have to clean up your dependencies. All you need (if you are using Spark 2.2) is:

spark-streaming_2.11-2.2.0,
spark-streaming-kafka-0-10_2.11-2.2.0
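Note that the 0-10 integration has a different API than the 0-8 style call used in the question, so the code has to change as well: deserializers are supplied through the consumer properties rather than as type parameters, and the stream is built from a ConsumerStrategy. A minimal sketch against the 0-10 API follows (the group.id value is an arbitrary assumption; the broker address is the placeholder from the question):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaExample {

  def main(args: Array[String]): Unit = {

    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

    // Consumer properties replace the StringDecoder type parameters
    // used by the old 0-8 integration.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafkaIP:9092",
      "key.deserializer"  -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka-example" // hypothetical consumer group id
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](List("logstash_log"), kafkaParams)
    )

    // Records are ConsumerRecord objects; use .value instead of ._2.
    stream.map(_.value).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```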
Original page content provided by Stack Overflow; translation supported by Tencent Cloud's translation engine.
Original link:

https://stackoverflow.com/questions/47138692