I want to convert a `DataStream<Row>` into a `DataStream<String>` using schema information.

Input

DataStream parameter:

```
{"fields":["China","Beijing"]}
```

args1 schema:

```
message spark_schema {
  optional binary country (UTF8);
  optional binary city (UTF8);
}
```

Expected output:

```
{"country":"china", "city":"beijing"}
```

My code looks like this:
```java
public DataStream<String> convert(DataStream source, MessageType messageType) {
    SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
        JSONObject data = new JSONObject();
        this.fields = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
        for (int i = 0; i < fields.size(); i++) {
            data.put(fields.get(i), row.getField(i));
        }
        return data.toJSONString();
    });
    return dataWithSchema;
}
```

It throws this exception:
```
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.xxxx.ParquetDataSourceReader$$Lambda$64/1174881426@d78795 is not serializable
	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:180)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1823)
	at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
	at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)
```

But the following code runs fine:
```java
public DataStream<String> convert(DataStream source, MessageType messageType) {
    if (this.fields == null) {
        throw new RuntimeException("The schema of AbstractRowStreamReader is null");
    }
    List<String> field = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
    SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
        JSONObject data = new JSONObject();
        for (int i = 0; i < field.size(); i++) {
            data.put(field.get(i), row.getField(i));
        }
        return data.toJSONString();
    });
    return dataWithSchema;
}
```

How can a Flink map operator work with complex POJOs from the enclosing scope?
Answered 2020-03-09 16:20:53
For Flink to distribute code across tasks, that code needs to be fully Serializable. In your first example it is not; in your second it is. In particular, the first map lambda references `this.fields` and `messageType`, so it captures the enclosing (non-serializable) instance, and `Type::getName` produces a non-Serializable lambda. In the second example only the local `field` list, which is serializable, is captured.

To get a Serializable lambda, you need to either cast it explicitly to a serializable interface (for example Flink's `MapFunction`), or use an intersection cast such as `(Serializable & Function)`.

The second version is better in any case, because it also saves computation: `convert` is executed only once, during job compilation, whereas the function passed to `DataStream#map` is invoked for every record. If that is still unclear, I recommend running it in your IDE and stepping through with breakpoints.
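To illustrate the intersection-cast trick outside of Flink, here is a minimal, self-contained sketch (the class name `SerializableLambdaDemo` is made up for this example); it shows that the same method reference is serializable only when typed against a Serializable interface:

```java
import java.io.Serializable;
import java.util.function.Function;

public class SerializableLambdaDemo {
    public static void main(String[] args) {
        // A plain lambda typed as java.util.function.Function is NOT Serializable.
        Function<String, Integer> plain = String::length;

        // An intersection cast tells the compiler to generate a lambda class
        // that also implements Serializable.
        Function<String, Integer> ser =
                (Function<String, Integer> & Serializable) String::length;

        System.out.println(plain instanceof Serializable); // false
        System.out.println(ser instanceof Serializable);   // true
    }
}
```

Flink's `MapFunction` already extends `Serializable`, which is why the explicit `(MapFunction<Row, String>)` cast in the question's code is enough to make the lambda itself serializable; the remaining problem in the first snippet is what the lambda captures.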
https://stackoverflow.com/questions/60573782