首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为spark-streaming RDD创建适当的模式

为spark-streaming RDD创建适当的模式
EN

Stack Overflow用户
提问于 2018-05-12 07:34:46
回答 1查看 503关注 0票数 1

我正在使用一个来自kafka的数据流,它看起来像下面的记录。我一直在努力使用嵌套的JSON字段进行适当的模式设置。这是我正在做的一个示例。我缺少的是只获取实际值的能力,而不是数组或rdd类型。感谢任何人的帮助。

代码语言:javascript
复制
{"Source":"10.30.110.45:42757","Telemetry":{"node_id_str":"ASR9006","subscription_id_str":"qos","encoding_path":"Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/data-rate","collection_id":30905218,"collection_start_time":1524503864744,"msg_timestamp":1524503864744,"collection_end_time":1524503864746},"Rows":[{"Timestamp":1524503864746,"Keys":{"interface-name":"Bundle-Ether56"},"Content":{"bandwidth":40000000,"input-data-rate":4300587,"input-load":27,"input-packet-rate":5375721,"load-interval":0,"output-data-rate":12,"output-load":0,"output-packet-rate":5,"peak-input-data-rate":0,"peak-input-packet-rate":0,"peak-output-data-rate":0,"peak-output-packet-rate":0,"reliability":255}}]}

代码语言:javascript
复制
val schema_array = StructType (Array(
        StructField("Source",StringType),
        StructField("Telemetry",StructType(Array(
          StructField("collection_end_time",LongType),
          StructField("collection_id",LongType),
          StructField("collection_start_time",LongType),
          StructField("encoding_path",StringType),
          StructField("msg_timestamp",LongType),
          StructField("node_id_str",StringType),
          StructField("subscription_id_str",StringType)
        ))),
          StructField("Rows",ArrayType(StructType(Array(
            StructField("Timestamp",LongType),
            StructField("Keys",StructType(Array(
              StructField("interface-name",StringType)))),
            StructField("Content",StructType(Array(
            StructField("bandwidth",LongType),
            StructField("input-data-rate",LongType),
            StructField("input-load",LongType),
            StructField("input-packet-rate",LongType),
            StructField("load-interval",LongType),
            StructField("output-data-rate",LongType),
            StructField("output-load",LongType),
            StructField("output-packet-rate",LongType),
            StructField("peak-input-data-rate",LongType),
            StructField("peak-input-packet-rate",LongType),
            StructField("peak-output-data-rate",LongType),
            StructField("peak-output-packet-rate",LongType),
            StructField("reliability",LongType))))))))))

    stream.foreachRDD { (rdd, time)  =>
        val data = rdd.map (record => record.value)
        val jsonData = spark.read.schema(schema_array).json(data)

        val result = jsonData.select("Rows.Keys.interface-name")
        result.show()

我的结果是:

代码语言:javascript
复制
+----------------+
|  interface-name|
+----------------+
|[Bundle-Ether56]|
+----------------+

预期结果为:

代码语言:javascript
复制
+----------------+
|  interface-name|
+----------------+
| Bundle-Ether56 |
+----------------+

`

EN

回答 1

Stack Overflow用户

发布于 2018-05-14 06:48:24

在深入研究了一段时间后,我发现explode方法似乎对我想要做的事情起作用了。我相信,既然我在做forEach,而且一次只得到一张记录,我就可以安全地拉平我的记录。

代码语言:javascript
复制
import org.apache.spark.sql.functions.explode
  val result = jsonData.select(explode($"Rows.Keys.interface-name"))
    result.show()

结果

代码语言:javascript
复制
+--------------+
|           col|
+--------------+ 
|Bundle-Ether56|
+--------------+
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50301513

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档