I have a dataset df, read with spark.read().json.
Its schema looks like this:
root
|-- items: struct (nullable = true)
| |-- item: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- batch-id: long (nullable = true)
| | | |-- id: string (nullable = true)
| | | |-- name: string (nullable = true)
| | | |-- type: string (nullable = true)

I want to use a FlatMapFunction to get a dataset with the inner schema (id, name, type).
I want to do something like this:

df.flatMap(mapperFunction(), RowEncoder.apply(someSchema));
public static FlatMapFunction<Row, Row> mapperFunction() {
    return row -> {
        Row r1 = row.getAs("items");
        List<Row> r2 = r1.getList(0); //This will explode the column
        StructType schema = r2.get(0).schema();
        //I know list doesn't have a map function, I want to know what can be done here
        return r2.flatMap(mapperFunction(), RowEncoder.apply(schema));
    };
}
Posted on 2020-05-25 03:18:23
There are a couple of options:

Option 1: use explode

The simplest way to flatten the data structure is to use explode instead of a flatMap call.

Starting with the data
{"items": {"item": [{"batch-id":1,"id":"id1","name":"name1","type":"type1"},{"batch-id":2,"id":"id2","name":"name2","type":"type2"}]}}

the code
df.withColumn("exploded", explode(col("items.item"))).select("exploded.*").show();

prints
+--------+---+-----+-----+
|batch_id| id| name| type|
+--------+---+-----+-----+
| 1|id1|name1|type1|
| 2|id2|name2|type2|
+--------+---+-----+-----+

Option 2: use flatMap
If a flatMap call is required (for example, to add more logic to the mapping), this code prints the same result:
df.flatMap(mapperFunction(), Encoders.bean(Data.class)).show();

using the mapper function
private static FlatMapFunction<Row, Data> mapperFunction() {
    return row -> {
        Row r1 = row.getAs("items");
        List<Row> r2 = r1.getList(0); //This will explode the column
        return r2.stream().map(entry -> {
            Data d = new Data();
            d.setBatch_id(entry.getLong(0));
            d.setId(entry.getString(1));
            d.setName(entry.getString(2));
            d.setType(entry.getString(3));
            return d;
        }).iterator();
    };
}
and the Data bean
public static class Data implements Serializable {
    private long batch_id;
    private String id;
    private String name;
    private String type;
    //getters and setters
}
https://stackoverflow.com/questions/61659040
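The piece the question was really stuck on is turning the extracted List into the Iterator that a FlatMapFunction must return: Java's List has no map method, but its Stream does, and stream().map(...).iterator() yields exactly what flatMap needs. A minimal plain-Java sketch of that pattern (no Spark required; the values and names here are illustrative stand-ins for the Row objects):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class StreamToIterator {
    public static void main(String[] args) {
        // Stand-in for the List<Row> pulled out of the "items.item" column.
        List<String> rows = Arrays.asList("id1", "id2");

        // List has no map(), but its Stream does; iterator() returns
        // exactly the type a FlatMapFunction body must produce.
        Iterator<String> it = rows.stream()
                .map(id -> "name-" + id)
                .iterator();

        List<String> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        System.out.println(out); // prints [name-id1, name-id2]
    }
}
```

Inside the Spark mapper this is the same shape: map each element Row to a Data bean and return the stream's iterator, as the answer's mapperFunction does.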