首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark streaming 2.4.0获取org.apache.spark.sql.AnalysisException:找不到数据源: kafka

Spark streaming 2.4.0获取org.apache.spark.sql.AnalysisException:找不到数据源: kafka
EN

Stack Overflow用户
提问于 2020-12-07 13:58:33
回答 1查看 172关注 0票数 0

尝试从Kafka读取数据时出现以下错误。我使用docker-compose来运行kafka和spark。

Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

下面是我用于阅读的代码:

代码语言:javascript
复制
object Livedata extends App with LazyLogging {
  logger.info("starting livedata...")
  val spark = SparkSession.builder().appName("livedata").master("local[*]").getOrCreate()

  val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "topic")
        .option("startingOffsets", "latest")
        .load()

  df.printSchema()

  val hadoopConfig = spark.sparkContext.hadoopConfiguration
  hadoopConfig.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
  hadoopConfig.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)

}

在阅读了一些答案之后,我添加了用于sbt构建的所有包

这是build.sbt文件:

代码语言:javascript
复制
lazy val root = (project in file(".")).
  settings(
    inThisBuild(List(
      organization := "com.live.data",
      version := "0.1.0",
      scalaVersion := "2.12.2",
      assemblyJarName in assembly := "livedata.jar"
)),
    name := "livedata",
    libraryDependencies ++= List(
      "org.scalatest" %% "scalatest" % "3.0.5",
      "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
      "org.apache.spark" %% "spark-sql" % "2.4.0",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" % "provided",
      "org.apache.kafka"           % "kafka-clients"            % "2.5.0",
      "org.apache.kafka"           % "kafka-streams"            % "2.5.0",
      "org.apache.kafka"           %% "kafka-streams-scala"     % "2.5.0"
)
)
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs@_*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

不确定这里的主要问题是什么。

更新:

主要问题是获取此org.apache.spark.sql.AnalysisException:找不到数据源: kafka异常,因为spark-sql-kafka库在类路径中不可用&它无法在META-INF/ org.apache.spark.sql.sources.DataSourceRegister文件夹中找到服务。

下面的代码块需要添加到build.sbt中。这将在最终的jar中包含org.apache.spark.sql.sources.DataSourceRegister文件。

代码语言:javascript
复制
// META-INF discarding
assemblyMergeStrategy in assembly := {
  case PathList("META-INF","services",xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF",xs @ _*) => MergeStrategy.discard
  case "application.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}```
EN

回答 1

Stack Overflow用户

发布于 2020-12-07 14:30:55

没有提供spark-sql-kafka-0-10,因此删除该部分依赖项。(虽然提供了spark-sql,因此您可以将其添加到其中)

您也不应该拉取Kafka流(因为Spark不使用它),并且kafka客户端是由sql-kafka传递拉取的,所以也不需要它

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65176768

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档