/assets/avatar.png?s=240&d=mp

Hi 👋, I'm j5land

专注广告技术,喜欢高并发技术,学习K8s/容器技术,期待与大家分享经验,共同学习。

WebClient性能分析与对比测试

背景

在Spring 5之前,如果我们想要调用其他系统提供的HTTP服务,我们通常可以使用Spring提供的RestTemplate来访问。RestTemplate用法很简单,但它的不足之处在于它的请求是同步阻塞模式,因此存在一定性能瓶颈,当然如果想要使用异步方式请求,也可以使用AsyncRestTemplate。

从Spring 5开始,Spring中全面引入了Reactive响应式编程,WebClient就属于Spring WebFlux 的一部分。WebClient的请求模式属于异步非阻塞、反应式的,能够以少量固定的线程处理高并发的HTTP 请求。

因此,从Spring 5 开始,HTTP服务之间的通信方式我们可以考虑使用WebCLient来取代之前的RestTemplate。

WebClient 详解

WebClient特点

webClient是一个功能完善的HTTP请求客户端,支持以下内容:

  • 非阻塞I/O
  • 反应流回压(即消费者负载过高时,主动反馈生产者放慢速度的机制)
  • 具有高并发性,硬件资源消耗更少
  • 流程的API设计
  • 同步与异步交互
  • 流式传输支持

WebClient 开发配置

1 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

2 基础配置

WebClient 实例构造器可以设置一些基础的全局web请求配置信息,比如默认的cookie、header、baseUrl等。

WebClient.builder()
                .defaultCookie("kl","kl")
                .defaultUriVariables(ImmutableMap.of("name","kl"))
                .defaultHeader("header","kl")
                .defaultHeaders(httpHeaders -> {
                    httpHeaders.add("header1","kl");
                    httpHeaders.add("header2","kl");
                })
                .defaultCookies(cookie ->{
                    cookie.add("cookie1","kl");
                    cookie.add("cookie2","kl");
                })
                .baseUrl("http://www.compay.com")
                .build();

3 底层依赖Netty库配置

通过配置Netty底层库,可以配置SSL安全链接、请求超时、读写超时等。

可以通过配置动态连接池,自定义突破这些默认配置。包括Netty的select线程和工作线程也可以自己设置。

  • 最大连接数(maxConnections):ConnectionProvider连接池的最大连接数选项,默认为ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS,即:2 * 可用处理器数(但最小值为 16),#2052
    • Reactor Netty 提供了两种创建客户端的方式
      • HttpClient.create()- 这使用预定义 ConnectionProvider的,它是这样创建的ConnectionProvider.create(name, 500)。这意味着500 maxConnections(这是在构建器中明确指定的ConnectionProvider)和1000 pendingAcquireMaxCount(这是ConnectionProvider构建器提供的默认值)
      • HttpClient.create(ConnectionProvider)- 用户提供自定义ConnectionProvider
      • JavaDoc:https://projectreactor.io/docs/netty/release/api/reactor/netty/resources/ConnectionProvider.ConnectionPoolSpec.html#maxConnections-int-
  • 最大空闲时间(maxIdleTime):默认ConnectionProvider.DEFAULT_POOL_MAX_IDLE_TIME,默认无空闲时间
  • 最大存活时间(maxLifeTime):默认ConnectionProvider.DEFAULT_POOL_MAX_LIFE_TIME
  • 等待获取时间(pendingAcquireTimeout):获取连接超时默认45000ms,即45秒
  • 挂起的获取最大计数(pendingAcquireMaxCount):获取的最大注册请求数以保留在挂起队列中当使用 -1 调用时,挂起队列将没有上限。默认为2 * max connections.
//最大连接数、最大空闲时间、最大存活时间、获取连接超时时间
//挂起的获取最大计数
//在后台被驱逐时间
ConnectionProvider provider = ConnectionProvider.builder("fixed")
                .maxConnections(200)
                .maxIdleTime(Duration.ofSeconds(60))
                .maxLifeTime(Duration.ofSeconds(60))
                .pendingAcquireTimeout(Duration.ofSeconds(60))
                .pendingAcquireMaxCount(20000)
                .evictInBackground(Duration.ofSeconds(120)).build();

