I'm consuming a data stream from Kafka that looks like the record below. I've been struggling to get the schema right for the nested JSON fields. Here is a sample of what I'm doing. What I'm missing is a way to get just the actual values, rather than an array or RDD type. Any help is appreciated.
{"Source":"10.30.110.45:42757","Telemetry":{"node_id_str":"ASR9006","subscription_id_str":"qos","encoding_path":"Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/data-rate","collection_id":30905218,"collection_start_time":1524503864744,"msg_timestamp":1524503864744,"collection_end_time":1524503864746},"Rows":[{"Timestamp":1524503864746,"Keys":{"interface-name":"Bundle-Ether56"},"Content":{"bandwidth":40000000,"input-data-rate":4300587,"input-load":27,"input-packet-rate":5375721,"load-interval":0,"output-data-rate":12,"output-load":0,"output-packet-rate":5,"peak-input-data-rate":0,"peak-input-packet-rate":0,"peak-output-data-rate":0,"peak-output-packet-rate":0,"reliability":255}}]}val schema_array = StructType (Array(
StructField("Source",StringType),
StructField("Telemetry",StructType(Array(
StructField("collection_end_time",LongType),
StructField("collection_id",LongType),
StructField("collection_start_time",LongType),
StructField("encoding_path",StringType),
StructField("msg_timestamp",LongType),
StructField("node_id_str",StringType),
StructField("subscription_id_str",StringType)
))),
StructField("Rows",ArrayType(StructType(Array(
StructField("Timestamp",LongType),
StructField("Keys",StructType(Array(
StructField("interface-name",StringType)))),
StructField("Content",StructType(Array(
StructField("bandwidth",LongType),
StructField("input-data-rate",LongType),
StructField("input-load",LongType),
StructField("input-packet-rate",LongType),
StructField("load-interval",LongType),
StructField("output-data-rate",LongType),
StructField("output-load",LongType),
StructField("output-packet-rate",LongType),
StructField("peak-input-data-rate",LongType),
StructField("peak-input-packet-rate",LongType),
StructField("peak-output-data-rate",LongType),
StructField("peak-output-packet-rate",LongType),
StructField("reliability",LongType))))))))))
stream.foreachRDD { (rdd, time) =>
  val data = rdd.map(record => record.value)
  val jsonData = spark.read.schema(schema_array).json(data)
  val result = jsonData.select("Rows.Keys.interface-name")
  result.show()
}

My result is:
+----------------+
| interface-name|
+----------------+
|[Bundle-Ether56]|
+----------------+

The expected result is:
+----------------+
| interface-name|
+----------------+
| Bundle-Ether56 |
+----------------+
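One way to get the bare value, assuming each message carries exactly one element in Rows as in the sample record, is to index into the array directly instead of selecting through it. A minimal sketch under that one-element assumption:

import org.apache.spark.sql.functions.col

// Take the first (and, by assumption, only) element of the "Rows" array,
// then drill into the nested structs; .as(...) keeps the original column name.
val single = jsonData.select(
  col("Rows").getItem(0)
    .getField("Keys")
    .getField("interface-name")
    .as("interface-name"))
single.show()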
Posted on 2018-05-14 06:48:24
After digging into this for a while, I found that the explode method seems to do what I was after. Since I'm using foreachRDD and only get one record at a time, I believe it's safe to flatten the record this way.
import org.apache.spark.sql.functions.explode

val result = jsonData.select(explode($"Rows.Keys.interface-name"))
result.show()

Result:
+--------------+
| col|
+--------------+
|Bundle-Ether56|
+--------------+

https://stackoverflow.com/questions/50301513
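One follow-up note: explode names its output column col by default, which is why the result above shows col rather than interface-name. Chaining .as(...) onto the exploded column restores the name; a small sketch under the same setup:

import org.apache.spark.sql.functions.explode

// Same select as above, but rename the exploded column from the
// default "col" back to "interface-name".
val result = jsonData.select(
  explode($"Rows.Keys.interface-name").as("interface-name"))
result.show()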