首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在spark-streaming中解析json

在spark-streaming中解析json
EN

Stack Overflow用户
提问于 2014-09-03 20:06:19
回答 2查看 10.1K关注 0票数 9

我是spark的新手,我正在尝试从kafka主题中接收一个被构造为json的DStream,我想解析每个json的内容。我收到的json是这样的:

代码语言:javascript
复制
{"type":"position","ident":"IBE32JZ","air_ground":"A","alt":"34000","clock":"1409733420","id":"IBE32JZ-1409715361-ed-0002:0","gs":"446","heading":"71","lat":"44.50987","lon":"2.98972","reg":"ECJRE","squawk":"1004","updateType":"A","altChange":" "}

我试图只提取ident字段,至少现在是这样,并且我正在使用lift-json库来解析de数据。我的程序看起来像这样:

代码语言:javascript
复制
object ScalaExample {
    val kafkaHost = "localhost"
    val kafkaPort = 9092
    val zookeeperHost = "localhost"
    val zookeeperPort = 2181

    implicit val formats = DefaultFormats
    case class PlaneInfo(ident: String)


    def parser(json: String): String = {
        val parsedJson = parse(json)
        val m = paso1.extract[PlaneInfo]
        return m.ident
    }

    def main(args : Array[String]) {
        val zkQuorum = "localhost:2181"
        val group = "myGroup"
        val topic = Map("flightStatus" -> 1)
        val sparkContext = new SparkContext("local[4]", "KafkaConsumer")
        val ssc = new StreamingContext(sparkContext, Seconds(10))


        val json = KafkaUtils.createStream(ssc, zkQuorum, group, topic)

        val id = json.map(_._2).map(parser)

        id.print

        ssc.start()
 }
}

但它抛出了下面的异常:

代码语言:javascript
复制
java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
    at net.liftweb.json.JsonAST$JValue.extract(JsonAST.scala:300)
    at aero.catec.stratio.ScalaExample$.parser(ScalaExample.scala:33)
    at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
    at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
    at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
    at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:575)
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:560)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

问题是,如果不使用spark (从文件中读取)运行相同的程序,它就能完美地工作。当我试图将它放入spark程序中时,问题就开始了。另外,如果我将解析器函数更改为如下所示:

代码语言:javascript
复制
def parser(json: String): JValue = {
  val parsedJson = parse(json)
  return (parsedJson \\ "ident")
}

它也是有效的。但是当我试图提取实际的字符串时,我得到了同样的错误。

谢谢你的帮助。我希望我已经解释得很好了。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2014-09-03 20:22:48

这是因为您缺少序列化/反序列化记录所需的scala反映依赖项。尝试添加与spark版本匹配的scala反射jar。

提示:"org.scala-lang“% "scala-reflect”% Version.scala

票数 2
EN

Stack Overflow用户

发布于 2014-09-03 20:22:55

哦,一个很好的老问题:-)

基本上,这表明存在版本问题:您的一个依赖项与您当前使用的Scala编译器不兼容。你在2.10吗?

试着用谷歌搜索短语"NoClassDefFoundError: scala/reflect/ClassManifest",我相信你会找到足够多的关于这个问题的描述。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/25643872

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档