//配置Netty select线程
HttpClient httpClient = HttpClient.create(provider)
  							.tcpConfiguration(tcpClient -> {
                     //指定Netty的select 和 work线程数量
                     LoopResources loop = LoopResources.create("kl-event-loop", 1, 4, true);
                     return tcpClient.doOnConnected(connection -> {
                         //读写超时设置
                         connection.addHandlerLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS))
                                 .addHandlerLast(new WriteTimeoutHandler(10));
                     })
                       //连接超时设置
                       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                       .option(ChannelOption.TCP_NODELAY, true)
                       .runOn(loop);
                 });

WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient).build();

WebClient 使用上常见问题

1 PoolAcquirePendingLimitException

Pending acquire queue has reached its maximum size of 1000; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000

WebClient需要一个HTTP客户端库来执行请求。 默认情况下,它使用Reactor Netty,客户端使用一个“固定”连接池,其中500个是活动通道的最大数量,1000个是允许保持在挂起状态的进一步通道获取尝试的最大数量。

只要创建的通道少于500个,并且由池管理,那么实现就会创建一个新通道。 当达到池中通道的最大数量时,最多会延迟(挂起)1000次获取通道的新尝试,直到通道再次返回到池中,并且会拒绝进一步尝试,并出现错误。

你有两个选择来解决这个问题,垂直扩展:增加连接池大小和/或获取队列长度,水平扩展:创建应用程序的其他实例。

1.1 垂直扩展:增加连接池大小和/或获取队列长度

ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")
        .maxConnections(<your_desired_max_connections>)
        .pendingAcquireMaxCount(<your_desired_pending_queue_size>)
        .build();
ReactorClientHttpConnector clientHttpConnector = new ReactorClientHttpConnector(HttpClient.create(connectionProvider));
WebClient.builder()
        .clientConnector(clientHttpConnector)
        .build();

1.2 水平扩展:创建应用程序的其他实例,并在实例之间平衡api调用的负载。 在计算连接池的大小时,值得考虑下游api调用的延迟。 connection_pool_size=tps * downstream_api_latency tps(每秒事务数)

2 Connection reset by peer exception

Connection reset by peer exception

由于没有设置合理的连接池抛出的异常信息。

https://github.com/reactor/reactor-netty/issues/1774

构建HTTP请求示例

1 GET请求示例

URI构造时支持属性占位符,真实参数在入参时排序好就可以。同时可以通过accept设置媒体类型,以及编码。最终的结果值是通过Mono和Flux来接收的,在subscribe方法中订阅返回值。

WebClient client = WebClient.create("http://www.compay.com");
Mono<String> result = client.get()
  .uri("/book/{id}.html", 123)
  .acceptCharset(StandardCharsets.UTF_8)
  .accept(MediaType.TEXT_HTML)
  .retrieve()
  .bodyToMono(String.class);
result.subscribe(System.err::println);

如果需要携带复杂的查询参数,可以通过UriComponentsBuilder构造出URL请求地址,如:

//定义query参数
MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
params.add("name", "daybreak");
//定义url参数
Map<String, Object> uriVariables = new HashMap<>();
uriVariables.put("id", 200);
String uri = UriComponentsBuilder.fromUriString("/book/{id}.html")
  .queryParams(params)
  .uriVariables(uriVariables)
  .toUriString();

2 POST请求示例

POST请求示例演示了一个同时包含表单参数和文件流数据

WebClient client = WebClient.create("http://www.compay.com");
FormInserter formInserter = fromMultipartData("name","daybreak")
	.with("map",ImmutableMap.of("xx","xx"))
	.with("file",new File("d://xxx.doc"));
Mono<String> result = client.post().uri("/book/{id}.html", 123)
	.contentType(MediaType.APPLICATION_JSON)
	.body(formInserter)
	.retrieve()
	.bodyToMono(String.class);
result.subscribe(System.err::println);

普通POST请求,直接通过bodyValue设置对象实例即可。不用FormInserter构造

