WebClient-Reactor风格的异步调用
前言
正如一书中所述,Spring5中引入了与Web Servlet技术栈并行存在的,框架层面支持异步处理的Webflux技术栈。
利用Spring WebFlux可以实现服务端异步执行,另外其也提供了WebClient用来执行HTTP请求,用来实现客户端异步调用。
WebClient 提供了Reactor的功能性、流畅的API,支持多种异步逻辑处理的声明式编排,另外它是完全非阻塞的,并且支持流式传输。
WebClient的使用
首先我们创建一个简单的web服务器,用来作为webclient的服务端,概要代码如下:
public class Server {
public static void main(String[] arg) throws IOException {
// 创建 http 服务器, 绑定本地 8080 端口
HttpServer httpServer = HttpServer.create(new InetSocketAddress(8080), 0);
// 创建上下文监听, "/" 表示匹配所有 URI 请求
httpServer.createContext("/", new HttpHandler() {
public void handle(HttpExchange httpExchange) throws IOException {
// 响应内容
byte[] respContents = "Hello World".getBytes("UTF-8");
// 设置响应头
httpExchange.getResponseHeaders().add("Content-Type", "text/html; charset=UTF-8");
httpExchange.sendResponseHeaders(200, respContents.length);
// 模拟服务端执行耗时
try {
Thread.sleep(10000);
} catch (Exception e) {
}
// 设置响应内容
httpExchange.getResponseBody().write(respContents);
httpExchange.close();
}
});
httpServer.start();
}
}
如上代码我们启动了一个Web服务端,服务端内部实现休眠10s用来模拟服务端执行逻辑,最后http 响应body内写回Hello World作为客户端响应结果。
下面我们来看如何使用Webclient来做客户端:
public class App {
private static WebClient client;
public static void init() {
//创建webclient实例
client = WebClient.builder(). build();
}
public static void main(String[] args) throws KsMerchantApiException, IOException {
//1.初始化webclient
init();
//2.反应式调用,返回reactor流对象
Mono<String> resp = client
.method(HttpMethod.GET)
.uri("http://127.0.0.1:8080")
.retrieve()
.bodyToMono(String.class);
//3.订阅流对象
resp.onErrorMap(throwable -> {
System.out.println("onErrorMap:" + throwable.getLocalizedMessage());
return throwable;
}).subscribe(s -> System.out.println("result:" + Thread.currentThread().getName() + " " + s));
//4.调用线程打印
System.out.println("main is over");
}
如上代码1我们创建了一个WebClient对象
代码2我们使用client发起了一个get请求,并且使用bodyToMono(String.class)返回反应式流对象。
代码3 我们订阅流对象,并打印响应结果。
代码4打印main执行结束了。
首先运行服务端,然后运行客户端,会发现客户端如下输出:
main is over
result:reactor-http-nio-1 Hello World
下面我们来分析下调用逻辑:
客户端调用线程,执行代码2发起调用后会马上返回一个Mono对象,不会阻塞等待服务端写回响应结果;调用线程继续向下运行执行代码3订阅流结果,该过程不会阻塞调用线程,调用线程马上返回;最后调用线程执行代码2打印日志,最终调用线程执行结束。
服务端结束到请求后,会休眠10s,休眠结束后,把结果写回客户端,客户端IO线程接受到服务端响应结果后,回调代码3设置的订阅回调函数,输出响应结果。
可知客户端调用线程在发起请求时没被阻塞,响应结果回来后也没阻塞,而是使用的IO线程来处理响应结果。可知webclient实现了。
WebClient的线程模型
如上当调用线程使用webclient发起请求后,内部会先创建一个Mono响应对象,然后切换到IO线程具体发起网络请求。
调用线程获取到Mono对象后,一般会订阅,也就是设置一个Consumer用来具体处理服务端响应结果。
服务端接受请求后,进行处理,最后把结果写回客户端,客户端接受响应后,使用IO线程把结果设置到Mono对象,从而触发设置的Consumer回调函数的执行。
注:WebClient默认内部使用Netty实现http客户端调用,这里IO线程其实是netty的IO线程,而netty客户端的IO线程内是不建议做耗时操作的,因为IO线程是用来轮训注册到select上的channel的数据的,如果阻塞了,那么其他channel的读写请求就会得不到及时处理。所以如果consumer内逻辑比较耗时,建议从IO线程切换到其他线程来做。
那么如何切换那?可以使用publishOn把IO线程切换到自定义线程池进行处理:
resp.publishOn(Schedulers.elastic())//切换到Schedulers.elastic()对应的线程池进行处理
.onErrorMap(throwable -> {
System.out.println("onErrorMap:" + throwable.getLocalizedMessage());
return throwable;
}).subscribe(s -> System.out.println("result:" + Thread.currentThread().getName() + " " + s));
Reactor中Schedulers提供了几种内置实现:
Schedulers.elastic():线程池中的线程是可以复用的,按需创建与空闲回收,该调度器适用于 I/O 密集型任务。
Schedulers.parallel():含有固定个数的线程池,该调度器适用于计算密集型任务。
Schedulers.single():单一线程来执行任务
Schedulers.immediate():立刻使用调用线程来执行。
Schedulers.fromExecutor():使用已有的Executor转换为Scheduler来执行任务。
总结
异步非阻塞是未来,而结合反应式框架Reactor或Rxjava可以实现Reactor风格的异步编程,实现声明式编程模式,可以让我们编写的异步代码,可读性大大提高。
👇
点亮再看哦👇