首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Spring WebClient + 虚拟线程实战

Spring WebClient + 虚拟线程实战

作者头像
FunTester
发布2025-12-21 14:01:34
发布2025-12-21 14:01:34
2160
举报
文章被收录于专栏:FunTesterFunTester

Spring WebClient 与虚拟线程:实战集成指南

在上一篇文章中,我们介绍了 JDK HttpClient 和虚拟线程的基础概念。现在让我们深入探讨如何将虚拟线程与 Spring WebClient 集成,以及在实际项目中的应用场景。

与 Spring WebClient 集成

你也可以增强 Spring WebClient,使其使用虚拟线程,将 Spring 强大的生态系统与虚拟线程的可扩展性结合起来。这样你既能享受 Spring 的便利,又能获得虚拟线程的性能优势。

自定义 WebClient 配置

下面是一个自定义的 WebClient 配置,让它使用虚拟线程:

代码语言:javascript
复制
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.JdkClientHttpRequestFactory;
import org.springframework.web.reactive.function.client.WebClient;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.concurrent.Executors;

/**
 * WebClient 配置类,使用虚拟线程增强性能
 * WebClient configuration class with virtual thread enhancement
 */
@Configuration
publicclassWebClientConfig {

    /**
     * 创建使用虚拟线程的 WebClient Bean
     * Create WebClient Bean using virtual threads
     * @return WebClient 实例 / WebClient instance
     */
    @Bean
    public WebClient webClient() {
        // 创建使用虚拟线程的 JDK HttpClient
        // Create JDK HttpClient using virtual threads
        HttpClienthttpClient= HttpClient.newBuilder()
            .version(HttpClient.Version.HTTP_2)  // 使用 HTTP/2 协议 / Use HTTP/2 protocol
            .connectTimeout(Duration.ofSeconds(10))  // 设置连接超时 / Set connection timeout
            .executor(Executors.newVirtualThreadPerTaskExecutor())  // 使用虚拟线程执行器 / Use virtual thread executor
            .build();

        // 创建基于 JDK HttpClient 的请求工厂
        // Create request factory based on JDK HttpClient
        JdkClientHttpRequestFactoryrequestFactory=
            newJdkClientHttpRequestFactory(httpClient);

        // 构建 WebClient,配置基础 URL 和默认请求头
        // Build WebClient with base URL and default headers
        return WebClient.builder()
            .clientConnector(newHttpComponentsClientHttpConnector(requestFactory))  // 设置客户端连接器 / Set client connector
            .baseUrl("https://api.example.com")  // 设置基础 URL / Set base URL
            .defaultHeader("User-Agent", "FunTester-Spring-VirtualThread-Client")  // 设置 User-Agent 请求头 / Set User-Agent header
            .build();
    }
}

实际服务实现

下面是一个利用这种配置的真实服务,展示了如何在业务代码中使用虚拟线程增强的 WebClient:

代码语言:javascript
复制
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.Executors;

/**
 * 用户服务类,使用虚拟线程增强的 WebClient
 * User service class using virtual thread enhanced WebClient
 */
