WebClient性能分析与对比测试

背景
在Spring 5之前,如果我们想要调用其他系统提供的HTTP服务,我们通常可以使用Spring提供的RestTemplate来访问。RestTemplate用法很简单,但它的不足之处在于它的请求是同步阻塞模式,因此存在一定性能瓶颈,当然如果想要使用异步方式请求,也可以使用AsyncRestTemplate。
从Spring 5开始,Spring中全面引入了Reactive响应式编程,WebClient就属于Spring WebFlux 的一部分。WebClient的请求模式属于异步非阻塞、反应式的,能够以少量固定的线程处理高并发的HTTP 请求。
因此,从Spring 5 开始,HTTP服务之间的通信方式我们可以考虑使用WebCLient来取代之前的RestTemplate。
WebClient 详解
WebClient特点
webClient是一个功能完善的HTTP请求客户端,支持以下内容:
- 非阻塞I/O
- 反应流回压(即消费者负载过高时,主动反馈生产者放慢速度的机制)
- 具有高并发性,硬件资源消耗更少
- 流程的API设计
- 同步与异步交互
- 流式传输支持
WebClient 开发配置
1 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2 基础配置
WebClient 实例构造器可以设置一些基础的全局web请求配置信息,比如默认的cookie、header、baseUrl等。
WebClient.builder()
.defaultCookie("kl","kl")
.defaultUriVariables(ImmutableMap.of("name","kl"))
.defaultHeader("header","kl")
.defaultHeaders(httpHeaders -> {
httpHeaders.add("header1","kl");
httpHeaders.add("header2","kl");
})
.defaultCookies(cookie ->{
cookie.add("cookie1","kl");
cookie.add("cookie2","kl");
})
.baseUrl("http://www.compay.com")
.build();
3 底层依赖Netty库配置
通过配置Netty底层库,可以配置SSL安全链接、请求超时、读写超时等。
可以通过配置动态连接池,自定义突破这些默认配置。包括Netty的select线程和工作线程也可以自己设置。
- 最大连接数(maxConnections):ConnectionProvider连接池的最大连接数选项,默认为
ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS,即:2 * 可用处理器数(但最小值为 16),#2052- Reactor Netty 提供了两种创建客户端的方式
HttpClient.create()- 这使用预定义ConnectionProvider的,它是这样创建的ConnectionProvider.create(name, 500)。这意味着500maxConnections(这是在构建器中明确指定的ConnectionProvider)和1000pendingAcquireMaxCount(这是ConnectionProvider构建器提供的默认值)HttpClient.create(ConnectionProvider)- 用户提供自定义ConnectionProvider- JavaDoc:https://projectreactor.io/docs/netty/release/api/reactor/netty/resources/ConnectionProvider.ConnectionPoolSpec.html#maxConnections-int-
- Reactor Netty 提供了两种创建客户端的方式
- 最大空闲时间(maxIdleTime):默认
ConnectionProvider.DEFAULT_POOL_MAX_IDLE_TIME,默认无空闲时间 - 最大存活时间(maxLifeTime):默认
ConnectionProvider.DEFAULT_POOL_MAX_LIFE_TIME - 等待获取时间(pendingAcquireTimeout):获取连接超时默认45000ms,即45秒
- 挂起的获取最大计数(pendingAcquireMaxCount):获取的最大注册请求数以保留在挂起队列中当使用 -1 调用时,挂起队列将没有上限。默认为
2 * max connections.
//最大连接数、最大空闲时间、最大存活时间、获取连接超时时间
//挂起的获取最大计数
//在后台被驱逐时间
ConnectionProvider provider = ConnectionProvider.builder("fixed")
.maxConnections(200)
.maxIdleTime(Duration.ofSeconds(60))
.maxLifeTime(Duration.ofSeconds(60))
.pendingAcquireTimeout(Duration.ofSeconds(60))
.pendingAcquireMaxCount(20000)
.evictInBackground(Duration.ofSeconds(120)).build();
//配置Netty select线程
HttpClient httpClient = HttpClient.create(provider)
.tcpConfiguration(tcpClient -> {
//指定Netty的select 和 work线程数量
LoopResources loop = LoopResources.create("kl-event-loop", 1, 4, true);
return tcpClient.doOnConnected(connection -> {
//读写超时设置
connection.addHandlerLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS))
.addHandlerLast(new WriteTimeoutHandler(10));
})
//连接超时设置
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.TCP_NODELAY, true)
.runOn(loop);
});
WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient).build();
WebClient 使用上常见问题
1 PoolAcquirePendingLimitException
Pending acquire queue has reached its maximum size of 1000; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000
WebClient需要一个HTTP客户端库来执行请求。 默认情况下,它使用Reactor Netty,客户端使用一个“固定”连接池,其中500个是活动通道的最大数量,1000个是允许保持在挂起状态的进一步通道获取尝试的最大数量。
只要创建的通道少于500个,并且由池管理,那么实现就会创建一个新通道。 当达到池中通道的最大数量时,最多会延迟(挂起)1000次获取通道的新尝试,直到通道再次返回到池中,并且会拒绝进一步尝试,并出现错误。
你有两个选择来解决这个问题,垂直扩展:增加连接池大小和/或获取队列长度,水平扩展:创建应用程序的其他实例。
1.1 垂直扩展:增加连接池大小和/或获取队列长度
ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")
.maxConnections(<your_desired_max_connections>)
.pendingAcquireMaxCount(<your_desired_pending_queue_size>)
.build();
ReactorClientHttpConnector clientHttpConnector = new ReactorClientHttpConnector(HttpClient.create(connectionProvider));
WebClient.builder()
.clientConnector(clientHttpConnector)
.build();
1.2 水平扩展:创建应用程序的其他实例,并在实例之间平衡api调用的负载。
在计算连接池的大小时,值得考虑下游api调用的延迟。
connection_pool_size=tps * downstream_api_latency
tps(每秒事务数)
2 Connection reset by peer exception
Connection reset by peer exception
由于没有设置合理的连接池抛出的异常信息。
https://github.com/reactor/reactor-netty/issues/1774
构建HTTP请求示例
1 GET请求示例
URI构造时支持属性占位符,真实参数在入参时排序好就可以。同时可以通过accept设置媒体类型,以及编码。最终的结果值是通过Mono和Flux来接收的,在subscribe方法中订阅返回值。
WebClient client = WebClient.create("http://www.compay.com");
Mono<String> result = client.get()
.uri("/book/{id}.html", 123)
.acceptCharset(StandardCharsets.UTF_8)
.accept(MediaType.TEXT_HTML)
.retrieve()
.bodyToMono(String.class);
result.subscribe(System.err::println);
如果需要携带复杂的查询参数,可以通过UriComponentsBuilder构造出URL请求地址,如:
//定义query参数
MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
params.add("name", "daybreak");
//定义url参数
Map<String, Object> uriVariables = new HashMap<>();
uriVariables.put("id", 200);
String uri = UriComponentsBuilder.fromUriString("/book/{id}.html")
.queryParams(params)
.uriVariables(uriVariables)
.toUriString();
2 POST请求示例
POST请求示例演示了一个同时包含表单参数和文件流数据
WebClient client = WebClient.create("http://www.compay.com");
FormInserter formInserter = fromMultipartData("name","daybreak")
.with("map",ImmutableMap.of("xx","xx"))
.with("file",new File("d://xxx.doc"));
Mono<String> result = client.post().uri("/book/{id}.html", 123)
.contentType(MediaType.APPLICATION_JSON)
.body(formInserter)
.retrieve()
.bodyToMono(String.class);
result.subscribe(System.err::println);
普通POST请求,直接通过bodyValue设置对象实例即可。不用FormInserter构造
WebClient client = WebClient.create("http://www.compay.com");
Mono<String> result = client.post().uri("/book/{id}.html", 123)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(ImmutableMap.of("name","daybreak"))
.retrieve()
.bodyToMono(String.class);
result.subscribe(System.err::println);
3 同步返回结果
WebClient client = WebClient.create("http://www.compay.com");
String result = client .get().uri("/book/{id}.html", 123)
.retrieve()
.bodyToMono(String.class)
.block();
System.err.println(result);
4 异常处理
可以使用OnStatus根据status code进行异常适配。
可以使用doOnError异常适配
可以使用onErrorReturn返回指定默认返回值
WebClient webClient = WebClient.builder().baseUrl("https://api.compay.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, "application/json")
.defaultHeader(HttpHeaders.USER_AGENT, "Spring 5 WebClient")
.build();
WebClient.ResponseSpec responseSpec = webClient.method(HttpMethod.GET)
.uri("/user/list?sort={sortField}", "updated")
.retrieve();
Mono<String> mono = responseSpec
.onStatus(e -> e.is4xxClientError(),resp -> {
LOGGER.error("error:{},msg:{}",resp.statusCode().value(),resp.statusCode().getReasonPhrase());
return Mono.error(new RuntimeException(resp.statusCode().value() + " : " + resp.statusCode().getReasonPhrase()));
})
.bodyToMono(String.class)
.doOnError(WebClientResponseException.class, err -> {
LOGGER.info("ERROR status:{},msg:{}",err.getRawStatusCode(),err.getResponseBodyAsString());
throw new RuntimeException(err.getMessage());
})
.onErrorReturn("fallback");
String result = mono.block();
LOGGER.info("result:{}",result);
性能对比测试
接下来我们进行性能对比测试。
在某个业务场景中我们使用了RestTemplate,借此分析下WebClient 与 RestTemplate 两者的性能实际表现,通过实际的案例性能对比,看看WebClient 是否有如此优秀。
业务场景说明
在我们的业务场景中,是一个异步处理的场景。当完成某项业务后,进行HTTP异步通知处理。这个也是在业务开发中比较场景的一种场景。
测试方案设计
延迟的负载测试分析
搭建待测试项目
private AtomicInteger metrics = new AtomicInteger(0);
@RequestMapping(value = "/check", method = RequestMethod.GET)
@ResponseBody
public String check() throws InterruptedException {
Thread.sleep(100L);
return "success_" + metrics.incrementAndGet();
}
Caller设计
1 ThreadPoolTaskExecutor + RestTemplate
通过线程组组合 RestTemplate 实现异步HTTP请求。
private static final int poolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2;
private static final int queueSize = 20000;
@Bean
public TaskExecutor listenerExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(poolSize);
pool.setMaxPoolSize(poolSize);
pool.setQueueCapacity(queueSize);
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
pool.setWaitForTasksToCompleteOnShutdown(true);
pool.setAwaitTerminationSeconds(60);
return pool;
}
- corePoolSize:Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2
- maxPoolSize:Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2
- queueCapacity:20000
- rejectedExecutionHandler:CallerRunsPolicy
2 AsyncRestTemplate
原官方的线程组+RestTemplate方法,在Spring5.0以及废弃,替换换WebClient。
@Bean
public AsyncRestTemplate asyncRestTemplate() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setConnectTimeout(100);
factory.setReadTimeout(200);
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
int poolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2;
pool.setCorePoolSize(poolSize);
pool.setMaxPoolSize(poolSize);
pool.setQueueCapacity(20000);
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
pool.setWaitForTasksToCompleteOnShutdown(true);
pool.setAwaitTerminationSeconds(60);
pool.initialize();
factory.setTaskExecutor(pool);
return new AsyncRestTemplate(factory);
}
- corePoolSize:Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2
- maxPoolSize:Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2
- queueCapacity:20000
- rejectedExecutionHandler:CallerRunsPolicy
WebClient:异步非阻塞、反应式的,能够以少量固定的线程处理高并发的HTTP 请求。
@Bean
public WebClient webClient(){
ConnectionProvider provider = ConnectionProvider.builder("fixed")
.maxConnections(200)
.maxIdleTime(Duration.ofSeconds(60))
.maxLifeTime(Duration.ofSeconds(60))
.pendingAcquireTimeout(Duration.ofSeconds(60))
.pendingAcquireMaxCount(1000).build();
return WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create(provider))).build();
}
- 最大连接数(maxConnections):200
- 最大空闲时间(maxIdleTime):60s
- 最大存活时间(maxLifeTime):60s
- 等待获取时间(pendingAcquireTimeout):60s
- 挂起的获取最大计数(pendingAcquireMaxCount):1000
测试结果分析
1 ThreadPoolTaskExecutor + RestTemplate
| 用户量 | 线程数 | 吞吐量(req/sec) | 95%(ms) | 延迟完成(min) | 堆内存(Mb) | CPU占用率 |
|---|---|---|---|---|---|---|
| 100 | 44 | 168 | 1 | 0 | 100 | 3 |
| 200 | 42 | 334 | 1 | 1 | 100 | 3 |
| 400 | 92 | 602 | 115 | 1 | 100 | 3 |
| 800 | 232 | 972 | 533 | 2 | 100 | 3 |
| 1200 | 232 | 1007 | 1175 | 2 | 150 | 4 |
| 1600 | 232 | 1003 | 1683 | 3 | 200 | 5 |

