首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache :不能为Parquet写出复杂的数据类型

Apache :不能为Parquet写出复杂的数据类型
EN

Stack Overflow用户
提问于 2022-03-07 10:45:44
回答 1查看 541关注 0票数 2

我试图使用Apache将复杂的数据类型(例如Array、Map)写入Parquet文件格式。对于用例,我正在读取JSON文件中的数据,执行一些内部数据转换,然后尝试使用FileSink。

然而,这是行不通的。这很奇怪,因为拼花文件声明如下:

拼花是用复杂的嵌套数据结构从头开始构建的,并使用Dremel文件中描述的记录分解和组装算法。我们认为这种方法优于简单的嵌套名称空间的扁平。

我希望它能够正确地处理嵌套的数据类型,除非我做错了什么。

以下是错误消息:

代码语言:javascript
复制
Caused by: java.lang.UnsupportedOperationException: Unsupported type: ARRAY<DECIMAL(12, 6)>
    at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:615)
    at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:553)
    at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetMessageType(ParquetSchemaConverter.java:547)
    at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:72)
    at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:70)
    at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder.getWriteSupport(ParquetRowDataBuilder.java:67)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:563)
    at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$FlinkParquetBuilder.createWriter(ParquetRowDataBuilder.java:135)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56)

下面是我使用的JSON文件,存储在src/main/resources/NESTED.json

代码语言:javascript
复制
{"discount":[670237.997082,634079.372133,303534.821218]}

源代码:

代码语言:javascript
复制
object ReadJsonNestedData {
  def main(args: Array[String]): Unit = {
    // setup
    val jsonResource = getClass.getResource("/NESTED.json")
    val jsonFilePath = jsonResource.getPath
    val tableName = "orders"
    val readJSONTable =
      s"""
         | CREATE TABLE $tableName (
         |   `discount` ARRAY<DECIMAL(12, 6)>
         | )WITH (
         |    'connector' = 'filesystem',
         |    'path' = '$jsonFilePath',
         |    'format' = 'json'
         |)""".stripMargin

    val colFields = Array("discount")
    val defaultDataTypes = Array(DataTypes.ARRAY(DataTypes.DECIMAL(12, 6)))
    val rowType = RowType.of(defaultDataTypes.map(_.getLogicalType), colFields)
    val defaultDataTypesAsList = defaultDataTypes.toList.asJava
    val dataType = new FieldsDataType(rowType, defaultDataTypesAsList)

    // Job
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    tableEnv.executeSql(readJSONTable)
    val ordersTable = tableEnv.from(tableName)
    val dataStream = tableEnv
      .toDataStream(ordersTable)
      .map(new ConvertRowToRowDataMapFunction(dataType))
    val sink = FileSink
      .forBulkFormat(
        WriteParquetJobExample.outputBasePath,
        ParquetRowDataBuilder.createWriterFactory(rowType, Config.hadoopConfig, true)
      )
      .build()
    dataStream.sinkTo(sink)
    env.execute()
  }
}

class ConvertRowToRowDataMapFunction(fieldsDataType: FieldsDataType)
    extends RichMapFunction[Row, RowData] {
  private final val rowRowConverter = RowRowConverter.create(fieldsDataType)

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    rowRowConverter.open(this.getClass.getClassLoader)
  }

  override def map(row: Row): RowData =
    this.rowRowConverter.toInternal(row)
}

环境:

  • IntelliJ 2021.3.2 (终极版)
  • AdoptOpenJDK 1.8
  • Scala:2.12.15
  • Flink:1.13.5
  • 使用的Flink库(在本例中):
    • flink-table-api-java-bridge
    • flink-table-planner-blink
    • flink-clients
    • flink-json

提前感谢您的帮助!

EN

回答 1

Stack Overflow用户

发布于 2022-03-08 02:20:04

不幸的是,Flink Parquet格式只支持地图、数组和行类型,因为Flink 1.15 (参见FLINK-17782,尚未发布)。您可能希望在Flink版本发布后升级到1.15,或者根据主分支上的最新代码进行自己的实现。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71379852

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档