WebClient client = WebClient.create("http://www.compay.com");
Mono<String> result = client.post().uri("/book/{id}.html", 123)
	.contentType(MediaType.APPLICATION_JSON)
  .bodyValue(ImmutableMap.of("name","daybreak"))
	.retrieve()
	.bodyToMono(String.class);
result.subscribe(System.err::println);

3 同步返回结果

WebClient client =  WebClient.create("http://www.compay.com");
String result = client .get().uri("/book/{id}.html", 123)
	.retrieve()
	.bodyToMono(String.class)
	.block();
System.err.println(result);

4 异常处理

可以使用OnStatus根据status code进行异常适配。

可以使用doOnError异常适配

可以使用onErrorReturn返回指定默认返回值

WebClient webClient = WebClient.builder().baseUrl("https://api.compay.com")
        .defaultHeader(HttpHeaders.CONTENT_TYPE, "application/json")
        .defaultHeader(HttpHeaders.USER_AGENT, "Spring 5 WebClient")
        .build();
WebClient.ResponseSpec responseSpec = webClient.method(HttpMethod.GET)
        .uri("/user/list?sort={sortField}", "updated")
        .retrieve();
Mono<String> mono = responseSpec
        .onStatus(e -> e.is4xxClientError(),resp -> {
            LOGGER.error("error:{},msg:{}",resp.statusCode().value(),resp.statusCode().getReasonPhrase());
            return Mono.error(new RuntimeException(resp.statusCode().value() + " : " + resp.statusCode().getReasonPhrase()));
        })
        .bodyToMono(String.class)
        .doOnError(WebClientResponseException.class, err -> {
            LOGGER.info("ERROR status:{},msg:{}",err.getRawStatusCode(),err.getResponseBodyAsString());
            throw new RuntimeException(err.getMessage());
        })
        .onErrorReturn("fallback");
String result = mono.block();
LOGGER.info("result:{}",result);

性能对比测试

接下来我们进行性能对比测试。

在某个业务场景中我们使用了RestTemplate,借此分析下WebClient 与 RestTemplate 两者的性能实际表现,通过实际的案例性能对比,看看WebClient 是否有如此优秀。

业务场景说明

在我们的业务场景中,是一个异步处理的场景。当完成某项业务后,进行HTTP异步通知处理。这个也是在业务开发中比较场景的一种场景。

测试方案设计

延迟的负载测试分析

搭建待测试项目

    private AtomicInteger metrics = new AtomicInteger(0);

    @RequestMapping(value = "/check", method = RequestMethod.GET)
    @ResponseBody
    public String check() throws InterruptedException {
        Thread.sleep(100L);
        return "success_" + metrics.incrementAndGet();
    }

Caller设计

1 ThreadPoolTaskExecutor + RestTemplate

通过线程组组合 RestTemplate 实现异步HTTP请求。

		private static final int poolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2;
    private static final int queueSize = 20000;	
	
		@Bean
    public TaskExecutor listenerExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(poolSize);
        pool.setMaxPoolSize(poolSize);
        pool.setQueueCapacity(queueSize);
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        pool.setWaitForTasksToCompleteOnShutdown(true);
        pool.setAwaitTerminationSeconds(60);
        return pool;
    }
  • corePoolSize:Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2
  • maxPoolSize:Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2
  • queueCapacity:20000
  • rejectedExecutionHandler:CallerRunsPolicy

2 AsyncRestTemplate

原官方的线程组+RestTemplate方法,在Spring5.0以及废弃,替换换WebClient。

		@Bean
    public AsyncRestTemplate asyncRestTemplate() {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        factory.setConnectTimeout(100);
        factory.setReadTimeout(200);
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        int poolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2;
        pool.setCorePoolSize(poolSize);
        pool.setMaxPoolSize(poolSize);
        pool.setQueueCapacity(20000);
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        pool.setWaitForTasksToCompleteOnShutdown(true);
        pool.setAwaitTerminationSeconds(60);
        pool.initialize();
        factory.setTaskExecutor(pool);
        return new AsyncRestTemplate(factory);
    }
  • corePoolSize:Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2
  • maxPoolSize:Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2
  • queueCapacity:20000
  • rejectedExecutionHandler:CallerRunsPolicy

