首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何递归调用FlatMapFunction<Row,Row>?

如何递归调用FlatMapFunction<Row,Row>?
EN

Stack Overflow用户
提问于 2020-05-07 21:30:26
回答 1查看 83关注 0票数 2

我有一个数据集df,使用spark.read().json读取

它的模式如下所示:

代码语言:javascript
复制
root
 |-- items: struct (nullable = true)
 |    |-- item: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- batch-id column: long (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)

我想使用FlatMapFunction来获得一个带有内部模式(id、名称、类型)的dataset。

我想做一些类似以下的事情:

代码语言:javascript
复制
df.flatMap(mapperFunction(),RowEncoder.apply(someSchema);

public static FlatMapFunction<Row,Row> mapperFunction() {
    return row -> {
      Row r1 = row.getAs("items");
      List<Row> r2 = r1.getList(0);    //This will explode the column
      StructType schema = r2.get(0).schema();
      //I know list doesn't have map function, I want to know what can be done here
      return r2.flatMap(mapperFunction(),RowEncoder.apply(schema);    
    };
  }
EN

回答 1

Stack Overflow用户

发布于 2020-05-25 03:18:23

有几个选项:

选项1:使用explode

扁平化数据结构的最简单方法是使用explode而不是flatMap调用:

从数据开始

代码语言:javascript
复制
{"items": {"item": [{"batch-id":1,"id":"id1","name":"name1","type":"type1"},{"batch-id":2,"id":"id2","name":"name2","type":"type2"}]}}

代码

代码语言:javascript
复制
df.withColumn("exploded", explode(col("items.item"))).select("exploded.*").show();

打印

代码语言:javascript
复制
+--------+---+-----+-----+
|batch_id| id| name| type|
+--------+---+-----+-----+
|       1|id1|name1|type1|
|       2|id2|name2|type2|
+--------+---+-----+-----+

选项2:使用flatMap

如果需要flatMap调用(例如,向映射中添加更多逻辑),此代码将打印相同的结果:

代码语言:javascript
复制
df.flatMap(mapperFunction(), Encoders.bean(Data.class)).show();

使用映射函数

代码语言:javascript
复制
private static FlatMapFunction<Row, Data> mapperFunction() {
    return row -> {
        Row r1 = row.getAs("items");
        List<Row> r2 = r1.getList(0);    //This will explode the column
        return r2.stream().map(entry -> {
            Data d = new Data();
            d.setBatch_id(entry.getLong(0));
            d.setId(entry.getString(1));
            d.setName(entry.getString(2));
            d.setType(entry.getString(3));
            return d;
        }).iterator();
    };
}

和数据bean

代码语言:javascript
复制
public static class Data implements Serializable {
    private long batch_id;
    private String id;
    private String name;
    private String type;

    //getters and setters
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61659040

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档