I'm using Spark 2.3.2 and reading a multi-line JSON file. This is the output of df.printSchema():
root
 |-- data: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- context: struct (nullable = true)
 |    |    |    |    |-- environment: struct (nullable = true)
 |    |    |    |    |    |-- tag: struct (nullable = true)
 |    |    |    |    |    |    |-- weather: string (nullable = true)
 |    |    |    |    |    |-- weather: struct (nullable = true)
 |    |    |    |    |    |    |-- clouds: double (nullable = true)
 |    |    |    |    |    |    |-- rain: long (nullable = true)
 |    |    |    |    |    |    |-- temp: long (nullable = true)
 |    |    |    |    |-- personal: struct (nullable = true)
 |    |    |    |    |    |-- activity: struct (nullable = true)
 |    |    |    |    |    |    |-- conditions: array (nullable = true)
 |    |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |    |    |-- kind: string (nullable = true)
 |    |    |    |    |    |-- status: struct (nullable = true)
 |    |    |    |    |    |    |-- speed: double (nullable = true)
 |    |    |    |    |-- timespace: struct (nullable = true)
 |    |    |    |    |    |-- geo: struct (nullable = true)
 |    |    |    |    |    |    |-- coordinates: array (nullable = true)
 |    |    |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |    |-- tag: struct (nullable = true)
 |    |    |    |    |    |    |-- season: string (nullable = true)
 |    |    |    |    |    |-- timestamp: string (nullable = true)
 |    |    |    |-- passport: struct (nullable = true)
 |    |    |    |    |-- pid: string (nullable = true)
 |    |    |    |    |-- uid: string (nullable = true)

As you can see, the JSON file has a nested structure, and retrieving a specific nested field (e.g. season, speed, etc.) is not straightforward.
This is how I read the data:
SparkSession spark = SparkSession.builder()
.config("spark.rdd.compress", "true")
.appName("Test")
.master("local[*]")
.getOrCreate();
Dataset<Row> df = spark
.read()
.option("multiLine", true).option("mode", "PERMISSIVE")
.json(filePath);

How can I get the timestamp and the weather tag into a separate dataset, like this?
timestamp weather
... ...
... ...

I tried the following, but it didn't work:
df.registerTempTable("df");
Dataset result = spark.sql("SELECT data.items.element.passport.uid FROM df");

or
Dataset result = df.withColumn("items",
org.apache.spark.sql.functions.explode(df.col("data.items")))
.select(df.col("items.context.environment.weather"));

Posted on 2018-11-30 05:10:22
You can read the multi-line JSON file and select the nested data as follows.
//Read multiline json
Dataset<Row> ds = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
.json("c:\\temp\\test.json");
//print schema
ds.printSchema();
//get weather
Dataset<Row> ds1 = ds.select("data.items.context.environment.weather");
ds1.show(false);
//get timestamp
Dataset<Row> ds2 = ds.select("data.items.context.timespace.timestamp");
ds2.show(false);
//get weather and timestamp
Dataset<Row> ds3 = ds.select("data.items.context.environment.weather", "data.items.context.timespace.timestamp");
ds3.show(false);

With Spark 2.4.0, you can use the explode and arrays_zip functions to explode the arrays and combine multiple columns:
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.arrays_zip;
import static org.apache.spark.sql.functions.col;
Dataset<Row> ds4 = ds3
    .withColumn("values", explode(arrays_zip(col("weather"), col("timestamp"))))
    .select(col("values.weather"), col("values.timestamp"));
ds4.show(false);

https://stackoverflow.com/questions/53547514
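Note that arrays_zip is only available from Spark 2.4 onward, while the question uses Spark 2.3.2. On 2.3.x you can get one row per array element by exploding data.items first and then selecting the nested fields from the exploded struct column. A minimal sketch of that approach, assuming the schema shown above (the class name, the "item" column alias, and the file path are illustrative):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

public class ExplodeItems {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Test")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> ds = spark.read()
                .option("multiLine", true)
                .option("mode", "PERMISSIVE")
                .json("c:\\temp\\test.json");

        // explode produces one row per element of data.items;
        // "item" is a struct column holding a single array element.
        Dataset<Row> items = ds.withColumn("item", explode(col("data.items")));

        // Select the nested fields directly from the exploded struct.
        Dataset<Row> result = items.select(
                col("item.context.environment.weather").as("weather"),
                col("item.context.timespace.timestamp").as("timestamp"));
        result.show(false);
    }
}
```

This avoids the problem in the question's attempt, which selected through df.col(...) on the original DataFrame instead of referencing the exploded column.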