首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >CompletableFuture问题-不等待调用所有URL

CompletableFuture问题-不等待调用所有URL
EN

Stack Overflow用户
提问于 2021-10-15 22:18:27
回答 1查看 154关注 0票数 0

关于并发异步调用,我遇到的问题是,在连续运行后,代码不会等到两个调用都完成并包含响应。有时同时执行两个URL,有时仅执行其中一个URL

我希望实现的目标是:

a)使CompletableFutures等待,直到接收到所有响应

b)在列表中提取所提供调用的所有响应

c)能够通过键值对(例如: request1、request2、request3WithInvalidData等)对它们进行分类,并在完成后迭代列表,提取每个请求(statusCode、body等)的响应和数据,并进一步进行断言。

如果有人能帮助我,我将不胜感激!我是多线程的新手,它很难掌握。一点帮助和对解决方案的解释会对我有很大的帮助!

谢谢!(下面是我的代码)

代码语言:javascript
复制
class HelloWorld {

private static void concurrentCalls(List<String> paths) {
    var client = HttpClient.newHttpClient();

    List<HttpRequest> requests = paths.stream()
            .map(URI::create)
            .map(uri -> HttpRequest.newBuilder(uri).GET().timeout(Duration.ofSeconds(20)).build())
            .collect(Collectors.toList());

    List<String> llll = new ArrayList<>();

    CompletableFuture<?>[] responses = requests.stream()
            .map(request -> client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                    .thenApply(HttpResponse::body)
                    .exceptionally(e -> "Error: " + e.getMessage())
                    .thenAccept(llll::add))
            .toArray(CompletableFuture<?>[]::new);

   
    CompletableFuture.allOf(responses).join();

    llll.stream().forEach(System.out::println);
}

public static void main(String[] args) {
    List<String> listOfUrls = new ArrayList<>();
    listOfUrls.add("https://postman-echo.com/get?foo1=bar1&foo2=bar2");
    listOfUrls.add("https://postman-echo.com/response-headers?foo1=bar1&foo2=bar2");
    concurrentCalls(listOfUrls);
}
EN

回答 1

Stack Overflow用户

发布于 2021-10-20 10:20:42

代码语言:javascript
复制
    CompletableFuture<?>[] responses = requests.stream()
            .map(request -> client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                    .thenApply(HttpResponse::body)
                    .exceptionally(e -> "Error: " + e.getMessage())
                    .thenAccept(llll::add))
            .toArray(CompletableFuture<?>[]::new);

这段代码获取所有的HttpRequest对象,对它们调用sendAsync(),然后在完成各自的调用后为每个对象分配一些要执行的任务。这些任务是thenApply()exceptionally()thenAccept()

当你调用这两条线路时,问题就来了。

代码语言:javascript
复制
CompletableFuture.allOf(responses).join();

llll.stream().forEach(System.out::println);

join()所做的正是您所期望的-阻塞线程,等待所有请求完成并接收它们的响应。更具体地说,在每个HttpRequest对象至少到达client.sendAsync()行之前,join()不会让代码继续执行。

然而,这正是问题所在-代码没有机会通过 client.sendAsync,这意味着它可能没有足够的时间添加到您的列表llll中。请记住,添加到该列表是我们分配给每个HttpResponse对象的任务之一,但根据运行方式的不同,在打印llll内容之前,将响应添加到llll的速度可能不够快。

这就是你的解决方案不一致的原因--你有一个race condition。如果希望避免竞争条件,则必须使用collect(Collectors.toList())函数将所有响应存储到llll中,然后必须将CompletableFurure.allOf().join()替换为对join()的单独调用(作为CompletionStage的一部分)。下面是一个例子。

代码语言:javascript
复制
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

class HelloWorld {

   private static void concurrentCalls(List<String> paths) {
      var client = HttpClient.newHttpClient();
   
      List<HttpRequest> requests = paths.stream()
               .map(URI::create)
               .map(uri -> HttpRequest.newBuilder(uri).GET().timeout(Duration.ofSeconds(20)).build())
               .collect(Collectors.toList());
   
      List<CompletableFuture<HttpResponse<String>>> inProgressResponses = requests.stream()
               .map(request -> client.sendAsync(request, HttpResponse.BodyHandlers.ofString()))
               .collect(Collectors.toList());
               
      List<String> completedResponses = inProgressResponses.stream()
            .map(response -> response
                  .thenApply(HttpResponse::body)
                  .exceptionally(e -> "Error: " + e.getMessage())
                  .join())
            .collect(Collectors.toList())
            ;
      
      completedResponses.stream().forEach(System.out::println);
      
   }

   public static void main(String[] args) {
      List<String> listOfUrls = new ArrayList<>();
      listOfUrls.add("https://postman-echo.com/get?foo1=bar1&foo2=bar2");
      listOfUrls.add("https://postman-echo.com/response-headers?foo1=bar1&foo2=bar2");
      concurrentCalls(listOfUrls);
   }

}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69591166

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档