首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何存储卡夫卡的JSONLines RDD消息

如何存储卡夫卡的JSONLines RDD消息
EN

Stack Overflow用户
提问于 2018-12-27 10:58:28
回答 1查看 85关注 0票数 0

我收到了卡夫卡的一条信息,它由几条独立的json线路组成。我想将这条消息流到hdfs中。问题是,我的代码只保存了第一个json,而忽略了其余的json。

例1 kafka消息(不是多条消息):

代码语言:javascript
复制
{"field": "1"}
{"field": "2"}
{"field": "3"}

scala代码的一部分:

代码语言:javascript
复制
 val stream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](
      streamingContext, brokers, new ZooKeeperOffsetsStore(zkQuorum, zkPath), topic)
    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {

        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

        val df = spark.sqlContext.read.format(rdd.map(m => m._2))

        df.write.mode(SaveMode.Append).format("json").save(outputPath)
      }

    })

特别的解决方案在于rdd.map(m => m._2)部分,在这里我需要映射所有的行,而不仅仅是第一行。在我看来,rdd本身已经被裁剪了,并且不包含其余的json行。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-02-18 08:08:05

我用文本而不是json来解决这个问题。主要区别在于toDF()转换:

代码语言:javascript
复制
stream.foreachRDD(rdd => {

      if (!rdd.isEmpty) {        
        //works as .txt file: 
        rdd.map(m => m._2).toDF().coalesce(1).write.mode(SaveMode.Append).format("text").save(outputPath)


      }
    })
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53943912

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档