@Service
publicclassUserService {

    // WebClient 实例
    // WebClient instance
    privatefinal WebClient webClient;

    /**
     * 构造函数,注入 WebClient
     * Constructor, inject WebClient
     * @param webClient WebClient 实例 / WebClient instance
     */
    publicUserService(WebClient webClient) {
        this.webClient = webClient;
    }

    /**
     * 根据用户 ID 获取用户信息
     * Get user info by user ID
     * @param userId 用户 ID / User ID
     * @return Mono<User> 响应式用户对象 / Reactive User object
     */
    public Mono<User> getUserById(Long userId) {
        return webClient.get()
            .uri("/users/{id}", userId)  // 设置请求路径 / Set request path
            .retrieve()  // 检索响应 / Retrieve response
            .bodyToMono(User.class)  // 转换为 User 对象 / Convert to User object
            .doOnError(error ->
                System.err.println("Error fetching user: FunTester - " + error.getMessage()));  // 错误处理 / Error handling
    }

    /**
     * 获取所有用户列表
     * Get all users list
     * @return Flux<User> 响应式用户流 / Reactive User stream
     */
    public Flux<User> getAllUsers() {
        return webClient.get()
            .uri("/users")  // 设置请求路径 / Set request path
            .retrieve()  // 检索响应 / Retrieve response
            .bodyToFlux(User.class);  // 转换为 User 流 / Convert to User stream
    }

    /**
     * 基于虚拟线程的批量处理用户信息
     * Batch process user info based on virtual threads
     * @param userIds 用户 ID 列表 / List of user IDs
     * @return 用户列表 / List of users
     */
    public List<User> getUsersBatch(List<Long> userIds) {
        // 创建虚拟线程执行器
        // Create virtual thread executor
        try (varexecutor= Executors.newVirtualThreadPerTaskExecutor()) {
            // 为每个用户 ID 提交一个虚拟线程任务
            // Submit a virtual thread task for each user ID
            varfutures= userIds.stream()
                .map(id -> executor.submit(() ->
                    getUserById(id).block()))  // 阻塞等待结果,虚拟线程下成本低 / Block and wait for result, low cost with virtual threads
                .toList();

            // 收集所有任务结果
            // Collect all task results
            return futures.stream()
                .map(future -> {
                    try {
                        return future.get();  // 获取任务结果 / Get task result
                    } catch (Exception e) {
                        returnnull;  // 失败返回 null / Return null on failure
                    }
                })
                .filter(user -> user != null)  // 过滤掉 null 值 / Filter out null values
                .toList();
        }
    }
}

高级模式和最佳实践

连接池和资源管理

JDK HttpClient 自动管理连接池,但你仍然需要注意资源限制。虽然虚拟线程让你可以创建大量并发请求,但网络连接和服务器资源仍然是有限的:

代码语言:javascript
复制
// 创建使用虚拟线程的 HTTP 客户端,支持 HTTP/2 多路复用
// Create HTTP client using virtual threads with HTTP/2 multiplexing support
HttpClient client = HttpClient.newBuilder()
    .version(HttpClient.Version.HTTP_2)  // 使用 HTTP/2 协议 / Use HTTP/2 protocol
    .connectTimeout(Duration.ofSeconds(10))  // 设置连接超时 / Set connection timeout
    .executor(Executors.newVirtualThreadPerTaskExecutor())  // 使用虚拟线程执行器 / Use virtual thread executor
    // HTTP/2 多路复用允许一个连接上处理多个请求
    // HTTP/2 multiplexing allows multiple requests on a single connection
    .build();

HTTP/2 的多路复用能力意味着多个请求可以共享一个 TCP 连接,与 HTTP/1.1 相比,大幅减少了开销。这就像 HTTP/1.1 是单车道,一次只能过一辆车;HTTP/2 是多车道,可以同时过很多辆车,效率自然高很多。

错误处理和弹性

构建弹性 HTTP 客户端需要正确的错误处理和重试逻辑。在实际生产环境中,网络请求可能会失败,服务器可能会返回错误,这时候重试机制就很重要了。下面是一个带重试逻辑的 HTTP 客户端实现:

代码语言:javascript
复制
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.time.Duration;

/**
 * 具有重试机制的弹性 HTTP 客户端
 * Resilient HTTP client with retry mechanism
 */
publicclassResilientHttpClient {

    // HTTP 客户端实例
    // HTTP client instance
    privatefinal HttpClient httpClient;
    // 最大重试次数
    // Maximum retry count
    privatefinalintmaxRetries=3;

    /**
     * 带重试机制的 HTTP 请求方法
     * HTTP request method with retry mechanism
     * @param url 请求 URL / Request URL
     * @return 响应内容 / Response content
     */
    public String fetchWithRetry(String url) {
        intattempt=0;  // 当前尝试次数 / Current attempt count
        ExceptionlastException=null;  // 最后一次异常 / Last exception

        // 重试循环
        // Retry loop
        while (attempt < maxRetries) {
            try {
                // 构建 HTTP 请求
                // Build HTTP request
                HttpRequestrequest= HttpRequest.newBuilder()
                    .uri(URI.create(url))  // 设置请求 URI / Set request URI
                    .timeout(Duration.ofSeconds(5))  // 设置请求超时 / Set request timeout
                    .GET()  // 设置为 GET 请求 / Set as GET request
                    .build();

                // 发送请求并获取响应
                // Send request and get response
                HttpResponse<String> response = httpClient.send(request,
                    HttpResponse.BodyHandlers.ofString());

                // 检查响应状态码
                // Check response status code
                if (response.statusCode() >= 200 && response.statusCode() < 300) {
                    return response.body();  // 成功返回响应体 / Success, return response body
                } elseif (response.statusCode() >= 500) {
                    // 服务器错误时重试
                    // Retry on server error
                    Thread.sleep(1000 * (attempt + 1)); // 指数退避策略 / Exponential backoff strategy
                    attempt++;
                    continue;
                } else {
                    thrownewRuntimeException("FunTester - HTTP " + response.statusCode());
                }

            } catch (HttpTimeoutException e) {
                lastException = e;
                attempt++;
                System.out.println("Timeout on attempt FunTester - " + attempt);
            } catch (Exception e) {
                thrownewRuntimeException("FunTester - Request failed", e);
            }
        }

        thrownewRuntimeException("FunTester - Max retries exceeded", lastException);
    }
}

