首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink DataStream如何将自定义POJO合并到另一个DataStream中

Flink DataStream如何将自定义POJO合并到另一个DataStream中
EN

Stack Overflow用户
提问于 2020-03-07 10:08:33
回答 1查看 731关注 0票数 0

我希望使用架构信息将DataStream转换为DataStream

输入

参数DataStream

代码语言:javascript
复制
{"fields":["China","Beijing"]}

args1模式

代码语言:javascript
复制
message spark_schema {
  optional binary country (UTF8);
  optional binary city (UTF8);
}

期望输出

代码语言:javascript
复制
{"country":"china", "city":"beijing"}

我的代码如下所示

代码语言:javascript
复制
public DataStream<String> convert(DataStream source, MessageType messageType) {

        SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
            JSONObject data = new JSONObject();
            this.fields = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
            for (int i = 0; i < fields.size(); i++) {
                data.put(fields.get(i), row.getField(i));
            }
            return data.toJSONString();
        });
        return dataWithSchema;
    }

异常错误

代码语言:javascript
复制
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.xxxx.ParquetDataSourceReader$$Lambda$64/1174881426@d78795 is not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:180)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1823)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
    at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)

但是下面的代码运行良好

代码语言:javascript
复制
public DataStream<String> convert(DataStream source, MessageType messageType) {
        if (this.fields == null) {
            throw new RuntimeException("The schema of AbstractRowStreamReader is null");
        }

        List<String> field = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
        SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
            JSONObject data = new JSONObject();
            for (int i = 0; i < field.size(); i++) {
                data.put(field.get(i), row.getField(i));
            }
            return data.toJSONString();
        });
        return dataWithSchema;
    }

Flink map运算符如何组合外部复杂POJO?

EN

回答 1

Stack Overflow用户

发布于 2020-03-09 16:20:53

为了让Flink跨任务分发代码,代码需要完全Serializable。在您的第一个示例中,它不是;在第二个示例中,它是。特别是,Type::getName将生成一个非Serializable的lambda。

要获得一个Serializable的lambda,您需要显式地将其强制转换为可序列化的接口(例如Flink MapFunction),或者将其与(Serializable & Function)一起使用强制转换

因为第二个也节省了计算,所以在任何情况下它都会更好。在作业编译过程中,Convert只执行一次,而每条记录都会调用DataStream#map。如果还不清楚,我建议在IDE中执行它,并尝试使用断点。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60573782

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档