首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Java completableFuture:如何并行化多个CompletableFutures

Java completableFuture:如何并行化多个CompletableFutures
EN

Stack Overflow用户
提问于 2021-07-21 18:55:43
回答 1查看 38关注 0票数 2

下面是我的代码:

代码语言:javascript
复制
final int startIndex = LoaderConstants.ServeiTerritorial.DEFAULT_START_INDEX; 
final long pageSize = LoaderConstants.ServeiTerritorial.DEFAULT_PAGE_SIZE;
final String tableName = LoaderConstants.ServeiTerritorial.TIPUS_VIA_TABLE_NAME;
final String ownerType = LoaderConstants.ServeiTerritorial.TIPUS_VIA_OWNER_TYPE;

LongStream
    .iterate(1, n -> n <= 1000 / pageSize, n -> n+1)
    .mapToObj(pageNumber -> this.buildCompletableFutureOfResultSetType(tableName, ownerType, pageNumber * pageSize, pageSize))
    .map(CompletableFuture::join)

我想要得到的是并行化每一项。

首先,对于每个页面,我构建了一个CompletableFuture

代码语言:javascript
复制
/**
* Builds a {@link CompletableFuture} in order to get oid.
*/
private CompletableFuture<ResultSetType> buildCompletableFutureOfResultSetType(
    final String tableName,
    final String owner,
    final long pageNumber,
    final long pageSize
) {
    Supplier<ResultSetType> supplier = () -> this.serveiTerritorialCatalegsClientRepository.getCataleg(tableName, null, owner, null, null, null, pageNumber, pageSize);

    return CompletableFuture.supplyAsync(supplier, this.servidorTerminologicExecutor);
}

当我意识到它是按顺序执行时,我认为它工作得很好。以下是我的日志:

代码语言:javascript
复制
2021-07-21 12:46:35.894 DEBUG [hes-mpi-imdg-loader,9ffb4627802548bf,9ffb4627802548bf,false] 22536 --- [ool-1-thread-38] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:35.895 DEBUG [hes-mpi-imdg-loader,9ffb4627802548bf,9ffb4627802548bf,false] 22536 --- [ool-1-thread-38] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13801</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:35.896 DEBUG [hes-mpi-imdg-loader,9ffb4627802548bf,9ffb4627802548bf,false] 22536 --- [ool-1-thread-38] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13801</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:35.985 DEBUG [hes-mpi-imdg-loader,501c638f07534a0d,501c638f07534a0d,false] 22536 --- [ool-1-thread-39] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:35.986 DEBUG [hes-mpi-imdg-loader,501c638f07534a0d,501c638f07534a0d,false] 22536 --- [ool-1-thread-39] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13901</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:35.987 DEBUG [hes-mpi-imdg-loader,501c638f07534a0d,501c638f07534a0d,false] 22536 --- [ool-1-thread-39] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13901</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.057 DEBUG [hes-mpi-imdg-loader,d840cc01f137b810,d840cc01f137b810,false] 22536 --- [ool-1-thread-40] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:36.058 DEBUG [hes-mpi-imdg-loader,d840cc01f137b810,d840cc01f137b810,false] 22536 --- [ool-1-thread-40] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14001</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.061 DEBUG [hes-mpi-imdg-loader,d840cc01f137b810,d840cc01f137b810,false] 22536 --- [ool-1-thread-40] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14001</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.141 DEBUG [hes-mpi-imdg-loader,9ec5d6687eb6e9aa,9ec5d6687eb6e9aa,false] 22536 --- [ool-1-thread-41] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:36.142 DEBUG [hes-mpi-imdg-loader,9ec5d6687eb6e9aa,9ec5d6687eb6e9aa,false] 22536 --- [ool-1-thread-41] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14101</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.142 DEBUG [hes-mpi-imdg-loader,9ec5d6687eb6e9aa,9ec5d6687eb6e9aa,false] 22536 --- [ool-1-thread-41] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14101</startIndex><pageSize>100</pageSize></request>)

我不太明白我做错了什么。

有什么想法吗?

EN

回答 1

Stack Overflow用户

发布于 2021-07-21 19:13:28

您将在中间步骤中使用CompletableFutre.join调用来运行此顺序流。

问题是:元素一次遍历(顺序)流中的一个,并遍历每个步骤。当然,这包括join()调用。这意味着对于每个元素,整个操作(包括“异步”部分,this.serveiTerritorialCatalegsClientRepository.getCataleg)必须在随后的元素进入处理之前完成。

为了解决这个问题,在开始调用join()调用之前,强制您的管道为所有元素创建未来。像这样的东西应该是有效的:

代码语言:javascript
复制
List<CompletableFuture<ResultSetType>> submittedTasks = LongStream
    .iterate(1, n -> n <= 1000 / pageSize, n -> n+1)
    .mapToObj(pageNumber -> this.buildCompletableFutureOfResultSetType(tableName, 
                                       ownerType, pageNumber * pageSize,
                                       pageSize))
    .collect(Collectors.toList());

这样,终端collect()将收集所有提交的CompletableFuture对象,而不会阻塞。您不需要收集到一个列表,但是任何强制提交异步任务的终端操作都应该这样做;只要您能够对每个任务分别调用join

在此之后,您可以阻塞,因为您知道所有异步任务都已启动或至少已排队。

代码语言:javascript
复制
submittedTasks.stream()
    .map(CompletableFuture::join)
    .forEach(...) //some terminal operation

这样,您就避免了不必要的阻塞代码。

我注意到我提到过几次“顺序”,但这并不意味着在并行流上运行相同的代码就能解决它。主要问题仍然存在,尽管吞吐量可能会提高,因为您将并行阻塞

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68468272

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档