结构化并发

Java 21 还引入了结构化并发,这与虚拟线程完美搭配,用于管理复杂的并发操作。结构化并发就像给并发操作加了一个生命周期管理,确保所有子任务一起完成或失败,不会出现任务泄漏。这在处理多个相关 HTTP 请求时特别有用:

代码语言:javascript
复制
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

/**
 * 使用结构化并发获取聚合数据
 * Fetch aggregated data using structured concurrency
 */
publicclassStructuredHttpFetcher {

    /**
     * 获取所有相关数据
     * Fetch all related data
     * @param userId 用户 ID / User ID
     * @return 聚合数据 / Aggregated data
     */
    public AggregatedData fetchAllData(String userId) {
        // 创建结构化任务作用域,任一任务失败则取消其他任务
        // Create structured task scope, cancel other tasks if any task fails
        try (varscope=newStructuredTaskScope.ShutdownOnFailure()) {

            // 并行执行三个 HTTP 请求任务
            // Execute three HTTP request tasks in parallel
            Subtask<User> userTask = scope.fork(() -> fetchUser(userId));  // 获取用户信息 / Fetch user info
            Subtask<List<Order>> ordersTask = scope.fork(() -> fetchOrders(userId));  // 获取订单列表 / Fetch orders list
            Subtask<Profile> profileTask = scope.fork(() -> fetchProfile(userId));  // 获取用户资料 / Fetch user profile

            scope.join();           // 等待所有任务完成 / Wait for all tasks to complete
            scope.throwIfFailed();  // 如果任何任务失败则抛出异常 / Throw exception if any task failed

            // 聚合所有数据
            // Aggregate all data
            returnnewAggregatedData(
                userTask.get(),  // 获取用户任务结果 / Get user task result
                ordersTask.get(),  // 获取订单任务结果 / Get orders task result
                profileTask.get()  // 获取资料任务结果 / Get profile task result
            );
        } catch (Exception e) {
            thrownewRuntimeException("FunTester - Failed to fetch data", e);
        }
    }
}

这种模式确保所有子任务一起完成或失败,并自动清理和传播错误。如果任何一个子任务失败,其他任务也会被取消,避免资源浪费。

实际用例

微服务通信

在微服务架构中,服务之间不断通过 HTTP 进行通信。虚拟线程消除了简单性(每个请求一个线程)与可扩展性之间的传统权衡。以前你要么用线程池限制并发,要么忍受资源消耗,现在虚拟线程让你可以轻松处理大量并发请求:

代码语言:javascript
复制
/**
 * 订单控制器,使用虚拟线程和结构化并发
 * Order controller using virtual threads and structured concurrency
 */
