我想向不同的外部服务部门打平行电话,并进行第一次成功的响应。
在这里成功意味着服务返回的结果在某些字段中具有某些值。
然而,对于CompletableFuture来说,除了例外之外,其他一切都是成功的。因此,即使对于业务失败,我也不得不抛出一个异常,向CompletableFuture发出不成功的信号。这感觉不对,理想情况下,我希望提供一个布尔值来表示业务的成功/失败。有什么更好的方法来表明商业失败吗?
我遇到的第二个问题是,如何确保我不会因为放弃的CompletableFuture而耗尽线程,即使在CompletableFutures.anyOf()返回之后也会继续运行。理想情况下,我希望强制停止线程,但根据下面的线程,我所能做的最好是取消下游操作。
How to cancel Java 8 completable future?
当您调用CompletableFuture#cancel时,您只停止链的下游部分。上游部分,即最终将被称为完整(.)或者completeExceptionally(.),没有收到任何信号表明不再需要结果。
我可以通过提供自己的ExecutorService并在shutdown()/shutdownNow()返回后调用CompletableFuture.anyOf() /shutdownNow()来强制停止踏板。但是,我不确定为每个请求创建一个新的ExecutorService实例意味着什么。
发布于 2022-05-24 06:31:00
这也许是试图避开anyOf(),因为用它取消其他任务是不容易的。
这样做的目的是创建并启动异步任务,然后保持对未来的引用,以及用于有效终止其他任务的对象,而这些任务当然取决于实际的代码/任务。在本例中,我只是返回输入字符串(因此类型为Pair<CompletableFuture<String>, String>);您的代码可能有类似请求对象的内容。
ExecutorService exec = Executors.newFixedThreadPool(5);
List<String> source = List.of();
List<Pair<CompletableFuture<String>, String>> futures = source.stream()
.map(s -> Pair.of(CompletableFuture.supplyAsync(() -> s.toUpperCase(), exec), s))
.collect(Collectors.toList());
Consumer<Pair<CompletableFuture<String>, String>> cancelOtherTasksFunction = pair -> {
futures.stream()
.filter(future -> future != pair)
.forEach(future -> {
future.getLeft().cancel(true); // cancel the future
// future.getRight().doSomething() // cancel actual task
});
};
AtomicReference<String> result = new AtomicReference<>();
futures.forEach(future -> future.getLeft()
.thenAccept(s -> {
if (null == result.compareAndExchangeRelease(null, s)) {
// the first success is recorded
cancelOtherTasksFunction.accept(future);
}
}));我想你可以(应该吗?)创建一个类来保存未来和任务对象(例如您可以取消的http请求)来替换Pair。
注意:
只有当您的未来返回一个非空值时,
if (null == result.compareAndExchangeRelease(null, s))才是有效的。result中出现,但是在返回它之前仍然需要测试是否阻塞,虽然我认为它应该可以工作,因为其他任务正在被取消(这是理论)。futures.forEach作为流管道的一部分,只是要小心地强制提交所有任务( collect(Collectors.toList())会这样做)。https://stackoverflow.com/questions/72357411
复制相似问题