I am trying to write complex data types (e.g. Array, Map) to the Parquet file format using Apache Flink. For my use case, I read data from a JSON file, perform some internal data transformations, and then try to write the result out with a FileSink.
However, this does not work, which is strange, because the Parquet documentation states:
"Parquet is built from the ground up with complex nested data structures in mind, and uses the record shredding and assembly algorithm described in the Dremel paper. We believe this approach is superior to simple flattening of nested name spaces."
I would expect it to handle nested data types correctly, unless I am doing something wrong.
Here is the error message:
Caused by: java.lang.UnsupportedOperationException: Unsupported type: ARRAY<DECIMAL(12, 6)>
at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:615)
at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:553)
at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetMessageType(ParquetSchemaConverter.java:547)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:72)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:70)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder.getWriteSupport(ParquetRowDataBuilder.java:67)
at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:563)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$FlinkParquetBuilder.createWriter(ParquetRowDataBuilder.java:135)
at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56)

Below is the JSON file I am using, stored under src/main/resources/NESTED.json:
{"discount":[670237.997082,634079.372133,303534.821218]}

Source code:
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.conversion.RowRowConverter
import org.apache.flink.table.types.FieldsDataType
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.types.Row

import scala.collection.JavaConverters._

object ReadJsonNestedData {
  def main(args: Array[String]): Unit = {
    // Setup: locate the JSON resource and describe its schema.
    val jsonResource = getClass.getResource("/NESTED.json")
    val jsonFilePath = jsonResource.getPath
    val tableName = "orders"
    val readJSONTable =
      s"""
         | CREATE TABLE $tableName (
         |   `discount` ARRAY<DECIMAL(12, 6)>
         | ) WITH (
         |   'connector' = 'filesystem',
         |   'path' = '$jsonFilePath',
         |   'format' = 'json'
         |)""".stripMargin
    val colFields = Array("discount")
    val defaultDataTypes = Array(DataTypes.ARRAY(DataTypes.DECIMAL(12, 6)))
    val rowType = RowType.of(defaultDataTypes.map(_.getLogicalType), colFields)
    val defaultDataTypesAsList = defaultDataTypes.toList.asJava
    val dataType = new FieldsDataType(rowType, defaultDataTypesAsList)

    // Job: read the JSON table, convert Row to RowData, write Parquet.
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    tableEnv.executeSql(readJSONTable)
    val ordersTable = tableEnv.from(tableName)
    val dataStream = tableEnv
      .toDataStream(ordersTable)
      .map(new ConvertRowToRowDataMapFunction(dataType))
    val sink = FileSink
      .forBulkFormat(
        WriteParquetJobExample.outputBasePath,
        ParquetRowDataBuilder.createWriterFactory(rowType, Config.hadoopConfig, true)
      )
      .build()
    dataStream.sinkTo(sink)
    env.execute()
  }
}

// Converts the external Row representation into Flink's internal RowData.
class ConvertRowToRowDataMapFunction(fieldsDataType: FieldsDataType)
    extends RichMapFunction[Row, RowData] {

  private final val rowRowConverter = RowRowConverter.create(fieldsDataType)

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    rowRowConverter.open(this.getClass.getClassLoader)
  }

  override def map(row: Row): RowData =
    this.rowRowConverter.toInternal(row)
}

Environment:
Java 1.8, Scala 2.12.15, Flink 1.13.5; dependencies: flink-table-api-java-bridge, flink-table-planner-blink, flink-clients, flink-json.
Thanks in advance for your help!
Posted on 2022-03-08 02:20:04
Unfortunately, the Flink Parquet format supports Map, Array, and Row types only since Flink 1.15 (see FLINK-17782, not yet released at the time of writing). You may want to upgrade to Flink 1.15 once it is released, or build your own implementation based on the latest code on the master branch.
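Until then, a common stopgap on Flink 1.13 is to serialize the nested column into a flat STRING field before the Parquet sink, so the schema converter only ever sees primitive types. The serialization step itself is plain string-joining of the DECIMAL(12, 6) values; here is a minimal sketch in plain Java with no Flink dependencies (the class and method names are illustrative, not part of any Flink API):

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.stream.Collectors;

public class DecimalArrayFlattener {

    // Joins DECIMAL(12, 6) values into one comma-separated string,
    // suitable for storing in a STRING Parquet column. setScale(6)
    // pads every value to the declared scale of the source column.
    static String joinDecimals(List<BigDecimal> discounts) {
        return discounts.stream()
                .map(d -> d.setScale(6).toPlainString())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        List<BigDecimal> discounts = List.of(
                new BigDecimal("670237.997082"),
                new BigDecimal("634079.372133"),
                new BigDecimal("303534.821218"));
        System.out.println(joinDecimals(discounts));
        // prints 670237.997082,634079.372133,303534.821218
    }
}
```

Inside the job, this logic would live in the map function between `toDataStream` and the sink, with the sink's RowType declaring a VARCHAR field instead of the array. The string can be split and re-parsed downstream, at the cost of losing the native Parquet list encoding.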
https://stackoverflow.com/questions/71379852