vlambda博客
学习文章列表

WebClient-Reactor风格的异步调用

前言

正如一书中所述,Spring5中引入了与Web Servlet技术栈并行存在的,框架层面支持异步处理的Webflux技术栈。

利用Spring WebFlux可以实现服务端异步执行,另外其也提供了WebClient用来执行HTTP请求,用来实现客户端异步调用。

WebClient 提供了Reactor的功能性、流畅的API,支持多种异步逻辑处理的声明式编排,另外它是完全非阻塞的,并且支持流式传输。

WebClient的使用

首先我们创建一个简单的web服务器,用来作为webclient的服务端,概要代码如下:

public class Server { public static void main(String[] arg) throws IOException { // 创建 http 服务器, 绑定本地 8080 端口 HttpServer httpServer = HttpServer.create(new InetSocketAddress(8080), 0);
// 创建上下文监听, "/" 表示匹配所有 URI 请求 httpServer.createContext("/", new HttpHandler() { @Override public void handle(HttpExchange httpExchange) throws IOException { // 响应内容 byte[] respContents = "Hello World".getBytes("UTF-8"); // 设置响应头 httpExchange.getResponseHeaders().add("Content-Type", "text/html; charset=UTF-8"); httpExchange.sendResponseHeaders(200, respContents.length); // 模拟服务端执行耗时 try { Thread.sleep(10000); } catch (Exception e) {
} // 设置响应内容 httpExchange.getResponseBody().write(respContents); httpExchange.close(); } }); httpServer.start(); }}
  • 如上代码我们启动了一个Web服务端,服务端内部实现休眠10s用来模拟服务端执行逻辑,最后http 响应body内写回Hello World作为客户端响应结果。


下面我们来看如何使用Webclient来做客户端:

public class App { private static WebClient client;
public static void init() { //创建webclient实例 client = WebClient.builder(). build(); }
public static void main(String[] args) throws KsMerchantApiException, IOException {
//1.初始化webclient init();
//2.反应式调用,返回reactor流对象 Mono<String> resp = client .method(HttpMethod.GET) .uri("http://127.0.0.1:8080") .retrieve() .bodyToMono(String.class);
//3.订阅流对象 resp.onErrorMap(throwable -> { System.out.println("onErrorMap:" + throwable.getLocalizedMessage()); return throwable; }).subscribe(s -> System.out.println("result:" + Thread.currentThread().getName() + " " + s));
//4.调用线程打印 System.out.println("main is over");}
  • 如上代码1我们创建了一个WebClient对象

  • 代码2我们使用client发起了一个get请求,并且使用bodyToMono(String.class)返回反应式流对象。

  • 代码3 我们订阅流对象,并打印响应结果。

  • 代码4打印main执行结束了。


首先运行服务端,然后运行客户端,会发现客户端如下输出:

main is overresult:reactor-http-nio-1 Hello World


下面我们来分析下调用逻辑:

  • 客户端调用线程,执行代码2发起调用后会马上返回一个Mono对象,不会阻塞等待服务端写回响应结果;调用线程继续向下运行执行代码3订阅流结果,该过程不会阻塞调用线程,调用线程马上返回;最后调用线程执行代码2打印日志,最终调用线程执行结束。

  • 服务端结束到请求后,会休眠10s,休眠结束后,把结果写回客户端,客户端IO线程接受到服务端响应结果后,回调代码3设置的订阅回调函数,输出响应结果。

可知客户端调用线程在发起请求时没被阻塞,响应结果回来后也没阻塞,而是使用的IO线程来处理响应结果。可知webclient实现了。

WebClient的线程模型

  • 如上当调用线程使用webclient发起请求后,内部会先创建一个Mono响应对象,然后切换到IO线程具体发起网络请求。

  • 调用线程获取到Mono对象后,一般会订阅,也就是设置一个Consumer用来具体处理服务端响应结果。

  • 服务端接受请求后,进行处理,最后把结果写回客户端,客户端接受响应后,使用IO线程把结果设置到Mono对象,从而触发设置的Consumer回调函数的执行。

注:WebClient默认内部使用Netty实现http客户端调用,这里IO线程其实是netty的IO线程,而netty客户端的IO线程内是不建议做耗时操作的,因为IO线程是用来轮训注册到select上的channel的数据的,如果阻塞了,那么其他channel的读写请求就会得不到及时处理。所以如果consumer内逻辑比较耗时,建议从IO线程切换到其他线程来做。

那么如何切换那?可以使用publishOn把IO线程切换到自定义线程池进行处理:

resp.publishOn(Schedulers.elastic())//切换到Schedulers.elastic()对应的线程池进行处理 .onErrorMap(throwable -> { System.out.println("onErrorMap:" + throwable.getLocalizedMessage()); return throwable; }).subscribe(s -> System.out.println("result:" + Thread.currentThread().getName() + " " + s));


Reactor中Schedulers提供了几种内置实现:

  • Schedulers.elastic():线程池中的线程是可以复用的,按需创建与空闲回收,该调度器适用于 I/O 密集型任务。

  • Schedulers.parallel():含有固定个数的线程池,该调度器适用于计算密集型任务。

  • Schedulers.single():单一线程来执行任务

  • Schedulers.immediate():立刻使用调用线程来执行。

  • Schedulers.fromExecutor():使用已有的Executor转换为Scheduler来执行任务。

总结

异步非阻塞是未来,而结合反应式框架Reactor或Rxjava可以实现Reactor风格的异步编程,实现声明式编程模式,可以让我们编写的异步代码,可读性大大提高。



戳下面阅读

👇

  



点亮再看哦👇