@RestController
@RequestMapping("/api")
publicclassOrderController {

    // HTTP 客户端实例
    // HTTP client instance
    privatefinal HttpClient httpClient;

    /**
     * 构造函数,初始化使用虚拟线程的 HTTP 客户端
     * Constructor, initialize HTTP client using virtual threads
     */
    publicOrderController() {
        this.httpClient = HttpClient.newBuilder()
            .executor(Executors.newVirtualThreadPerTaskExecutor())  // 使用虚拟线程执行器 / Use virtual thread executor
            .build();
    }

    /**
     * 获取订单详情,并行调用多个微服务
     * Get order details, call multiple microservices in parallel
     * @param orderId 订单 ID / Order ID
     * @return 订单详情 / Order details
     */
    @GetMapping("/orders/{orderId}/details")
    public OrderDetails getOrderDetails(@PathVariable String orderId) {
        // 这些调用在虚拟线程上并发运行
        // These calls run concurrently on virtual threads
        try (varscope=newStructuredTaskScope.ShutdownOnFailure()) {
            // 并行执行多个 HTTP 请求
            // Execute multiple HTTP requests in parallel
            varorderTask= scope.fork(() -> fetchOrder(orderId));  // 获取订单信息 / Fetch order info
            varuserTask= scope.fork(() -> fetchUser(orderTask.get().getUserId()));  // 获取用户信息 / Fetch user info
            varinventoryTask= scope.fork(() -> fetchInventory(orderId));  // 获取库存信息 / Fetch inventory info
            varshippingTask= scope.fork(() -> fetchShipping(orderId));  // 获取物流信息 / Fetch shipping info

            scope.join();  // 等待所有任务完成 / Wait for all tasks to complete
            scope.throwIfFailed();  // 检查是否有任务失败 / Check if any task failed

            // 组装订单详情
            // Assemble order details
            returnnewOrderDetails(
                orderTask.get(),  // 订单信息 / Order info
                userTask.get(),  // 用户信息 / User info
                inventoryTask.get(),  // 库存信息 / Inventory info
                shippingTask.get()  // 物流信息 / Shipping info
            );
        } catch (Exception e) {
            thrownewRuntimeException("FunTester - Failed to build order details", e);
        }
    }
}

API 网关聚合

API 网关通常需要从多个后端服务聚合数据。虚拟线程使这种模式高效且简单。以前你可能需要复杂的异步编排,现在用虚拟线程可以轻松实现并行请求多个服务:

代码语言:javascript
复制
/**
 * API 网关,聚合多个后端服务的数据
 * API gateway aggregating data from multiple backend services
 */
publicclassApiGateway {

    // HTTP 客户端实例
    // HTTP client instance
    privatefinal HttpClient httpClient;

    /**
     * 获取仪表板数据,并行调用多个服务
     * Get dashboard data by calling multiple services in parallel
     * @param userId 用户 ID / User ID
     * @return 仪表板数据 / Dashboard data
     */
    public DashboardData getDashboard(String userId) {
        // 创建虚拟线程执行器
        // Create virtual thread executor
        try (varexecutor= Executors.newVirtualThreadPerTaskExecutor()) {

            // 并行提交多个服务调用任务
            // Submit multiple service call tasks in parallel
            varfutures= List.of(
                executor.submit(() -> fetchService("user-service", userId)),  // 用户服务 / User service
                executor.submit(() -> fetchService("notification-service", userId)),  // 通知服务 / Notification service
                executor.submit(() -> fetchService("analytics-service", userId)),  // 分析服务 / Analytics service
                executor.submit(() -> fetchService("recommendation-service", userId))  // 推荐服务 / Recommendation service
            );

            // 收集所有服务调用结果
            // Collect all service call results
            List<String> results = futures.stream()
                .map(future -> {
                    try {
                        return future.get(Duration.ofSeconds(2));  // 设置超时时间 / Set timeout
                    } catch (Exception e) {
                        return"{}"; // 回退到空数据 / Fallback to empty data
                    }
                })
                .toList();

            // 聚合结果
            // Aggregate results
            return aggregateResults(results);
        }
    }
}

总结

通过将虚拟线程与 Spring WebClient 集成,我们可以在 Spring 生态系统中充分利用虚拟线程的性能优势。高级模式如结构化并发、错误处理和连接池管理,让我们能够构建更加健壮和高效的 HTTP 客户端。

在实际应用中,微服务通信和 API 网关聚合是虚拟线程发挥优势的典型场景。在下一篇文章中,我们将深入探讨性能调优、监控和迁移策略,帮助你将虚拟线程应用到生产环境中。


FunTester 原创精华
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-12-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FunTester 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spring WebClient 与虚拟线程:实战集成指南
  • 与 Spring WebClient 集成
    • 自定义 WebClient 配置
    • 实际服务实现
  • 高级模式和最佳实践
    • 连接池和资源管理
    • 错误处理和弹性
    • 结构化并发
  • 实际用例
    • 微服务通信
    • API 网关聚合
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档