我必须迭代超过130个数据传输对象,每次都会生成一个json,然后上传到aws S3。
在没有任何改进的情况下,需要大约90秒的时间完成整个过程。我试着使用lamba,而不是lamba,两者的结果相同。
for(AbstractDTO dto: dtos) {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
}dtos.stream().forEach(dto -> {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
});经过一些调查,我得出结论,processDTO方法每项运行大约需要0.650ms。
我的第一次尝试是使用并行流,结果非常好,花费了15秒来完成整个过程:
dtos.parallelStream().forEach(dto -> {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
});但我还是需要缩短时间。我研究了如何改进并行流,并发现了ForkJoinPool技巧:
ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLELISM_NUMBER);
forkJoinPool.submit(() ->
dtos.parallelStream().forEach(dto -> {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
})).get();
forkJoinPool.shutdown();不幸的是,结果让我有点困惑。
所有测试都使用postman请求完成,调用控制器方法,该方法将结束对130个条目的迭代()。
我对5秒感到满意,使用32作为PARALLELISM_NUMBER,但我担心后果。
我在Mac2.2GHz的I7上运行
sysctl hw.physicalcpu hw.logicalcp
hw.physicalcpu: 4
hw.logicalcpu: 8下面是processDTO所做的工作:
private void processDTO(int dealerCode, int yearPeriod, int monthPeriod, AbstractDTO dto) throws FileAlreadyExistsInS3Exception {
String flatJson = JsonFlattener.flatten(new JSONObject(dto).toString());
String jsonFileName = dto.fileName() + JSON_TYPE;;
String jsonFilePath = buildFilePathNew(dto.endpoint(), dealerCode, yearPeriod, monthPeriod, AWS_S3_JSON_ROOT_FOLDER);
uploadFileToS3(jsonFilePath + jsonFileName, flatJson);
}public void uploadFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
if (s3client.doesObjectExist(bucketName, fileName)) {
throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
}
s3client.putObject(bucketName, fileName, fileContent);
}发布于 2019-04-03 19:51:32
由于您的所有有益建议和解释,我设法将缩短到了8秒。
因为瓶颈是上传到aws s3,而且您在aws上提到了一个非阻塞API,经过一些研究后,我发现TransferManager类包含一个非阻塞上传。
因此,我没有使用ForkJoinPool来增加线程数,而是保留了简单的parallelStream:
dtos.parallelStream().forEach(dto -> {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
});uploadToS3Method稍微改变了一点,我没有使用AmazonS3,而是使用了TransferManager
public Upload uploadAsyncFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
if (s3client.doesObjectExist(bucketName, fileName)) {
throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
}
InputStream targetStream = new ByteArrayInputStream(fileContent.getBytes());
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(fileContent.getBytes().length);
return transferManager.upload(bucketName, fileName, targetStream, metadata);
}这样,当上传被调用时,它不会等待它完成,让另一个DTO被处理。在处理所有DTO时,我检查它们的上载状态,以查看可能的错误(在第一个forEach之外)
发布于 2019-04-03 17:04:11
parallelism参数决定ForkJoinPool将使用多少线程。这就是为什么默认情况下,parallelism值是可用的CPU核心计数:
Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors())在您的示例中,瓶颈应该是检查文件是否存在并将其上传到S3。这里的时间至少取决于几个因素: CPU、网卡和驱动程序、操作系统等等。在您的情况下,似乎S3网络操作时间不受CPU限制,因为您正在通过创建更多模拟工作线程来观察到改进,也许网络请求是由操作系统排队的。
parallelism的正确值因工作负载类型的不同而不同。由于上下文切换的负面影响,默认的parallelism等于CPU核心的情况下,CPU绑定工作流更好。像您这样的非CPU绑定工作负载可以通过更多的工作线程来加速,假设工作负载不会阻塞CPU,例如忙着等待。
parallelism在ForkJoinPool中没有一个单一的理想值。
https://stackoverflow.com/questions/55500189
复制相似问题