下面是我的代码:
final int startIndex = LoaderConstants.ServeiTerritorial.DEFAULT_START_INDEX;
final long pageSize = LoaderConstants.ServeiTerritorial.DEFAULT_PAGE_SIZE;
final String tableName = LoaderConstants.ServeiTerritorial.TIPUS_VIA_TABLE_NAME;
final String ownerType = LoaderConstants.ServeiTerritorial.TIPUS_VIA_OWNER_TYPE;
LongStream
.iterate(1, n -> n <= 1000 / pageSize, n -> n+1)
.mapToObj(pageNumber -> this.buildCompletableFutureOfResultSetType(tableName, ownerType, pageNumber * pageSize, pageSize))
.map(CompletableFuture::join)我想要得到的是并行化每一项。
首先,对于每个页面,我构建了一个CompletableFuture
/**
* Builds a {@link CompletableFuture} in order to get oid.
*/
private CompletableFuture<ResultSetType> buildCompletableFutureOfResultSetType(
final String tableName,
final String owner,
final long pageNumber,
final long pageSize
) {
Supplier<ResultSetType> supplier = () -> this.serveiTerritorialCatalegsClientRepository.getCataleg(tableName, null, owner, null, null, null, pageNumber, pageSize);
return CompletableFuture.supplyAsync(supplier, this.servidorTerminologicExecutor);
}当我意识到它是按顺序执行时,我认为它工作得很好。以下是我的日志:
2021-07-21 12:46:35.894 DEBUG [hes-mpi-imdg-loader,9ffb4627802548bf,9ffb4627802548bf,false] 22536 --- [ool-1-thread-38] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:35.895 DEBUG [hes-mpi-imdg-loader,9ffb4627802548bf,9ffb4627802548bf,false] 22536 --- [ool-1-thread-38] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13801</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:35.896 DEBUG [hes-mpi-imdg-loader,9ffb4627802548bf,9ffb4627802548bf,false] 22536 --- [ool-1-thread-38] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13801</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:35.985 DEBUG [hes-mpi-imdg-loader,501c638f07534a0d,501c638f07534a0d,false] 22536 --- [ool-1-thread-39] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:35.986 DEBUG [hes-mpi-imdg-loader,501c638f07534a0d,501c638f07534a0d,false] 22536 --- [ool-1-thread-39] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13901</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:35.987 DEBUG [hes-mpi-imdg-loader,501c638f07534a0d,501c638f07534a0d,false] 22536 --- [ool-1-thread-39] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13901</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.057 DEBUG [hes-mpi-imdg-loader,d840cc01f137b810,d840cc01f137b810,false] 22536 --- [ool-1-thread-40] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:36.058 DEBUG [hes-mpi-imdg-loader,d840cc01f137b810,d840cc01f137b810,false] 22536 --- [ool-1-thread-40] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14001</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.061 DEBUG [hes-mpi-imdg-loader,d840cc01f137b810,d840cc01f137b810,false] 22536 --- [ool-1-thread-40] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14001</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.141 DEBUG [hes-mpi-imdg-loader,9ec5d6687eb6e9aa,9ec5d6687eb6e9aa,false] 22536 --- [ool-1-thread-41] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:36.142 DEBUG [hes-mpi-imdg-loader,9ec5d6687eb6e9aa,9ec5d6687eb6e9aa,false] 22536 --- [ool-1-thread-41] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14101</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.142 DEBUG [hes-mpi-imdg-loader,9ec5d6687eb6e9aa,9ec5d6687eb6e9aa,false] 22536 --- [ool-1-thread-41] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14101</startIndex><pageSize>100</pageSize></request>)我不太明白我做错了什么。
有什么想法吗?
发布于 2021-07-21 19:13:28
您将在中间步骤中使用CompletableFutre.join调用来运行此顺序流。
问题是:元素一次遍历(顺序)流中的一个,并遍历每个步骤。当然,这包括join()调用。这意味着对于每个元素,整个操作(包括“异步”部分,this.serveiTerritorialCatalegsClientRepository.getCataleg)必须在随后的元素进入处理之前完成。
为了解决这个问题,在开始调用join()调用之前,强制您的管道为所有元素创建未来。像这样的东西应该是有效的:
List<CompletableFuture<ResultSetType>> submittedTasks = LongStream
.iterate(1, n -> n <= 1000 / pageSize, n -> n+1)
.mapToObj(pageNumber -> this.buildCompletableFutureOfResultSetType(tableName,
ownerType, pageNumber * pageSize,
pageSize))
.collect(Collectors.toList());这样,终端collect()将收集所有提交的CompletableFuture对象,而不会阻塞。您不需要收集到一个列表,但是任何强制提交异步任务的终端操作都应该这样做;只要您能够对每个任务分别调用join。
在此之后,您可以阻塞,因为您知道所有异步任务都已启动或至少已排队。
submittedTasks.stream()
.map(CompletableFuture::join)
.forEach(...) //some terminal operation这样,您就避免了不必要的阻塞代码。
我注意到我提到过几次“顺序”,但这并不意味着在并行流上运行相同的代码就能解决它。主要问题仍然存在,尽管吞吐量可能会提高,因为您将并行阻塞
https://stackoverflow.com/questions/68468272
复制相似问题