首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache & Iceberg:无法处理100种RowData类型

Apache & Iceberg:无法处理100种RowData类型
EN

Stack Overflow用户
提问于 2022-10-18 09:47:49
回答 1查看 85关注 0票数 0

我有一个Flink应用程序,它读取任意AVRO数据,将其映射到RowData,并使用几个FlinkSink实例将数据写入冰山表。对于任意数据,我的意思是我有100种类型的AVRO消息,它们都具有公共属性"tableName“,但包含不同的列。我想将每种类型的消息写入一个独立的Iceberg表中。

为此,我使用了侧输出:当我将数据映射到RowData时,我使用ProcessFunction将每条消息写入特定的OutputTag。

稍后,使用已经处理的datastream,我循环进入不同的输出标记,使用getSideOutput获取记录,并为每个输出标记创建一个特定的IcebergSink。类似于:

代码语言:javascript
复制
        final List<OutputTag<RowData>> tags = ... // list of all possible output tags

        final DataStream<RowData> rowdata = stream
                .map(new ToRowDataMap()) // Map Custom Avro Pojo into RowData
                .uid("map-row-data")
                .name("Map to RowData")
                .process(new ProcessRecordFunction(tags)) // process elements one by one sending them to a specific OutputTag
                .uid("id-process-record")
                .name("Process Input records");;

        CatalogLoader catalogLoader = ...
        String upsertField = ...
     
        outputTags
                .stream()
                .forEach(tag -> {
                    SingleOutputStreamOperator<RowData> outputStream = stream
                            .getSideOutput(tag);

                    TableIdentifier identifier = TableIdentifier.of("myDBName", tag.getId());

                    FlinkSink.Builder builder = FlinkSink
                            .forRowData(outputStream)
                            .table(catalog.loadTable(identifier))
                            .tableLoader(TableLoader.fromCatalog(catalogLoader, identifier))
                            .set("upsert-enabled", "true")
                            .uidPrefix("commiter-sink-" + tableName)
                            .equalityFieldColumns(Collections.singletonList(upsertField));
                    builder.append();
                });

当我处理几张桌子时,效果很好。但是,当表的数量增加时,Flink无法查询足够的任务资源,因为每个Sink需要两个不同的操作符(因为https://iceberg.apache.org/javadoc/0.10.0/org/apache/iceberg/flink/sink/FlinkSink.html的内部结构)。

还有其他更有效的方法吗?或者任何优化它的方法?

谢谢!)

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-25 10:15:30

考虑到您的问题,我假设您的运营商中大约有一半是充分利用的IcebergStreamWriter,另一半是很少使用的IcebergFilesCommitter。

您可以通过以下方法优化服务器的资源使用:

  • 增加TaskManagers (taskmanager.numberOfTaskSlots) [1]上的插槽数量--因此空闲IcebergFilesCommitter操作符没有使用的CPU将由TaskManager上的其他操作符使用。
  • 增加提供给TaskManagers (taskmanager.memory.process.size) [2]的资源--这有助于在运行中的TaskManager上的操作符之间分配JVM内存开销(不要忘记并行地增加插槽,以开始使用额外的资源:)

为TaskManagers添加更多插槽可能会导致运营商争夺CPU,内存仍然保留给“空闲”任务。[3]

也许这种Flink架构也可以使用[4]

希望这能帮上忙,彼得

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74108950

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档