首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用CompletableFuture supplyAsync的集合进行并行处理,然后收集结果

使用CompletableFuture supplyAsync的集合进行并行处理,然后收集结果
EN

Stack Overflow用户
提问于 2017-04-09 02:59:38
回答 1查看 6.8K关注 0票数 1
代码语言:javascript
复制
//Unit of logic I want to make it to run in parallel
public PagesDTO convertOCRStreamToDTO(String pageId, Integer pageSequence) throws Exception {
    LOG.info("Get OCR begin for pageId [{}] thread name {}",pageId, Thread.currentThread().getName());
    OcrContent ocrContent = getOcrContent(pageId);
    OcrDTO ocrData = populateOCRData(ocrContent.getInputStream());
    PagesDTO pageDTO = new PagesDTO(pageId, pageSequence.toString(), ocrData);
    return pageDTO; 
}

执行convertOCRStreamToDTO(..)的逻辑然后并行地收集执行单个线程时的结果。

代码语言:javascript
复制
List<PagesDTO> pageDTOList = new ArrayList<>();
//javadoc: Creates a work-stealing thread pool using all available processors as its target parallelism level.
ExecutorService newWorkStealingPool = Executors.newWorkStealingPool(); 
Instant start = Instant.now();
List<CompletableFuture<PagesDTO>> pendingTasks = new ArrayList<>();
List<CompletableFuture<PagesDTO>> completedTasks = new ArrayList<>();
CompletableFuture<<PagesDTO>> task = null;

for (InputPageDTO dcInputPageDTO : dcReqDTO.getPages()) {
    String pageId = dcInputPageDTO.getPageId();
    task = CompletableFuture
            .supplyAsync(() -> {
                try {
                    return convertOCRStreamToDTO(pageId, pageSequence.getAndIncrement());
                } catch (HttpHostConnectException | ConnectTimeoutException e) {
                    LOG.error("Error connecting to Redis for pageId [{}]", pageId, e);
                    CaptureException e1 = new CaptureException(Error.getErrorCodes().get(ErrorCodeConstants.REDIS_CONNECTION_FAILURE),
                            " Connecting to the Redis failed while getting OCR for pageId ["+pageId +"] " + e.getMessage(), CaptureErrorComponent.REDIS_CACHE, e);
                    exceptionMap.put(pageId,e1);
                } catch (CaptureException e) {
                    LOG.error("Error in Document Classification Engine Service while getting OCR for pageId [{}]",pageId,e);
                    exceptionMap.put(pageId,e);
                } catch (Exception e) {
                    LOG.error("Error getting OCR content for the pageId [{}]", pageId,e);
                    CaptureException e1 = new CaptureException(Error.getErrorCodes().get(ErrorCodeConstants.TECHNICAL_FAILURE),
                            "Error while getting ocr content for pageId : ["+pageId +"] " + e.getMessage(), CaptureErrorComponent.REDIS_CACHE, e);
                    exceptionMap.put(pageId,e1);
                }
                return null;
            }, newWorkStealingPool);
    //collect all async tasks
    pendingTasks.add(task);
}

//TODO: How to avoid unnecessary loops which is happening here just for the sake of waiting for the future tasks to complete???
//TODO: Looking for the best solutions
while(pendingTasks.size() > 0) {
    for(CompletableFuture<PagesDTO> futureTask: pendingTasks) {
        if(futureTask != null && futureTask.isDone()){
            completedTasks.add(futureTask);
            pageDTOList.add(futureTask.get());
        }
    }
    pendingTasks.removeAll(completedTasks);
}

//Throw the exception cought while getting converting OCR stream to DTO - for any of the pageId
for(InputPageDTO dcInputPageDTO : dcReqDTO.getPages()) {
    if(exceptionMap.containsKey(dcInputPageDTO.getPageId())) {
        CaptureException e = exceptionMap.get(dcInputPageDTO.getPageId());
        throw e;
    }
}

LOG.info("Parallel processing time taken for {} pages = {}", dcReqDTO.getPages().size(),
        org.springframework.util.StringUtils.deleteAny(Duration.between(Instant.now(), start).toString().toLowerCase(), "pt-"));

请查看我以上代码库中的待办事项,我有以下两个考虑事项,我正在寻求关于堆栈溢出的建议:

1)我希望避免不必要的循环(发生在上面的while循环中),乐观地等待所有线程完成异步执行,然后从其中收集结果的最佳方法是什么?有谁有建议吗? 2) ExecutorService实例是在我的service类级别创建的,认为它将被用于每个请求,而不是在方法的本地创建它,最后关闭。我在这里做什么??或者在我的思维过程中做任何修正?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-04-10 13:02:57

只需删除whileif就可以了:

代码语言:javascript
复制
for(CompletableFuture<PagesDTO> futureTask: pendingTasks) {
    completedTasks.add(futureTask);
    pageDTOList.add(futureTask.get());
}

get() (以及join())将在返回值之前等待将来完成。而且,没有必要对null进行测试,因为列表中永远不会包含任何内容。

但是,您可能应该改变处理异常的方式。CompletableFuture有一种特殊的机制来处理它们,并在调用get()/join()时重新抛出它们。您可能只需要在CompletionException中包装检查过的异常。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43302589

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档