首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何提高将130多个项上传到aws s3的性能

如何提高将130多个项上传到aws s3的性能
EN

Stack Overflow用户
提问于 2019-04-03 16:39:01
回答 2查看 729关注 0票数 9

我必须迭代超过130个数据传输对象,每次都会生成一个json,然后上传到aws S3。

在没有任何改进的情况下,需要大约90秒的时间完成整个过程。我试着使用lamba,而不是lamba,两者的结果相同。

代码语言:javascript
复制
for(AbstractDTO dto: dtos) {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
}
代码语言:javascript
复制
dtos.stream().forEach(dto -> {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
});

经过一些调查,我得出结论,processDTO方法每项运行大约需要0.650ms。

我的第一次尝试是使用并行流,结果非常好,花费了15秒来完成整个过程:

代码语言:javascript
复制
dtos.parallelStream().forEach(dto -> {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
});

但我还是需要缩短时间。我研究了如何改进并行流,并发现了ForkJoinPool技巧:

代码语言:javascript
复制
ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLELISM_NUMBER);
forkJoinPool.submit(() ->
dtos.parallelStream().forEach(dto -> {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
})).get();
forkJoinPool.shutdown();

不幸的是,结果让我有点困惑。

  • 当PARALLELISM_NUMBER为8时,大约需要13秒才能完成整个过程。不是什么大进步。
  • 当PARALLELISM_NUMBER为16时,大约需要8秒才能完成整个过程。
  • 当PARALLELISM_NUMBER为32时,大约需要5秒才能完成整个过程。

所有测试都使用postman请求完成,调用控制器方法,该方法将结束对130个条目的迭代()。

我对5秒感到满意,使用32作为PARALLELISM_NUMBER,但我担心后果。

  • 保留32块可以吗?
  • 什么是理想的PARALLELISM_NUMBER?
  • 当我决定它的价值时,我必须记住什么?

我在Mac2.2GHz的I7上运行

代码语言:javascript
复制
sysctl hw.physicalcpu hw.logicalcp
hw.physicalcpu: 4
hw.logicalcpu: 8

下面是processDTO所做的工作:

代码语言:javascript
复制
private void processDTO(int dealerCode, int yearPeriod, int monthPeriod, AbstractDTO dto) throws FileAlreadyExistsInS3Exception {
    String flatJson = JsonFlattener.flatten(new JSONObject(dto).toString());
    String jsonFileName = dto.fileName() + JSON_TYPE;;
    String jsonFilePath = buildFilePathNew(dto.endpoint(), dealerCode, yearPeriod, monthPeriod, AWS_S3_JSON_ROOT_FOLDER);
    uploadFileToS3(jsonFilePath + jsonFileName, flatJson);
}
代码语言:javascript
复制
public void uploadFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
    if (s3client.doesObjectExist(bucketName, fileName)) {
        throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
    }
    s3client.putObject(bucketName, fileName, fileContent);
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-04-03 19:51:32

由于您的所有有益建议和解释,我设法将缩短到了8秒

因为瓶颈是上传到aws s3,而且您在aws上提到了一个非阻塞API,经过一些研究后,我发现TransferManager类包含一个非阻塞上传。

TransferManager类

因此,我没有使用ForkJoinPool来增加线程数,而是保留了简单的parallelStream:

代码语言:javascript
复制
        dtos.parallelStream().forEach(dto -> {
            try {
                processDTO(dealerCode, yearPeriod, monthPeriod, dto);
            } catch (FileAlreadyExistsInS3Exception e) {
                failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
            }
        });

uploadToS3Method稍微改变了一点,我没有使用AmazonS3,而是使用了TransferManager

代码语言:javascript
复制
public Upload uploadAsyncFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
        if (s3client.doesObjectExist(bucketName, fileName)) {
            throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
        }
        InputStream targetStream = new ByteArrayInputStream(fileContent.getBytes());
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(fileContent.getBytes().length);
        return transferManager.upload(bucketName, fileName, targetStream, metadata);
}

这样,当上传被调用时,它不会等待它完成,让另一个DTO被处理。在处理所有DTO时,我检查它们的上载状态,以查看可能的错误(在第一个forEach之外)

票数 0
EN

Stack Overflow用户

发布于 2019-04-03 17:04:11

parallelism参数决定ForkJoinPool将使用多少线程。这就是为什么默认情况下,parallelism值是可用的CPU核心计数:

代码语言:javascript
复制
Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors())

在您的示例中,瓶颈应该是检查文件是否存在并将其上传到S3。这里的时间至少取决于几个因素: CPU、网卡和驱动程序、操作系统等等。在您的情况下,似乎S3网络操作时间不受CPU限制,因为您正在通过创建更多模拟工作线程来观察到改进,也许网络请求是由操作系统排队的。

parallelism的正确值因工作负载类型的不同而不同。由于上下文切换的负面影响,默认的parallelism等于CPU核心的情况下,CPU绑定工作流更好。像您这样的非CPU绑定工作负载可以通过更多的工作线程来加速,假设工作负载不会阻塞CPU,例如忙着等待

parallelismForkJoinPool中没有一个单一的理想值。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55500189

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档