我收到了卡夫卡的一条信息,它由几条独立的json线路组成。我想将这条消息流到hdfs中。问题是,我的代码只保存了第一个json,而忽略了其余的json。
例1 kafka消息(不是多条消息):
{"field": "1"}
{"field": "2"}
{"field": "3"}scala代码的一部分:
val stream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](
streamingContext, brokers, new ZooKeeperOffsetsStore(zkQuorum, zkPath), topic)
stream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
val df = spark.sqlContext.read.format(rdd.map(m => m._2))
df.write.mode(SaveMode.Append).format("json").save(outputPath)
}
})特别的解决方案在于rdd.map(m => m._2)部分,在这里我需要映射所有的行,而不仅仅是第一行。在我看来,rdd本身已经被裁剪了,并且不包含其余的json行。
发布于 2019-02-18 08:08:05
我用文本而不是json来解决这个问题。主要区别在于toDF()转换:
stream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
//works as .txt file:
rdd.map(m => m._2).toDF().coalesce(1).write.mode(SaveMode.Append).format("text").save(outputPath)
}
})https://stackoverflow.com/questions/53943912
复制相似问题