首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink Hadoop用多个并行桶执行斗槽的性能

Flink Hadoop用多个并行桶执行斗槽的性能
EN

Stack Overflow用户
提问于 2020-02-07 10:35:00
回答 1查看 402关注 0票数 1

我正在调查一个Flink作业的性能,它将数据从Kafka传输到一个S3水槽。我们正在使用BucketingSink来编写拼花文件。存储逻辑将每个类型的数据、租户(客户)、日期-时间、提取Id等划分为一个文件夹。这导致每个文件存储在由9-10层组成的文件夹结构中(S3_桶:/1/2/3/4/5/6/7/8/9/myFile.

如果数据以突发消息的形式分发给租户类型,我们可以看到良好的书面性能,但是当数据更多地分布在数千个租户、数十个数据类型和多个提取is上时,我们就会失去难以置信的性能。(约为300倍)

附加调试器时,问题似乎与S3上同时打开的用于写入数据的处理程序的数量有关。更具体而言:

通过对用于向S3写入的hadoop库的研究,我发现了一些可能的改进设置:

代码语言:javascript
复制
      <name>fs.s3a.connection.maximum</name>
      <name>fs.s3a.threads.max</name>
      <name>fs.s3a.threads.core</name>
      <name>fs.s3a.max.total.tasks</name>

但所有这些都没有对吞吐量产生很大影响。我还试图将文件夹结构扁平化,以便写入单个键,如(1_2_3_.)但这并没有带来任何改善。

注意:这些测试是用Hadoop FileSystem (BucketingSink)在Flink 1.8上完成的,它使用hadoop库2.6.x (因为我们将ClouderaCDH5.x用于保存点)写入S3,所以我们不能切换到StreamingFileSink。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-02-07 10:35:00

根据科斯塔斯在https://lists.apache.org/thread.html/50ef4d26a1af408df8d9abb70589699cb6b26b2600ab6f4464e86ea4%40%3Cdev.flink.apache.org%3E的建议

慢下来的罪魁祸首是这段代码:https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L543-L551

仅这一项就需要4到5秒左右的时间,总共需要6秒来打开文件。来自已检测的调用的日志:

代码语言:javascript
复制
2020-02-07 08:51:05,825 INFO  BucketingSink  - openNewPartFile FS verification
2020-02-07 08:51:09,906 INFO  BucketingSink  - openNewPartFile FS verification - done
2020-02-07 08:51:11,181 INFO  BucketingSink  - openNewPartFile FS - completed partPath = s3a://....

这与60秒无活动翻转的水桶池的默认设置一起意味着,当我们创建完最后一个桶时,一个槽上有10个以上的并行桶,第一个桶就过时了,因此需要旋转以产生阻塞情况。

我们通过替换BucketingSink.java和删除上面提到的FS检查来解决这个问题:

代码语言:javascript
复制
        LOG.debug("Opening new part file FS verification");
        if (!fs.exists(bucketPath)) {
            try {
                if (fs.mkdirs(bucketPath)) {
                    LOG.debug("Created new bucket directory: {}", bucketPath);
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Could not create new bucket path.", e);
            }
        }
        LOG.debug("Opening new part file FS verification - done");

正如我们所看到的,没有它的接收器工作正常,现在打开文件需要1.2秒。

此外,我们将默认的非活动阈值设置为5分钟。通过这种更改,我们可以轻松地处理每个时隙200多个桶(一旦作业速度加快,就会在所有插槽上摄入,因此延迟了不活动的超时)

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60111692

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档