2 AsyncRestTemplate
| 用户量 | 线程数 | 吞吐量(req/sec) | 95%(ms) | 延迟完成(min) | 堆内存(Mb) | CPU占用率 |
|---|---|---|---|---|---|---|
| 100 | 47 | 168 | 1 | 0 | 200 | 3 |
| 200 | 37 | 334 | 1 | 2 | 300-500 | 3 |
| 400 | 93 | 603 | 111 | 3 | 400-500 | 5 |
| 800 | 169 | 981 | 127 | 3 | 1000 | 5 |
| 1200 | 235 | 1633 | 215 | 3 | 1000 | 5 |
| 1600 | 235 | 1869 | 417 | 3 | 1000 | 5 |

3 Webclient
| 用户量 | 线程数 | 吞吐量(req/sec) | 95%(ms) | 延迟完成(min) | 堆内存(Mb) | CPU占用率 | 信息 |
|---|---|---|---|---|---|---|---|
| 100 | 38 | 168 | 1 | 0 | 100 | 2 | |
| 200 | 37 | 335 | 1 | 0 | 100 | 2 | |
| 400 | 37 | 667 | 1 | 0 | 100 | 2 | |
| 800 | 43 | 1334 | 1 | 0 | 100 | 3 | |
| 1200 | 50 | 2000 | 1 | 0 | 110 | 4 | |
| 1400 | 43 | 2326 | 1 | 0 | 120 | 5 | |
| 1600 | 51 | 2632 | 1 | 0 | 130 | 5 | PoolAcquirePendingLimitException |
| 1800 | 44 | 3031 | 1 | 0 | 150 | 6 | PoolAcquirePendingLimitException |
| 2000 | 48 | 3334 | 1 | 0 | 200 | 7 | PoolAcquirePendingLimitException |
| 2400 | 51 | 4000 | 1 | 0 | 250 | 8 | PoolAcquirePendingLimitException |
| 3000 | 54 | 4994 | 1 | 0 | 500 | 10 | PoolAcquirePendingLimitException |
| 4000 | 83 | 6666 | 1 | 0 | 800 | 15 | PoolAcquirePendingLimitException |
| 5000 | 130 | 8308 | 1 | 0 | 1000 | 20 | PoolAcquirePendingLimitException |


压测结论
1.ThreadPoolTaskExecutor + RestTemplate,用户并发量在800,吞吐量可以达到972/s,但是tomcat线程被打满,已经出现了延迟等待的现象(由于使用的是CallerRunsPolicy策略)。
2.AsyncRestTemplate,用户并发量在800,吞吐量981/s,线程数169基本也进入CallerRunsPolicy策略。
3.Webclient,用户并发量在1400,吞吐量2326/s,线程数、内存、cpu都相对问题,由于Webclient是异步非阻塞的,不能像线程池一样设置执行CallerRunsPolicy策略,当用户并发量1600达到性能瓶颈,开始触发PoolAcquirePendingLimitException异常,Webclient的弊端在业务的做好容量规划,做好对应的垂直、水平扩展以及失败情况的fallback。
总结
WebClient性能非常优异,同样能够以少量而固定的线程数处理高并发的Http请求,在基于Http的服务间通信方面,完全可以取代RestTemplate以及AsyncRestTemplate。
参考:
