首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >单源多汇v/s平面图

单源多汇v/s平面图
EN

Stack Overflow用户
提问于 2021-01-29 03:14:49
回答 1查看 186关注 0票数 0

我在Flink上使用Kinesis Data Analytics进行流处理。

我正在处理的用例是从单个Kinesis流中读取记录,并在经过一些转换后写入多个S3存储桶。一个源记录可能会在多个S3存储桶中结束。我们需要写入多个存储桶,因为源记录包含大量信息,这些信息需要拆分到多个S3存储桶中。

我尝试使用多个接收器来实现这一点。

代码语言:javascript
复制
private static <T> SinkFunction<T> createS3SinkFromStaticConfig(String path, Class<T> type) {
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartSuffix(".snappy.parquet")
                .build();


        final StreamingFileSink<T> sink = StreamingFileSink
                .forBulkFormat(new Path(s3SinkPath + "/" + path), createParquetWriter(type))
                .withBucketAssigner(new S3BucketAssigner<T>())
                .withOutputFileConfig(config)
                .withRollingPolicy(new RollingPolicy<T>(DEFAULT_MAX_PART_SIZE, DEFAULT_ROLLOVER_INTERVAL))
                .build();
        return sink;
}

public static void main(String[] args) throws Exception {
    DataStream<PIData> input = createSourceFromStaticConfig(env)
        .map(new JsonToSourceDataMap())
        .name("jsonToInputDataTransformation");


    input.map(value -> value)
        .name("rawData")
        .addSink(createS3SinkFromStaticConfig("raw_data", InputData.class))
        .name("s3Sink");

     input.map(FirstConverter::convertInputData)
        .addSink(createS3SinkFromStaticConfig("firstOutput", Output1.class));

    input.map(SecondConverter::convertInputData)
        .addSink(createS3SinkFromStaticConfig("secondOutput", Output2.class));

    input.map(ThirdConverter::convertInputData)
        .addSink(createS3SinkFromStaticConfig("thirdOutput", Output3.class));

    //and so on; There are around 10 buckets.
}

然而,我看到了这一点对性能的很大影响。我看到了一个很大的CPU峰值(与只有一个接收器的CPU相比)。我看到的规模大约是每秒100k条记录。

其他注意事项:我使用批量格式写入器,因为我想以拼图格式写入文件。我尝试将检查点间隔从1分钟增加到3分钟,假设每分钟向s3写入文件可能会导致问题。但这并没有多大帮助。

由于我是flink和流处理的新手,我不确定这样的性能影响是预期的,还是有什么我可以做得更好的?使用平面映射运算符,然后使用单个接收器会更好吗?

EN

回答 1

Stack Overflow用户

发布于 2021-01-29 21:21:16

当你有一个非常简单的管道,只有一个源和一个接收器时,就像这样:

source -> map -> sink

然后,Flink调度器能够优化执行,整个流水线作为单个任务中的函数调用序列运行--没有序列化或网络开销。Flink 1.12可以将这种操作符链优化应用于更复杂的拓扑--可能包括您现在拥有的具有多个接收器的拓扑--但我不相信Flink 1.11 (这是KDA当前所基于的)可以做到这一点。

我看不出使用平面地图会有什么不同。

你也许可以优化你的序列化/反序列化。参见https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65943799

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档