首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ClassPath与Spark2流在纱线上的应用

ClassPath与Spark2流在纱线上的应用
EN

Stack Overflow用户
提问于 2018-03-23 10:06:56
回答 1查看 190关注 0票数 0

在纱线客户端模式下使用Kafka 0.10运行Spark流作业时,我遇到了一些类路径问题:

代码语言:javascript
复制
java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition

我已经读过了将罐子添加到火花作业中星火卡夫卡流媒体问题的问题(解释得很好),但我仍然被塞在……

我的项目是用Maven管理的,其中包括一个用于Spark的scala部分。使用Maven Shade插件创建一个uber jar,其中包含缺失类的火花流-kafka-0.10_2.11依赖项。

在本地模式下执行星火应用程序就像预期的那样工作。尽管如此,在纱线客户端模式下执行它,我想是从执行者类路径中丢失了KafkaRDDPartition。

我试图将extraClassPath属性添加到我的SparkConf中,但是它没有改变任何东西(不要对硬编码的路径进行攻击,这只是一个测试)

代码语言:javascript
复制
val conf: SparkConf = new SparkConf()
  conf.set("spark.streaming.concurrentJobs", "2")
  conf.set("spark.executor.extraClassPath",sparkHome + "/kafka-0.10")
  conf.setAppName(classOf[KafkaSubscriber].getSimpleName)
  conf.setMaster(sparkMaster)
  conf.setSparkHome(sparkHome)

在创建SparkStreamingContext之前将jars添加到Spark也没有改变任何东西

代码语言:javascript
复制
val folder: File = new File(sparkHome + "/kafka-0.10");
  if (folder.exists && folder.isDirectory) {
    for (f <- folder.listFiles.filter(_.isFile).toSeq) {
      spark.sparkContext.addJar(f.getAbsolutePath);
    }
  }

我使用的是Cloudera平台,部署并激活了整个集群中的Spark2包,因此默认情况下,Spark2服务被配置为使用Kafka0.10。

另一件重要的事情是,因为它是Spring应用程序,所以我不是用spark提交来执行spark应用程序,而是执行这个

代码语言:javascript
复制
java -Dloader.path=file:///etc/hadoop/conf,myApp.jar -jar myApp.jar

我遗漏了什么?

星条旗代码是这样的

代码语言:javascript
复制
class KafkaSubscriber(sparkMaster: String, duration: Duration, topicSession: String, brokers: String) {

  val sparkHome: String = "/opt/cloudera/parcels/SPARK2/lib/spark"

  // Create Spark Conf
  val conf: SparkConf = new SparkConf()
  conf.set("spark.streaming.concurrentJobs", "2")
  conf.set("spark.executor.extraClassPath",sparkHome + "/kafka-0.10")
  conf.setAppName(classOf[KafkaSubscriber].getSimpleName)
  conf.setMaster(sparkMaster)
  conf.setSparkHome(sparkHome)

  // Create Spark Session
  // **********
  val spark: SparkSession = SparkSession.builder()
    .appName(classOf[KafkaSubscriber].getSimpleName)
    .master(sparkMaster)
    .config(conf)
    .getOrCreate()

  // Set the Kafka dependencies
  val folder: File = new File(sparkHome + "/kafka-0.10");
  if (folder.exists && folder.isDirectory) {
    for (f <- folder.listFiles.filter(_.isFile).toSeq) {
      spark.sparkContext.addJar(f.getAbsolutePath);
    }
  }

  // Create Spark Streaming Context
  // **********
  val ssc: StreamingContext = StreamingContext.getActiveOrCreate(() => new StreamingContext(spark.sparkContext, duration))

  def subscribe {

    // Some code here ...

    // Subscribe to Kafka
    // **********
    val topicSetSession: Array[String] = topicSession.split(",")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "KafkaSubscriber",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // Get Session Stream
    val rawSessionStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSetSession, kafkaParams))

    // Some code here ...

  }

  /**
   * Start the Spark ETL
   */
  def startEtl {
    // Start the Spark Streaming batch
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Close the spark session
   */
  def close {
    ssc.stop(true)
    spark.close
  }

和Java部分来调用Spark代码

代码语言:javascript
复制
@Component
@Scope("singleton")
public class KafkaStreaming implements InitializingBean {

    @Autowired
    private KafkaSubscriber kafkaSubscriber;

    @Override
    public void afterPropertiesSet() throws Exception {

        // Get and store the HbbTV Config
        kafkaSubscriber.subscribe();

        // Start Spark
        kafkaSubscriber.startEtl();
    }
}

谢谢你的帮助!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-07-11 17:31:18

嗯,我的问题是卡夫卡集成的Cloudera配置出现了错误。默认情况下,卡夫卡0.9被选中,包括卡夫卡0.9罐进入类路径,而不是卡夫卡0.10。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49447088

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档