
Iceberg's FlinkSink does not update metadata files on streaming write

Stack Overflow user
Asked on 2021-01-12 05:48:40
1 answer · 305 views · 0 followers · score 0

I have been trying to use Iceberg's FlinkSink to consume data and write it to a sink. I successfully read data from Kinesis, and I can see the data being written into the appropriate partitions. However, metadata.json is never updated, and without it I cannot query the table.

Any help or pointers would be greatly appreciated.

Here is the code:

package test

import java.util.{Calendar, Properties}

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import org.apache.flink.table.data.{GenericRowData, RowData, StringData}
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.iceberg.flink.sink.FlinkSink
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.types.Types
import org.apache.iceberg.{PartitionSpec, Schema}

import scala.collection.JavaConverters._

object SampleApp {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val warehouse = "file://<local folder path>"

    val catalog = new HadoopCatalog(new Configuration(), warehouse)
    val ti = TableIdentifier.of("test_table")

    if (!catalog.tableExists(ti)) {
      println("table doesn't exist. creating it.")

      val schema = new Schema(
        Types.NestedField.optional(1, "message", Types.StringType.get()),
        Types.NestedField.optional(2, "year", Types.StringType.get()),
        Types.NestedField.optional(3, "month", Types.StringType.get()),
        Types.NestedField.optional(4, "date", Types.StringType.get()),
        Types.NestedField.optional(5, "hour", Types.StringType.get())
      )

      val props = Map(
        "write.metadata.delete-after-commit.enabled" -> "true",
        "write.metadata.previous-versions-max" -> "5",
        "write.target-file-size-bytes" -> "1048576"
      )

      val partitionSpec = PartitionSpec.builderFor(schema)
        .identity("year")
        .identity("month")
        .identity("date")
        .identity("hour")
        .build();

      catalog.createTable(ti, schema, partitionSpec, props.asJava)
    } else {
      println("table exists. not creating it.")
    }


    val inputProperties = new Properties()
    inputProperties.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
    inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")

    val stream: DataStream[RowData] = env
      .addSource(new FlinkKinesisConsumer[String]("test_kinesis_stream", new SimpleStringSchema(), inputProperties))
      .map(x => {
        val now = Calendar.getInstance()
        GenericRowData.of(
          StringData.fromString(x),
          StringData.fromString(now.get(Calendar.YEAR).toString),
          StringData.fromString("%02d".format(now.get(Calendar.MONTH))),
          StringData.fromString("%02d".format(now.get(Calendar.DAY_OF_MONTH))),
          StringData.fromString("%02d".format(now.get(Calendar.HOUR_OF_DAY)))
        )
      })

    FlinkSink
      .forRowData(stream.javaStream)
      .tableLoader(TableLoader.fromHadoopTable(s"$warehouse/${ti.name()}", new Configuration()))
      .build()

    env.execute("test app")
  }
}

Thanks in advance.


1 Answer

Stack Overflow user

Accepted answer

Answered on 2021-02-22 18:11:19

You should enable checkpointing:

env.enableCheckpointing(1000)
Score: 1
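For context: the Iceberg Flink sink writes data files continuously, but it only *commits* them (producing a new snapshot and a new metadata.json) when a Flink checkpoint completes. With checkpointing disabled, files land in the partition directories but are never added to the table, which matches the symptom in the question. A minimal sketch of a slightly fuller checkpoint setup follows; the interval and pause values are illustrative assumptions, not recommendations:

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Commit Iceberg snapshots roughly once a minute: each completed
// checkpoint triggers the sink's committer operator, which appends the
// buffered data files to the table and writes a new metadata.json.
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE)

// Optional: avoid back-to-back checkpoints when commits are slow.
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30000)
```

A shorter interval means fresher data is queryable sooner, at the cost of more frequent (and smaller) Iceberg snapshots and metadata files.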
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/65675000