WebClient:异步非阻塞、反应式的,能够以少量固定的线程处理高并发的HTTP 请求。

		@Bean
    public WebClient webClient(){
        ConnectionProvider provider = ConnectionProvider.builder("fixed")
                .maxConnections(200)
                .maxIdleTime(Duration.ofSeconds(60))
                .maxLifeTime(Duration.ofSeconds(60))
                .pendingAcquireTimeout(Duration.ofSeconds(60))
                .pendingAcquireMaxCount(1000).build();
        return WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create(provider))).build();
    }
  • 最大连接数(maxConnections):200
  • 最大空闲时间(maxIdleTime):60s
  • 最大存活时间(maxLifeTime):60s
  • 等待获取时间(pendingAcquireTimeout):60s
  • 挂起的获取最大计数(pendingAcquireMaxCount):1000

测试结果分析

1 ThreadPoolTaskExecutor + RestTemplate

用户量 线程数 吞吐量(req/sec) 95%(ms) 延迟完成(min) 堆内存(Mb) CPU占用率
100 44 168 1 0 100 3
200 42 334 1 1 100 3
400 92 602 115 1 100 3
800 232 972 533 2 100 3
1200 232 1007 1175 2 150 4
1600 232 1003 1683 3 200 5

resttemplate

2 AsyncRestTemplate

用户量 线程数 吞吐量(req/sec) 95%(ms) 延迟完成(min) 堆内存(Mb) CPU占用率
100 47 168 1 0 200 3
200 37 334 1 2 300-500 3
400 93 603 111 3 400-500 5
800 169 981 127 3 1000 5
1200 235 1633 215 3 1000 5
1600 235 1869 417 3 1000 5

async-resttemplate2

3 Webclient

用户量 线程数 吞吐量(req/sec) 95%(ms) 延迟完成(min) 堆内存(Mb) CPU占用率 信息
100 38 168 1 0 100 2
200 37 335 1 0 100 2
400 37 667 1 0 100 2
800 43 1334 1 0 100 3
1200 50 2000 1 0 110 4
1400 43 2326 1 0 120 5
1600 51 2632 1 0 130 5 PoolAcquirePendingLimitException
1800 44 3031 1 0 150 6 PoolAcquirePendingLimitException
2000 48 3334 1 0 200 7 PoolAcquirePendingLimitException
2400 51 4000 1 0 250 8 PoolAcquirePendingLimitException
3000 54 4994 1 0 500 10 PoolAcquirePendingLimitException
4000 83 6666 1 0 800 15 PoolAcquirePendingLimitException
5000 130 8308 1 0 1000 20 PoolAcquirePendingLimitException

webclient2

image-20230116151614439

压测结论

1.ThreadPoolTaskExecutor + RestTemplate,用户并发量在800,吞吐量可以达到972/s,但是tomcat线程被打满,已经出现了延迟等待的现象(由于使用的是CallerRunsPolicy策略)。

2.AsyncRestTemplate,用户并发量在800,吞吐量981/s,线程数169基本也进入CallerRunsPolicy策略。

3.Webclient,用户并发量在1400,吞吐量2326/s,线程数、内存、cpu都相对问题,由于Webclient是异步非阻塞的,不能像线程池一样设置执行CallerRunsPolicy策略,当用户并发量1600达到性能瓶颈,开始触发PoolAcquirePendingLimitException异常,Webclient的弊端在业务的做好容量规划,做好对应的垂直、水平扩展以及失败情况的fallback。

总结

WebClient性能非常优异,同样能够以少量而固定的线程数处理高并发的Http请求,在基于Http的服务间通信方面,完全可以取代RestTemplate以及AsyncRestTemplate。

参考:

单元测试最佳实践

背景 在近期的代码重构的过程中,遇到各式各样的问题,比如调整代码顺序导致的bug,方法重构参数校验逻辑的移除等。 在上线前需要花大量时间进行测试

如何编写优雅的Java代码

背景 随着软件代码日积月累,系统维护成本变得越来越高,是所有开发团队面临的问题。持续优化代码,提高代码质量是提升系统生命力有效手段之一。在编码
0%