首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >正确使用CompletableFuture.anyOf()

正确使用CompletableFuture.anyOf()
EN

Stack Overflow用户
提问于 2022-05-24 05:03:53
回答 1查看 197关注 0票数 1

我想向不同的外部服务部门打平行电话,并进行第一次成功的响应。

在这里成功意味着服务返回的结果在某些字段中具有某些值。

然而,对于CompletableFuture来说,除了例外之外,其他一切都是成功的。因此,即使对于业务失败,我也不得不抛出一个异常,向CompletableFuture发出不成功的信号。这感觉不对,理想情况下,我希望提供一个布尔值来表示业务的成功/失败。有什么更好的方法来表明商业失败吗?

我遇到的第二个问题是,如何确保我不会因为放弃的CompletableFuture而耗尽线程,即使在CompletableFutures.anyOf()返回之后也会继续运行。理想情况下,我希望强制停止线程,但根据下面的线程,我所能做的最好是取消下游操作。

How to cancel Java 8 completable future?

当您调用CompletableFuture#cancel时,您只停止链的下游部分。上游部分,即最终将被称为完整(.)或者completeExceptionally(.),没有收到任何信号表明不再需要结果。

我可以通过提供自己的ExecutorService并在shutdown()/shutdownNow()返回后调用CompletableFuture.anyOf() /shutdownNow()来强制停止踏板。但是,我不确定为每个请求创建一个新的ExecutorService实例意味着什么。

EN

回答 1

Stack Overflow用户

发布于 2022-05-24 06:31:00

这也许是试图避开anyOf(),因为用它取消其他任务是不容易的。

这样做的目的是创建并启动异步任务,然后保持对未来的引用,以及用于有效终止其他任务的对象,而这些任务当然取决于实际的代码/任务。在本例中,我只是返回输入字符串(因此类型为Pair<CompletableFuture<String>, String>);您的代码可能有类似请求对象的内容。

代码语言:javascript
复制
ExecutorService exec = Executors.newFixedThreadPool(5);
List<String> source = List.of();

List<Pair<CompletableFuture<String>, String>> futures = source.stream()
        .map(s -> Pair.of(CompletableFuture.supplyAsync(() -> s.toUpperCase(), exec), s))
        .collect(Collectors.toList());

Consumer<Pair<CompletableFuture<String>, String>> cancelOtherTasksFunction = pair -> {
    futures.stream()
            .filter(future -> future != pair)
            .forEach(future -> {
                future.getLeft().cancel(true); // cancel the future
                // future.getRight().doSomething() // cancel actual task
            });
};

AtomicReference<String> result = new AtomicReference<>();
futures.forEach(future -> future.getLeft()
        .thenAccept(s -> {
            if (null == result.compareAndExchangeRelease(null, s)) {
                // the first success is recorded
                cancelOtherTasksFunction.accept(future);
            }
        }));

我想你可以(应该吗?)创建一个类来保存未来和任务对象(例如您可以取消的http请求)来替换Pair

注意:

只有当您的未来返回一个非空值时,

  • if (null == result.compareAndExchangeRelease(null, s))才是有效的。
  • 第一个有效的响应将在result中出现,但是在返回它之前仍然需要测试是否阻塞,虽然我认为它应该可以工作,因为其他任务正在被取消(这是理论)。
  • 您可能决定将futures.forEach作为流管道的一部分,只是要小心地强制提交所有任务( collect(Collectors.toList())会这样做)。
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72357411

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档