首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么spark-submit找不到kafka数据源,除非使用--packages?

为什么spark-submit找不到kafka数据源,除非使用--packages?
EN

Stack Overflow用户
提问于 2017-09-01 21:44:24
回答 3查看 3.6K关注 0票数 4

我正在尝试将Kafka集成到我的Spark应用程序中,这是我的POM文件所需的条目:

代码语言:javascript
复制
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.stream.kafka.version}</version>
</dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
</dependency>

对应的工件版本为:

代码语言:javascript
复制
<kafka.version>0.10.2.0</kafka.version>
<spark.stream.kafka.version>2.2.0</spark.stream.kafka.version>

我一直在挠头:

代码语言:javascript
复制
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html

我还尝试为jar提供--jars参数,但是没有帮助。这里我漏掉了什么?

代码:

代码语言:javascript
复制
private static void startKafkaConsumerStream() {

        Dataset<HttpPackage> ds1 = _spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
                .option("subscribe", HTTP_FED_VO_TOPIC)
                .load() // Getting the error here
                .as(Encoders.bean(HttpPackage.class));

        ds1.foreach((ForeachFunction<HttpPackage>)  req ->System.out.print(req));

    }

并且_spark被定义为:

代码语言:javascript
复制
_spark = SparkSession
                .builder()
                .appName(_properties.getProperty("app.name"))
                .config("spark.master", _properties.getProperty("master"))
                .config("spark.es.nodes", _properties.getProperty("es.hosts"))
                .config("spark.es.port", _properties.getProperty("es.port"))
                .config("spark.es.index.auto.create", "true")
                .config("es.net.http.auth.user", _properties.getProperty("es.net.http.auth.user"))
                .config("es.net.http.auth.pass", _properties.getProperty("es.net.http.auth.pass"))
                .getOrCreate();

我的导入是:

代码语言:javascript
复制
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

但是,当我像前面提到的那样运行我的代码时,使用了here选项:

代码语言:javascript
复制
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

它起作用了

EN

回答 3

Stack Overflow用户

发布于 2018-01-02 21:41:51

Spark Structured Streaming使用外部kafka-0-10-sql模块支持Apache Kafka作为流源和接收器。

对于使用spark-submit提交以供执行的kafka-0-10-sql应用程序,Spark模块不可用。该模块是外部的,要使其可用,您应该将其定义为依赖项。

除非在Spark应用程序中使用特定于kafka-0-10-sql模块的代码,否则不必在pom.xml中将模块定义为dependency。您根本不需要在模块上使用编译依赖项,因为没有代码使用模块的代码。您是针对接口编写代码的,这也是Spark SQL如此易于使用的原因之一(即,只需很少的代码就可以拥有相当复杂的分布式应用程序)。

但是,spark-submit需要--packages命令行选项,您已经报告它工作得很好。

但是,当我像这里提到的那样运行我的代码时,它使用了package选项:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

它在--packages上运行良好的原因是你必须告诉星火基础设施在哪里可以找到kafka格式的定义。

这就引出了使用Kafka运行Spark流媒体应用的另一个“问题”(或要求)。必须在spark-sql-kafka模块上指定运行时依赖项

您可以使用--packages命令行选项(在spark-submit应用程序之后下载必要的jar)或创建所谓的uber-jar (或fat-jar)来指定运行时依赖项。

这就是pom.xml发挥作用的地方(这也是为什么人们在pom.xmldependency模块方面提供帮助的原因)。

因此,首先,您必须在pom.xml中指定依赖项。

代码语言:javascript
复制
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

最后但并非最不重要的一点是,您必须构建一个使用Apache Maven Shade Pluginpom.xml中配置的超级jar。

使用Apache Maven Shade插件,您可以在create an Uber JAR应用程序jar文件中包含kafka格式工作的所有“基础设施”。事实上,Uber将包含所有必要的运行时依赖项,因此您可以单独使用JAR进行spark-submit (没有--packages选项或类似选项)。

票数 4
EN

Stack Overflow用户

发布于 2017-09-01 22:04:06

将以下依赖项添加到pom.xml文件中。

代码语言:javascript
复制
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
</dependency>
票数 1
EN

Stack Overflow用户

发布于 2017-09-02 02:54:07

更新您的依赖项和版本。下面给出的依赖关系应该可以正常工作:

代码语言:javascript
复制
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

注:注意在前两个依赖项中提供了作用域。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46001583

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档