vlambda博客
学习文章列表

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

第 6 章 WebFlux 异步非阻塞通信

在上一章中,我们开始了解 Spring Boot 2.x。我们看到 很多有用的更新和模块已经到了 Spring 框架的第五版,我们还查看了 Spring WebFlux 模块。

在本章中,我们将详细了解该模块。我们将把 WebFlux 的内部设计与优秀的旧 Web MVC 进行比较,并尝试了解两者的优缺点。我们还将使用 WebFlux 构建一个简单的 Web 应用程序。

本章涵盖以下主题:

  • A bird's-eye view of Spring WebFlux
  • Spring WebFlux versus Spring Web MVC
  • A comprehensive design overview of Spring WebFlux

WebFlux 作为中央反应式服务器基础


正如我们第1章中看到的a>, 为什么选择 Reactive Spring?,以及 第 4 章Project Reactor - Reactive Apps 的基础,应用服务器的新时代为开发者带来了新技术。从 Spring Framework 在 Web 应用领域发展之初,就决定 将 Spring Web 模块与Java EE 的Servlet API 集成。 Spring 框架的整个基础架构都是围绕 Servlet API 构建的,并且它们是紧密耦合的。例如,整个 Spring Web MVC 都是基于 Front Controller pattern. 该模式在 Spring Web MVC 中由 org.springframework.web.servlet 实现.DispatcherServlet 类,它间接扩展了 javax.servlet.http.HttpServlet 类。

另一方面,Spring 框架确实为我们提供了更好的抽象层次 Spring Web 模块,它是许多功能的构建块,例如 注解驱动的控制器。 尽管这个模块部分地将公共接口与其实现分开,但最初的  ;Spring Web 的设计也是基于同步交互模型,因此阻塞了 IO。 尽管如此,这种分离是一个很好的基础,所以在继续研究响应式 Web 之前,让我们回顾一下Web 模块的设计并尝试了解这里发生了什么:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.1。 Spring WebMVC模块中web栈的实现

 

following 是对上图的描述:

  1. 传入的请求由底层 Servlet Container 处理。在这里,Servlet Container 负责将传入的 body 转换为 servlet API 的ServletRequest 接口,并以 ServletResponse< 的形式准备输出/代码>接口。
  2. 通过过滤器过滤ServletRequest的阶段结合在 FilterChain中。
  3. 下一个阶段是 DispatcherServlet 处理阶段。请记住 DispatcherServlet 扩展了 Servlet 类。它还包含 HandlerMappings (4)的列表, HandlerAdapters (5) 和 ViewResolvers (未在架构中描述)。 在当前执行流程的上下文中,DispatcherServlet 类负责搜索一个 HandlerMapping 实例并使用合适的 HandlerAdapter. 对其进行调整。然后它会搜索 ViewResolver  ;可以解析 View 这样 DispatcherServlet 启动渲染结果的 HandlerMapping 和 HandlerAdapter 执行。
  4. 之后,我们有了HandlerMapping阶段。 DispatcherServlet (3) 搜索所有 HandlerMapping< /code> 应用程序上下文中的bean。在映射的初始化过程中,扫描过程中找到的所有实例都按顺序排序。订单号由@Order 注解指定,或者在 HandlerMapping 实现 有序界面。因此,查找合适的 HandlerMapping instances 取决于之前设置的顺序。在上图中,描述了一些常见的 HandlerMapping 实例。最熟悉的是RequestMappingHandlerMapping,它启用了基于注解的编程模型。
  5. 最后,我们有 RequestMappingHandlerAdapter 阶段,它负责将传入的ServletRequest正确绑定到@Controller 注释对象。 RequestMappingHandlerAdapter 还提供请求验证、响应转换和许多其他有用的功能,使 Spring Web MVC 框架在日常 Web 开发中非常有用。

我们可能已经注意到,整体设计依赖于底层 servlet 容器,该容器负责处理容器内所有映射的 servlet。 DispatchServlet 充当灵活且高度可配置的 Spring Web 基础架构与繁重而复杂的 Servlet API 之间的集成点。 HandlerMapping 的可配置抽象有助于从 Servlet API 中分离出最终的业务逻辑,例如控制器和 bean。

 

笔记

Spring MVC 支持与 HttpServletRequestHttpServletResponse 的直接交互,以及映射、绑定和验证功能。但是,当使用这些类时,我们对 Servlet API 有额外的直接依赖。这可能被认为是不好的做法,因为它可能会使从 Web MVC 到 WebFlux 或 Spring 的任何其他 Web 扩展的迁移过程复杂化。建议使用org.springframework.http.RequestEntityorg.springframework.http.ResponseEntity代替。这些类将请求和响应对象与 Web 服务器实现隔离开来。

Spring Web MVC 方法多年来一直是一种方便的编程模型。它已被证明是 Web 应用程序开发的坚实稳定的骨架。这就是为什么在 2003 年,Spring 框架开始成为最流行的在 Servlet API 之上构建 Web 应用程序的解决方案之一。 但是,过去的方法和技术并不适合满足现代数据密集型系统的要求。

尽管 Servlet API 支持 异步、非阻塞通信(从 3.1 版开始),但 Spring 的实现MVC 模块有很多空白,不允许在整个请求生命周期中进行非阻塞操作。例如,没有开箱即用的非阻塞 HTTP 客户端,因此任何外部交互 很可能会导致阻塞 IO 调用。如第 5 章中所述, 使用 Spring Boot 2 实现响应式,Web MVC 抽象并不支持非阻塞 Servlet API 3.1 的所有特性。在此之前,不能将 Spring Web MVC 视为高负载项目的框架。旧 Spring 中 Web 抽象的另一个缺点是,对于 Netty 等非 servlet 服务器,无法灵活地重用 Spring Web 功能或编程模型。

这就是为什么 Spring Framework 团队在过去几年中面临的主要挑战是构建一个新的解决方案,该解决方案允许相同的基于注释的编程模型,并同时提供异步非阻塞服务器的所有好处。

反应式网络核心

假设我们正在为新的 Spring 生态系统开发新的 asynchronous 非阻塞 Web 模块。新的响应式 Web 堆栈应该是什么样子?首先,让我们分析现有的解决方案并突出显示应该增强或消除的部分。

需要注意的是,总的来说,Spring MVC 的内部 API 是经过精心设计的。唯一应该添加到 API 的是对 Servlet API 的直接依赖。因此,最终的解决方案应该具有与 Servlet API 类似的接口。设计响应式堆栈的第一步是将 javax.servlet.Servlet#service 替换为类比接口和响应传入请求的方法。我们还必须更改相关的接口和类。 Servlet API 交换客户端请求以获取服务器响应的方式也应该得到增强和定制。

虽然我们自己的 API 的引入让我们与服务器引擎和具体的 API 解耦,但它并不能帮助我们建立反应式通信。因此,所有新接口都应该提供对所有数据的访问,例如响应式格式的请求正文和会话。正如我们从前几章中了解到的,Reactive Streams 模型允许根据数据的可用性和需求与数据进行交互和处理。由于 Project Reactor 遵循 Reactive Streams 标准并从功能的角度提供了广泛的 API,因此它可能是在其之上构建所有响应式 Web API 的合适工具。

最后,如果我们在实际实现中结合这些东西,我们会得出以下代码:

interface ServerHttpRequest {                                      // (1) 
...                                                             //
   Flux<DataBuffer> getBody();                                     // (1.1)
...                                                             //
}                                                                  //

interface ServerHttpResponse {                                     // (2)
   ...                                                             //
   Mono<Void> writeWith(Publisher<? extends DataBuffer> body);     // (2.1)
   ...                                                             //
}                                                                  //

interface ServerWebExchange {                                      // (3)
   ...                                                             // 
ServerHttpRequest getRequest();                                 // (3.1)
ServerHttpResponse getResponse();                               // (3.2)
...                                                             //
   Mono<WebSession> getSession();                                  // (3.3)
...                                                             //
}                                                                  //

 

前面的代码可以解释如下:

  1. 这是表示传入消息的接口草稿。正如我们所见,在 (1.1) 点,提供对传入字节的访问权限的中央抽象是 Flux,这意味着根据定义,它具有响应式访问。 我们可能还记得 第 5 章, 使用 Spring Boot 2 实现响应式字节缓冲区有一个有用的抽象,即 DataBuffer。这是一种与特定服务器实现交换数据的便捷方式。 除了请求的主体外,任何 HTTP 请求通常都包含有关传入标头、路径、cookie 和查询参数的信息,因此 信息 可以在该接口或其子接口中表示为单独的方法。
  2. 这是响应接口的草稿,是 ServerHttpRequest 接口的配套接口。如点 (2.1) 所示,与 ServerHttpRequest#getBody 方法不同,ServerHttpResponse# writeWith 方法 接受任何Publisher 类。在这种情况下,Publisher 响应式类型为我们提供了更大的灵活性,并与特定的响应式库分离。因此,我们可以使用接口的任何实现,并将业务逻辑与框架解耦。 方法返回 Mono ,表示异步发送过程数据到网络。这里重要的一点是,发送数据的过程只有在我们订阅给定的 Mono 时才会执行。此外,接收服务器可以控制背压,这取决于传输协议的控制流。
  3. 这是 ServerWebExchange 接口声明。在这里,接口充当 HTTP 请求-响应实例的容器(在 3.13.2)。该接口是基础结构的,并且与 HTTP 交互一样,可能包含与框架相关的信息。例如,它可能包含有关从传入请求恢复的 WebSession 的信息,如点 (3.3) 所示。或者,它可以在请求和响应接口之上提供额外的基础设施方法。

在前面的示例中,我们为我们的响应式 Web 堆栈起草了潜在的接口。一般来说, 这三个接口与我们在 Servlet API 中的接口相似。例如, ServerHttpRequest 和 ServerHttpResponse 可能会让我们想起 ServletRequest< /code> 和 ServletResponse。从本质上讲,响应式对应物旨在从交互模型的角度提供几乎相同的方法。然而,由于 Reactive Streams 的异步和非阻塞特性,我们有一个开箱即用的流式传输基础,并且可以防止复杂的基于回调的 API。这也保护我们免受回调地狱的影响。

除了中央接口之外,为了完成整个交互流程,我们必须定义请求-响应处理程序和过滤器 API,可能如下所示:

interface WebHandler {                                             // (1)
   Mono<Void> handle(ServerWebExchange exchange);                  //  
}                                                                  //

interface WebFilterChain {                                         // (2)
   Mono<Void> filter(ServerWebExchange exchange);                  //  
}                                                                  //

interface WebFilter {                                              // (3)
   Mono<Void> filter(ServerWebExchange exch, WebFilterChain chain);// 
}                                                                  //

上述代码中编号的部分可以描述如下:

  1. 这是任何 HTTP 交互的中心入口点,称为 WebHandler . 在这里,我们的接口扮演了一个抽象的角色 DispatcherServlet, 所以我们可以在它之上构建任何实现。由于接口的职责是找到请求的处理程序,然后将其与将执行结果写入 ServerHttpResponse 的视图渲染器进行组合,因此 DispatcheServlet #handle 方法不必返回任何结果。但是,在处理完成时收到通知可能很有用。 通过依赖这样的通知信号,我们可以应用处理超时,这样,如果在指定的持续时间内没有信号出现,我们可以取消执行。为此,该方法从Void返回 Mono,允许等待异步处理完成而不必处理结果。 
  2. 这是允许将几个WebFilter instances (3) 连接成一个链的接口,类似于Servlet API。
  3. 这是响应式 Filter 表示。

前面的接口为我们提供了一个基础,我们可以在此基础上开始为框架的其余部分构建业务逻辑。

我们几乎已经完成了响应式 Web 基础架构的基本元素。 为了完成抽象层次结构,我们的设计需要用于响应式 HTTP 请求处理的最低级合约。由于我们 以前 只定义了负责数据传输和处理的接口,所以我们必须定义一个接口来负责使服务器引擎适应定义的基础设施。为此,我们需要一个额外的抽象层来负责与 ServerHttpRequest 和 ServerHttpResponse的直接交互.

此外,这一层应该负责构建 ServerWebExchange。 特定的 Session 存储、Locale 解析器和类似的基础设施 在这里举行:

public interface HttpHandler {
    Mono<Void> handle(
      ServerHttpRequest request,
      ServerHttpResponse response);
}

最后,对于每个服务器引擎,我们可能有一个调用中间件的 HttpHandler 的适配,然后它组成给定的 ServerHttpResponseServerHttpRequest 到 ServerWebExchange并将其传递给 WebFilterChain和 WebHandler. 有了这样的设计,对于 Spring WebFlux 用户来说,特定的服务器引擎是如何工作的并不重要,因为我们 现在 有一个适当的抽象级别来隐藏服务器引擎。我们现在可以继续下一步并构建高级响应式抽象。

反应式 Web 和 MVC 框架

我们可能还记得,Spring Web MVC " class="indexterm"> 模块是它的基于注解的编程模型。因此,核心挑战是为响应式 Web 堆栈提供相同的概念。如果我们查看当前的 Spring Web MVC 模块,我们可以看到,总的来说,该模块设计得当。除了构建新的响应式 MVC 基础架构,我们可以重用现有的基础架构,并用响应式类型替换同步通信,例如 FluxMono,以及 Publisher。例如,用于映射请求和将上下文信息(例如标头、查询参数、属性和会话)绑定到已找到的处理程序的两个中心接口是HandlerMappingHandlerAdapter。一般来说,我们可以保持与 Spring Web MVC 中相同的 HandlerMappingHandlerAdapter 链,但是将 eager imperative 替换为使用 Reactor 的 types 进行反应式交互:

interface HandlerMapping {                                         // (1)
/* HandlerExecutionChain getHandler(HttpServletRequest request) */ // (1.1)
   Mono<Object>          getHandler(ServerWebExchange exchange);   // (1.2)
}                                                                  //

interface HandlerAdapter {                                         // (2)
   boolean supports(Object handler);                               //
                                                                   //
/* ModelAndView        handle(                                     // (2.1)
      HttpServletRequest request, HttpServletResponse response,    //
      Object handler                                               //
   ) */                                                          //
Mono<HandlerResult> handle(                                     // (2.2)
      ServerWebExchange exchange,                                  //
      Object handler                                               //
   );                                                              //
}                                                                  //

前面的代码在下面的编号列表中进行了解释:

  1. 这是响应式 HandlerMapping 接口的声明。在这里,为了突出旧的 Web MVC 实现和改进的 Web MVC 实现之间的区别, 代码包含两种方法的声明。旧的实现,在 (1.1) 处,用 /* ... */ 注释,并且 使用 < span class="emphasis">italic 字体样式,而新界面在 (1.2) 处突出显示bold。我们可以看到,总的来说,这些方法非常相似。不同之处在于最后一个返回 Mono 类型,因此启用反应行为。
  2. 这是响应式 HandlerAdapter 接口版本。正如我们在这里看到的,handle 方法的响应式版本更加简洁,因为 ServerWebExchange 类结合了请求和同时响应实例。在(2.2)点,方法返回HandlerResultMono而不是 ModelAndView(位于 2.1)。我们可能还记得,ModelAndView 负责提供状态码、Model“查看”HandlerResult class 包含相同的信息,除了状态码。 HandlerResult 更好,因为它提供了直接执行的结果,所以DispatcherHandler更容易找到处理程序。 在Web MVC中,查看  负责渲染模板和对象。它也渲染结果,所以它在 Web MVC 中的用途可能有点不清楚。 不幸的是,这样的多重职责不能轻易地适应异步结果处理。在这种情况下,当结果是一个普通的 Java 对象时,View 查找是在 HandlerAdapter 中完成的,而不是该类的直接责任。正因为如此,最好保持责任明确,所以前面代码中实现的更改是一种改进。

遵循这些步骤将为我们提供一个反应式交互模型,而不会破坏整个执行层次结构,从而保留现有设计并可能重用现有代码,而只需进行最少的更改。 

最后,通过收集我们迄今为止为实现响应式 Web 堆栈和纠正请求的处理流程所采取的所有步骤,并考虑到实际实现,我们将提出以下设计:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.2。重新设计的响应式 Web 和 MVC 堆栈

 

 

上图可以解释如下:

  1. 这是传入的请求,由底层 服务器引擎处理。正如我们所看到的,服务器引擎列表不仅限于基于 Servlet API 的服务器,现在 包括 NettyUndertow。在这里,每个服务器引擎都有自己的响应式适配器,它将 HTTP 请求和 HTTP 响应的内部表示映射到  ServerHttpRequestServerHttpResponse
  2. 这是 HttpHandler阶段,它组成了给定的ServerHttpRequest, ServerHttpResponse ,用户的 Session 和相关信息到一个 ServerWebExchage 实例中。
  3. 至此,我们有了WebFilterChain stage,它将定义好的WebFilter 组成链。然后,WebFilterChain负责执行 WebFilter#filter 每个WebFilter的方法 这个链中的实例,为了过滤传入的 ServerWebExchange
  4. 如果满足所有过滤条件, WebFilterChain 调用 WebHandler instance。
  5. 下一步是查找 HandlerMapping 的实例并调用第一个合适的实例。在这个例子中,我们描述了一些 HandlerMapping 实例,例如 RouterFunctionMapping,众所周知的 RequestMappingHandlerMapping,以及 HandlerMapping 资源。新的 HandlerMapping 这里的实例是 RouterFunctionMapping,它是在 WebFlux 模块中引入的,它超越了纯粹的功能请求处理。我们不会在这里详细介绍该功能;我们将在下一节中介绍这一点。
  6. 这是 RequestMappingHandlerAdapter 阶段,它具有与以前相同的功能,但现在使用反应式流来构建反应式交互流。

上图仅描述了 WebFlux 模块中底层交互流的简化视图。需要注意的是,在 WebFlux 模块中,默认的服务器引擎是 Netty。 Netty 服务器是一个合适的默认值,因为它广泛用于响应式空间。此外,该服务器引擎提供客户端和服务器异步非阻塞交互。这意味着它更适合 Spring WebFlux 提供的反应式编程范式。尽管 Netty 是一个很好的默认服务器引擎,但使用 WebFlux,我们可以灵活地选择我们的服务器引擎,这意味着我们可以轻松地在各种现代服务器引擎之间切换,例如 Undertow、Tomcat、Jetty 或任何其他基于 Servlet API如我们所见,WebFlux 模块反映了 Spring Web MVC 模块的架构,因此对于有旧 Web 框架经验的人来说很容易理解。此外,Spring Web Flux 模块还有很多隐藏的宝石, 这将在以下部分中介绍。

使用 WebFlux 的纯功能 Web

正如我们在preceding 图中可能已经注意到,虽然它与Web MVC 有很多相似之处,但WebFlux 也提供了很多 在微型微服务、Amazon Lambda 和类似云服务的时代,提供允许开发人员创建具有几乎相同框架库的轻量级应用程序的功能非常重要 特性。 使竞争对手框架(如 Vert.x 或 Ratpack)更具吸引力的特性之一是它们能够生成轻量级应用程序,这是通过功能路由映射和允许我们编写复杂的请求路由逻辑的内置 API 来实现。这就是为什么 Spring Framework 团队决定将此功能合并到 WebFlux 模块中的原因。此外,纯函数式路由的组合与新的反应式编程方法充分匹配。例如,让我们看看如何使用新的函数式方法构建复杂的路由:

import static ...RouterFunctions.nest;                             // (1)
import static ...RouterFunctions.nest;                             //
import static ...RouterFunctions.route;                            //
...
import static ...RequestPredicates.GET;                            // (2)
import static ...RequestPredicates.POST;                           //
import static ...RequestPredicates.accept;                         //
import static ...RequestPredicates.contentType;                    //
import static ...RequestPredicates.method;                         //
import static ...RequestPredicates.path;                           //

@SpringBootApplication                                             // (3)
public class DemoApplication {                                     //
   ...                                                             
@Bean                                                           
public RouterFunction<ServerResponse> routes(                   // (4)
      OrderHandler handler                                         // (4.1)
   ) {                                                             //
return 
nest(path("/orders"),                                     // (5)
nest(accept(APPLICATION_JSON),                         //
route(GET("/{id}"), handler::get)                   //
               .andRoute(method(HttpMethod.GET), handler::list)    //
            )                                                      //
            .andNest(contentType(APPLICATION_JSON),                //
route(POST("/"), handler::create)                   //
            )                                                      //
         );
}
}

前面的代码可以解释如下:

  1. 这是从 RouterFunctions 类的静态导入声明。正如我们所见, RouterFunctions 类 提供了一个广泛的工厂方法列表,它们返回 RouterFunction 接口具有不同的行为。
  2. 这是 RequestPredicates 类的静态导入声明。从前面的代码中我们可以看到, RequestPredicates 类允许从不同的角度检查传入的请求。通常,RequestPredicates 提供对 RequestPredicate 接口的不同实现的访问,它是一个函数式接口和可以很容易地扩展,以便以自定义方式验证传入的请求。
  3. 这是 Spring Boot 应用程序的常见声明,其类使用  @SpringBootApplication 注释。
  4. 这是一个初始化 RouterFunction<ServerResponse> bean 的方法声明。在此示例中,该方法在应用程序的引导期间被调用。
  5. 这是 RouterFunction的声明,在RouterFunctionsRequestPredicates<的支持下表达/代码> API。

preceding 示例中,我们使用了 declaring 应用程序的 Web API。该技术为处理程序声明提供了一种功能方法,并允许我们将所有路由明确定义在一个地方。此外, API (例如之前使用的 API)允许我们轻松编写自己的请求谓词 。例如,以下代码展示了如何实现自定义RequestPredicate 并将其应用于路由逻辑:

nest((serverRequest) -> serverRequest.cookies()
                                     .containsKey("Redirect-Traffic"),
route(all(), serverRedirectHandler)       
)

在前面的示例中,我们创建了一个小的 RouterFunction,它 将流量重定向到另一个服务器 如果"Redirect-Traffic" cookie 存在。

新的功能网络还引入了一种处理请求和响应的新方法。例如,以下代码示例显示了 OrderHandler 实现的一部分:

class OrderHandler {                                               // (1)
final OrderRepository orderRepository;                          //
   ...                                                             
public Mono<ServerResponse> create(ServerRequest request) {     // (2)
return request                                               // 
.bodyToMono(Order.class)                                  // (2.1)
         .flatMap(orderRepository::save)                           //
         .flatMap(o ->                                             //
ServerResponse.created(URI.create("/orders/" + o.id))  // (2.2)
                          .build()                                 //
         );                                                        //
}                                                              //
    ...                                                            //
}                                                                  //

上述代码可以描述如下:

  1. 这是 OrderHandler 类声明。在此示例中,我们跳过构造函数声明,以便 专注于功能路由的 API。
  2. 这是 create 方法声明。正如我们所见,该方法接受 ServerRequest,这是特定于功能路由请求类型的。正如我们在 (2.1) 处看到的,ServerRequest 暴露了 API,允许手动映射请求体MonoFlux。此外,API 允许我们指定请求正文应映射到的类。最后,WebFlux 中的功能添加提供了一个 API,允许我们使用 ServerResponse 类的 fluent API 构建响应。

正如我们所见,除了函数式路由声明的 API 之外,我们还有一个函数式 API 用于请求和响应处理。

尽管新 API 为我们提供了一种声明处理程序和映射的函数式方法,但它并没有为我们提供一个完全轻量级的网络应用。在某些情况下,Spring 生态系统的整个功能可能是多余的,因此会减少应用程序的整体启动时间。例如,假设 那个 我们必须构建一个负责匹配用户密码的服务。通常,此类服务会通过对传入密码进行哈希处理,然后将其与存储的密码进行比较来消耗大量 CPU。我们唯一需要的功能是来自 Spring Security 模块的 PasswordEncoder 接口,它允许我们使用  PasswordEncoder#matchs 方法。因此,具有 IoC、注释处理和自动配置的整个 Spring 基础设施是多余的,并且在启动时间方面使我们的应用程序变慢。

幸运的是,新的功能性 Web 框架允许我们在不启动整个 Spring 基础架构的情况下构建 Web 应用程序。让我们考虑以下示例,以了解我们如何实现这一目标:

class StandaloneApplication {                                      // (1)

public static void main(String[] args) {                        // (2)
HttpHandler httpHandler = RouterFunctions.toHttpHandler(     // (2.1)
         routes(new BCryptPasswordEncoder(18))                     // (2.2)
      );                                                           //
ReactorHttpHandlerAdapter reactorHttpHandler =               // (2.3)
new ReactorHttpHandlerAdapter(httpHandler);               //

HttpServer.create()                                          // (3)
                .port(8080)                                        // (3.1)
                .handle(reactorHttpHandler)                        // (3.2)
                .bind()                                            // (3.3)
                .flatMap(DisposableChannel::onDispose)             // (3.4)
                .block();                                          // 
}                                                               //

static RouterFunction<ServerResponse> routes(                   // (4)
      PasswordEncoder passwordEncoder                              //
   ) {                                                             //
return
         route(POST("/check"),                                     // (5)
request -> request                                     //
.bodyToMono(PasswordDTO.class)                      // (5.1)
               .map(p -> passwordEncoder                           // 
                  .matches(p.getRaw(), p.getSecured()))            // (5.2)
               .flatMap(isMatched -> isMatched                     // (5.3)
? ServerResponse                                 //
                       .ok()                                       //
                       .build()                                    // 
                  : ServerResponse                                 //
                       .status(HttpStatus.EXPECTATION_FAILED)      //
                       .build()                                    //
               )                                                   //
         );                                                        //
   }
}

以下编号列表描述了前面的代码示例:

  1. 这是主应用程序类的声明。正如我们所见,Spring Boot 没有额外的注解。
  2. 在这里,我们有 main 方法的声明以及所需变量的初始化。在(2.2)处,我们调用 routes方法,然后转换RouterFunction HttpHandler。然后,在 (2.3) 处,我们使用内置的 HttpHandler 适配器称为 ReactorHttpHandlerAdapter
  3. 此时,我们创建了一个 HttpServer instance,它是 Reactor-Netty API 的一部分。在这里,我们使用 HttpServer 类的 fluent API 来设置服务器。在(3.1)点,我们声明端口,把创建的ReactorHttpHandlerAdapter实例 (在3.2),然后在 (3.3) 处调用 bind 开始服务器引擎。最后,为了让应用程序保持活跃 我们阻塞了主线程 并在点监听创建的服务器的disposal事件(3.4)
  4. 这一点显示了routes 方法的声明。
  1. 这是路由映射逻辑,它处理对任何 POST 方法的请求使用 /check  路径。在这里,我们首先在 bodyToMono 方法的支持下映射传入请求。然后,一旦正文被转换,我们使用一个 PasswordEncoder 实例来检查原始密码和编码密码(在我们的例子中,我们使用强大的 BCrypt 算法18 轮散列,编码/匹配可能需要几秒钟)(5.2)。最后,如果密码与存储的密码匹配,ServerResponse 返回 状态为 OK(200)  或 EXPECTATION_FAILED(417) 如果密码与存储的密码不匹配。

前面的示例显示了我们可以轻松地设置 Web 应用程序,而无需运行整个 Spring Framework 基础架构。这种 Web 应用程序的好处是它的启动时间要短得多。应用程序的启动时间约为 700 毫秒,而具有 Spring Framework 和 Spring Boot 基础架构的同一应用程序的启动过程需要长达 2 秒(约 2,000 毫秒),大约 span> 慢三倍。

 

笔记

请注意,启动时间可能会有所不同,但总体比例应该相同。

通过切换到功能路由声明来总结路由声明技术,我们将所有路由配置维护在一个地方,并使用响应式方法处理传入请求。同时,在访问传入请求参数、路径变量和请求的其他重要组件方面,这种技术提供了与通常的基于注释的方法几乎相同的灵活性。它还为我们提供了避免运行整个 Spring Framework 基础设施的能力,并且在路由设置方面具有相同的灵活性,这可能会将应用程序的引导时间减少多达三倍。

与 WebClient 的非阻塞跨服务通信

在前面的部分中,我们查看了新的 Spring WebFlux 模块的基本 设计 和更改的概述,并了解了RoutesFunction 的新功能方法。但是,Spring WebFlux 还包含其他 new 可能性。其中最重要的介绍之一是新的非阻塞 HTTP 客户端,称为 WebClient

 

本质上, WebClient 是旧的 RestTemplate的被动替换。 然而,在 WebClient 中,我们有一个更适合响应式方法的函数式 API,并提供到 Project Reactor 类型的内置映射,例如 Flux 单声道。为了进一步了解WebClient,我们来看下面的例子:

WebClient.create("http://localhost/api")                           // (1)
         .get()                                                    // (2)
         .uri("/users/{id}", userId)                               // (3)
         .retrieve()                                               // (4)
         .bodyToMono(User.class)                                   // (5)
         .map(...)                                                 // (6)
         .subscribe();                                             //

在前面的示例中,我们使用名为 create 的工厂方法创建了一个 WebClient 实例,如第 1 点所示。这里,< code class="literal">create 方法允许我们指定基本 URI,它在内部用于所有未来的 HTTP 调用。然后,为了开始构建对远程服务器的调用,我们可以执行一个 WebClient 听起来像 HTTP 方法的方法。在前面的示例中,我们使用了 WebClient#get,显示在 (2) 处。 一旦我们调用 < code class="literal">WebClient#get方法,我们对request builder实例进行操作,可以在 uri方法中指定相对路径,如图在 (3) 点。 除了相对路径,我们还可以指定 headers、cookies 和 request body。但是,为简单起见,我们在这种情况下省略了这些设置,并继续通过调用 retrieve or exchange< 来编写请求/代码> 方法。在这个例子中,我们使用 retrieve 方法,显示在(4)处。 这个选项很有用当我们 只有 有兴趣检索身体并进行进一步处理时。设置请求后,我们可以使用其中一种方法来帮助我们转换响应正文。在这里,我们使用 bodyToMono方法,它将传入的payload User 转换为单声道,显示在 (5) 处。最后,我们可以使用 Reactor API 构建传入响应的处理流程,并通过调用subscribe 方法执行远程调用。

笔记

WebClient 遵循 Reactive Streams 规范中描述的行为。这意味着只有调用 subscribe 方法才能 WebClient 连接连接并开始将数据发送到远程服务器。

尽管在大多数情况下,最常见的响应处理是正文处理,但在某些情况下,我们需要处理响应状态、标头或 cookie。例如,让我们调用 password 检查服务并使用 以自定义方式处理响应状态WebClient API:

class DefaultPasswordVerificationService                           // (1)
implements PasswordVerificationService {                        //

final WebClient webClient;                                      // (2)
                                                                   //
public DefaultPasswordVerificationService(                      // 
WebClient.Builder webClientBuilder                           //
   ) {                                                             //
      this.webClient = webClientBuilder                            // (2.1)
.baseUrl("http://localhost:8080")                         // 
         .build();                                                 //
}                                                               //

@Override                                                       // (3)
public Mono<Void> check(String raw, String encoded) {           //
return webClient                                             //
.post()                                                   // (3.1)
         .uri("/check")                                            //
         .body(BodyInserters.fromPublisher(                        // (3.2)
Mono.just(new PasswordDTO(raw, encoded)),              //
PasswordDTO.class                                      //
))                                                        //
         .exchange()                                               // (3.3)
         .flatMap(response -> {                                    // (3.4)
if (response.statusCode().is2xxSuccessful()) {         // (3.5)
return Mono.empty();                                //
}                                                      //
else if(resposne.statusCode() == EXPECTATION_FAILD) {  //
return Mono.error(                                  // (3.6)
new BadCredentialsException(...)                 //
               );                                                  //
}                                                      //
            return Mono.error(new IllegalStateException());        //
         });                                                       //
}                                                               //
}                                                                  //

以下编号列表描述了 preceding 代码示例:

  1. 这是 PasswordVerificationService 接口的实现。
  2. 这是 WebClient 实例的初始化。重要的是要注意,我们在这里为每个类使用一个 WebClient 实例 因此我们不必在每次执行时都初始化一个新实例 检查 方法。这种技术减少了初始化 WebClient 的新实例的需要,并减少了方法的执行时间。但是,WebClient 的默认实现使用 Reactor-Netty HttpClient,在默认配置中共享一个公共资源池在所有 HttpClient 实例中。因此,创建一个新的 HttpClient instance 的成本并不高。一旦调用了 DefaultPasswordVerificationService 的构造函数,我们就开始初始化 webClient 并使用流利的构建器,如 (2.1),为了设置客户端。  
  3. 这是 check 方法的实现。在这里,我们使用 webClient 实例来执行一个 post 请求,显示在 (3.1)。此外,我们使用 body  方法发送正文,并准备使用 BodyInserters#fromPublisher 插入它 工厂方法,在(3.2).中我们执行(3.3) 处的 "literal">exchange 方法,返回 Mono 。因此,我们可以使用 flatMap 运算符处理响应,如 (3.4) 所示。如果password验证成功,如(3.5)点所示,check 方法 返回Mono.empty。或者,在 EXPECTATION_FAILED(417) 状态码的情况下,我们可能会返回 Mono "literal">BadCredentialsExeception,如 (3.6) 处所示。

从前面的例子我们可以看出,在需要处理普通 HTTP 响应的状态码、标头、cookie 等内部结构的情况下,最合适的方法是  exchange 方法,返回ClientResponse。 

如前所述,DefaultWebClient 使用 Reactor-Netty HttpClient 为了提供与远程服务器。但是,DefaultWebClient 旨在能够轻松更改底层 HTTP 客户端。为此,围绕 HTTP 连接有一个低级的响应式抽象,称为 org.springframework.http.client.reactive.ClientHttpConnector。默认情况下,DefaultWebClient 预配置为使用 ReactorClientHttpConnector,这是 的实现ClientHttpConnector 接口。从 Spring WebFlux 5.1 开始,有一个 JettyClientHttpConnector 实现,它使用来自 Jetty 的响应式 HttpClient。为了更改底层 HTTP 客户端引擎,我们可以使用 WebClient.Builder#clientConnector 方法并传递所需的实例,该实例可能是自定义实现,也可能是现有实例。

除了有用的抽象层之外,ClientHttpConnector 可以以原始格式使用。例如,它可以用于下载大文件、即时处理或只是简单的字节扫描。  ClientHttpConnector我们就不赘述了;我们将把它留给好奇的读者自己研究。

反应式 WebSocket API

我们现在已经介绍了新 WebFlux 模块的大部分新功能。然而,现代 Web 的关键部分之一是流交互模型,其中客户端和服务器都可以相互流式传输消息。在本节中,我们将了解用于双工客户端-服务器通信的最著名的双工协议之一,称为 WebSocket

尽管通过 WebSocket 协议的通信是在 2013 年初在 Spring 框架中引入的,并且是为异步消息发送而设计的,但实际实现还是有一些阻塞操作。例如,将数据写入 I/O 或从 I/O 读取数据仍然是阻塞操作,因此都会影响应用程序的性能。因此,WebFlux 模块引入了 WebSocket 基础架构的改进版本。

WebFlux 提供客户端和服务器基础设施。我们将从分析服务器端 WebSocket 开始,然后介绍客户端的可能性。

服务器端 WebSocket API

WebFlux 提供 WebSocketHandler 作为处理 WebSocket 连接的中心接口。这个接口有一个名为handle的方法, 它接受WebSocketSessionWebSocketSession 类代表客户端和服务器之间的成功握手,并提供对信息的访问,包括有关握手、会话属性和传入数据流的信息。为了了解如何处理这些信息,让我们考虑以下用回显消息回复发送者的示例:

class EchoWebSocketHandler implements WebSocketHandler {           // (1)
@Override                                                       // 
public Mono<Void> handle(WebSocketSession session) {            // (2)
return session                                               // (3)
.receive()                                                // (4)
.map(WebSocketMessage::getPayloadAsText)                  // (5)
.map(tm -> "Echo: " + tm)                                 // (6)
.map(session::textMessage)                                // (7)
.as(session::send);                                       // (8)
}                                                              //
}

正如我们从前面的示例中看到的,新的 WebSocket API 构建在 Project Reactor 的反应类型之上。这里,在(1)点,我们提供了 WebSocketHandler接口的实现,并在点重写handle方法class="literal">(2). 然后,我们在 (3) 点使用 WebSocketSession#receive 方法 以构建传入 WebSocketMessage 的处理流程 使用 Flux API。  ;WebSocketMessageDataBuffer 的包装器 并提供额外的功能,例如将以字节表示的有效负载转换为点中的文本(5). 一旦收到的消息被提取出来,我们就会在该文本前面加上 "Echo:" 后缀显示在 (6) 处,将新文本消息包裹在 WebSocketMessage,并使用 WebSocketSession#send 方法将其发送回客户端。在这里,send 方法接受 Publisher  并返回 Mono<结果是无效>。因此,使用 Reactor API 中的 as 运算符,我们可以将 Flux as  Mono 并使用 session::send 作为转换函数。

除了 WebSocketHandler 接口实现之外,设置服务器端 WebSocket API 还需要配置额外的 HandlerMappingWebSocketHandlerAdapter 实例。考虑以下代码作为此类配置的示例:

@Configuration                                                     // (1)
public class WebSocketConfiguration {                              //

@Bean                                                           // (2)
public HandlerMapping handlerMapping() {                        //
SimpleUrlHandlerMapping mapping =                            //
new SimpleUrlHandlerMapping();                            // (2.1)
mapping.setUrlMap(Collections.singletonMap(                  // (2.2)
"/ws/echo",                                               //
new EchoWebSocketHandler()                                // 
      ));                                                          //
mapping.setOrder(-1);                                        // (2.3)
return mapping;                                              //
}                                                               //

@Bean                                                           // (3)
public HandlerAdapter handlerAdapter() {                        //
return new WebSocketHandlerAdapter();                        //
}                                                               //
}

前面的例子可以描述如下: 

  1. 这是用 @Configuration注释的类。
  2. 在这里,我们有 HandlerMapping bean 的声明和设置。在点 (2.1),我们创建 SimpleUrlHandlerMapping,它允许设置基于路径的映射,如点 (2.2), 到 WebSocketHandler。为了让 SimpleUrlHandlerMapping 在其他HandlerMapping实例之前被处理,它应该是一个更高的优先级。
  3. 这是HandlerAdapter bean的声明,即 WebSocketHandlerAdapter。在这里, WebSocketHandlerAdapter 扮演着最重要的角色,因为它将 HTTP 连接升级到 WebSocket 一个,然后调用 WebSocketHandler#handle 方法。

正如我们所见,WebSocket API 的配置很简单。

客户端 WebSocket API

WebSocket 模块(基于 WebMVC)不同,WebFlux 也为我们提供了客户端支持。为了发送 WebSocket 连接请求,我们有 WebSocketClient 类。 WebSocketClient 有两个 central 方法来执行 WebSocket 连接,如图以下代码示例:

public interface WebSocketClient {
Mono<Void> execute(
      URI url,
WebSocketHandler handler
   );
Mono<Void> execute(
      URI url,
HttpHeaders headers, 
      WebSocketHandler handler
   );
}

正如我们所见,WebSocketClient 使用相同的 WebSockeHandler 接口来处理来自服务器的消息并发送回消息。有一些与服务器引擎相关的 WebSocketClient 实现,例如 TomcatWebSocketClient 实现或 JettyWebSocketClient 实现。在以下示例中, 我们将查看 ReactorNettyWebSocketClient

WebSocketClient client = new ReactorNettyWebSocketClient();

client.execute(
URI.create("http://localhost:8080/ws/echo"),
session -> Flux
.interval(Duration.ofMillis(100))
      .map(String::valueOf)
      .map(session::textMessage)
      .as(session::send)
);

前面的示例展示了我们如何使用 ReactorNettyWebSocketClient 连接一个 WebSocket 连接并开始定期向服务器发送消息。

WebFlux WebSocket 与 Spring WebSocket 模块

熟悉基于 servlet 的 WebSocket 模块的读者可能会注意到两个模块的设计有很多相似之处.但是,也有很多不同之处。我们可能还记得,Spring WebSocket 模块的主要缺点是它与 IO 的阻塞交互,而 Spring WebFlux 完全不提供- 阻塞写入和读取。此外,WebFlux 模块通过使用 Reactive Streams 规范和 Project Reactor 提供了更好的流抽象。旧 WebSocket 模块的 WebSocketHandler 接口一次只允许处理一条消息。此外, WebSocketSession#sendMessage 方法只允许以同步方式发送消息。

但是,新的 Spring WebFlux 与 WebSocket 的集成存在一些差距。旧 Spring WebSocket 模块的关键特性之一是与 Spring Messaging 模块的良好集成,它允许使用 @MessageMapping 注释来声明 WebSocket 端点。下面的代码展示了一个使用 Spring Messaging 注释的旧的、基于 Web MVC 的 WebSocket API 的简单示例:

@Controller
public class GreetingController {

   @MessageMapping("/hello")
   @SendTo("/topic/greetings")
public Greeting greeting(HelloMessage message) {
return new Greeting("Hello, " + message.getName() + "!");
}
}

前面的代码展示了我们如何使用 Spring Messaging 模块来声明一个 WebSocket 端点。不幸的是,WebFlux 模块中的 WebSocket 集成缺少这种支持,为了声明复杂的处理程序,我们必须提供自己的基础设施。

第 8 章中, 使用 Cloud Streams 扩展 ,我们将介绍另一个强大的客户端和服务器之间双向消息传递的抽象,它可以在简单的浏览器-服务器交互之前使用。

反应式 SSE 作为 WebSockets 的轻量级替代品

与重量级 WebSocket 一起,HTML 5 引入了 一种创建静态(在本例中为半双工)连接的新方法,服务器能够推送事件的地方。这种技术解决了与 WebSocket 类似的问题。例如,我们可以声明一个 Server-sent events (SSE) 流使用相同的基于注解的编程模型,但返回 ServerSentEvent 对象的无限流,如以下示例所示:

@RestController                                                    // (1)
@RequestMapping("/sse/stocks")                                     //
class StocksController {                                           //
final Map<String, StocksService> stocksServiceMap;              //
   ...                                                                
@GetMapping                                                     // (2)
public Flux<ServerSentEvent<?>> streamStocks() { // (2.1) return Flux // .fromIterable(stocksServiceMap.values()) // .flatMap(StocksService::stream) // (2.2) .<ServerSentEvent<?>>map(item ->                          // 
ServerSentEvent                                        // (2.3)
.builder(item)                                      // (2.4)
               .event("StockItem")                                 // (2.5)
               .id(item.getId())                                   // (2.6)
               .build()                                            //
         )                                                         //
         .startWith(                                               // (2.7)
ServerSentEvent                                        // 
.builder()                                           //
              .event("Stocks")                                     // (2.8)
              .data(stocksServiceMap.keySet())                     // (2.9)
              .build()                                             //
         );                                                        //
}
}

上述代码中的数字可以解释如下:

  1. 这是 @RestController 类的声明。为了简化代码,我们跳过了构造函数和字段初始化部分。
  2. 在这里,我们有由熟悉的 @GetMapping 注释的处理程序方法的声明。正如我们在 (2.1) 处看到的, streamStocks 方法返回 ServerSentEvent的">Flux,表示当前处理程序启用事件流。然后,我们合并所有可用的股票来源并将更改流式传输到客户端,如点 (2.2) 所示。 之后,我们应用映射,映射每个 StockItem 到ServerSentEvent,如(2.3)所示,使用(2.4) 中的静态builder 方法。 为了正确设置 ServerSentEvent 实例,我们在构建器参数中提供事件 ID (2.6) 和事件名称 (2.5)< /code>, 允许在客户端区分消息。此外,在 (2.7) 点,我们用特定的 ServerSentEvent 启动 Flux  实例,显示在点(2.8), 向客户端声明可用的库存渠道 (2.9)

正如我们从前面的示例中看到的,Spring WebFlux 允许映射 Flux reactive 类型 的流性质,并向客户端发送无限的股票事件流。此外,SSE 流不需要我们更改 API 或使用额外的抽象。它只需要我们声明一个特定的返回类型来帮助框架弄清楚如何处理响应。我们不必声明 ServerSentEvent的 Flux 我们可以直接提供内容类型,如下例所示:

@GetMapping(produces = "text/event-stream")
public Flux<StockItem> streamStocks() {
   ...
}

在这种情况下,WebFlux 框架在内部将流的每个元素包装到 ServerSentEvent 中。

正如我们所看到的,ServerSentEvent 技术背后的主要好处是这种流模型的配置不需要额外的样板代码,我们在 WebFlux 中采用了 WebSocket。这是因为 SSE 是对 HTTP 的简单抽象,不需要协议 switching 并且不需要特定的服务器配置。正如我们从前面的示例中看到的,可以使用 @RestController@XXXMapping 注释的传统组合来配置 SSE。然而,在 WebSocket 的情况下,我们需要自定义消息转换配置,例如手动选择特定的消息传递协议。相比之下,对于 SSE,Spring WebFlux 提供与典型 REST 控制器相同的消息转换器配置。

另一方面,SSE 不支持二进制编码并将事件限制为 UTF-8 编码。这意味着 WebSocket 对于更小的消息大小和在客户端和服务器之间传输更少的流量 可能很有用,因此具有更低的延迟。

总而言之,SSE 通常是 WebSocket 的一个很好的替代方案。由于 SSE 是 HTTP 协议的抽象,WebFlux 支持与典型 REST 控制器相同的声明性和功能性端点配置和消息转换。

笔记

要详细了解 SSE 的优缺点以及它与 WebSocket 的比较,请参阅以下帖子: https://stackoverflow.com/a/5326159/4891253

反应式模板引擎

除了常规 API 功能外,现代 Web 应用程序最受欢迎的部分之一是 UI。当然,今天的 Web 应用程序 UI 基于复杂的 JavaScript 渲染,并且在大多数情况下,开发人员更喜欢客户端渲染而不是服务器端渲染。尽管如此,许多企业应用程序仍在使用与其用例相关的服务器端渲染技术。 Web MVC 支持各种技术,例如 JSP、JSTL、FreeMarker、Groovy Markup、Thymeleaf、Apache Tiles 等等。不幸的是,在 Spring 5.x 和 WebFlux 模块中,已经放弃了对其中许多的支持,包括 Apache Velocity。

尽管如此,Spring WebFlux 具有与 Web MVC 相同的视图渲染技术。下面的示例展示了一种指定渲染视图的熟悉方式: 

@RequestMapping("/")
public String index() {
return "index";
}

在前面的示例中,作为 index 方法调用的结果,我们返回了一个带有名称的 String 的看法。在后台,框架在配置的文件夹中查找该视图,然后使用适当的模板引擎呈现它。 

默认情况下, WebFlux 仅 支持 FreeMarker 服务器端渲染 引擎。但是,重要的是要弄清楚模板渲染过程中如何支持响应式方法。为此,让我们考虑一个涉及渲染大型音乐播放列表的案例:

@RequestMapping("/play-list-view")
public Mono<String> getPlaylist(final Model model) {               // (1)
final Flux<Song> playlistStream = ...;                          // (2)
return playlistStream                                           // 
.collectList()                                               // (3)
      .doOnNext(list -> model.addAttribute("playList", list))      // (4)
      .then(Mono.just("freemarker/play-list-view"));               // (5)
}

从前面的例子可以看出,我们使用的是响应式类型,Mono (显示在 1 处) ,以便异步返回视图名称。另外,我们的模板有一个占位符,  dataSource,应该由给定的  Song列表填充,如图所示在 (2) 点。提供上下文特定数据的常用方法是定义  Model(1)并将所需的属性放入其中,如点 (4) 所示。不幸的是,FreeMarker 不支持数据的反应式和非阻塞式渲染,因此我们必须将所有歌曲收集到一个列表中,并将收集到的数据放入  Model。最后,一旦所有条目被收集并存储在  Model中,我们就可以返回视图的名称并开始渲染它。

不幸的是,像这样的渲染模板是一个 CPU 密集型操作。如果我们有一个庞大的数据集,这可能需要一些时间和内存。幸运的是,Thymeleaf 的社区决定支持 Reactive WebFlux 并为异步和流式模板渲染提供更多可能性。 Thymeleaf 提供与 FreeMarker 类似的功能,并允许编写相同的代码来呈现 UI。 Thymeleaf 还为我们提供了使用响应式类型作为模板内的数据源的能力,并在流中的新元素可用时呈现模板的一部分。以下示例显示了我们如何在处理请求期间将 Reactive Streams 与 Thymeleaf 一起使用:

@RequestMapping("/play-list-view")
public String view(final Model model) {
final Flux<Song> playlistStream = ...;

   model.addAttribute(
"playList", 
      new ReactiveDataDriverContextVariable(playlistStream, 1, 1)
   );

   return "thymeleaf/play-list-view";
}

此示例引入了一种名为 ReactiveDataDriverContextVariable 的新数据类型,它接受诸如 Publisher、 Flux、 Mono、 Observable,以及 支持的其他响应式类型ReactiveAdapterRegistry class. 

尽管响应式支持需要围绕流的附加类包装器,但模板端不需要任何更改。以下示例显示了我们如何以与普通集合类似的方式使用反应流:

<!DOCTYPE html>                                                    // (1)
<html>                                                             //
   ...                                                             //
   <body>                                                          //
      ...                                                          // 
      <table>                                                      // (2)
         <thead>                                                   //
           ...                                                     // (3)
         </thead>                                                  //
         <tbody>                                                   // (4)
            <tr th:each="e : ${playList}">                         // (5)
               <td th:text="${e.id}">...</td>                      //
               <td th:text="${e.name}">...</td>                    //
               <td th:text="${e.artist}">...</td>                  //
               <td th:text="${e.album}">...</td>                   //
            </tr>                                                  //
         </tbody>                                                  //
      </table>                                                     //
  </body>                                                          //
</html>                                                            //

这段代码演示了如何使用 Thymeleaf 模板的标记,它有一个通用的 HTML 文档声明,如第 1 点所示。它呈现一个表格,如第 (2) 点所示, 带有一些标题(点 3) 和正文 (4)。这由 playList of Song entries  和有关它们的信息构成的行填充。

这里最有价值的优势是 Thymeleaf 的渲染引擎开始将数据流式传输到客户端,而无需等待最后一个元素被发射。此外,它支持渲染无限的元素流。通过添加对 Transfer-Encoding: chunked 的支持,这成为可能。 Thymeleaf 不是在内存中渲染整个模板,而是先渲染可用部分,然后在有新元素可用时以块的形式异步发送模板的其余部分。 

不幸的是,在撰写本文时,Thymeleaf 仅支持每个模板一个反应式数据源。尽管如此,这种技术返回第一块数据的速度比通常的渲染要快得多,这需要整个数据集都存在,并减少了请求和服务器的第一个反馈之间的延迟,从而改善了整体用户体验。

反应式网络安全

现代网络应用程序最重要的部分之一是安全性。从 Spring Web 的早期开始,它就带有一个配套模块——Spring Security 模块。这允许通过在任何控制器和 Web 处理程序调用之前提供一个 Filter 来设置一个安全的 Web 应用程序并自然地适应现有的 Spring Web 基础设施。多年来,Spring Security 模块与 Web MVC 基础架构相结合,并且只使用了 Servlet API 的Filter抽象。

幸运的是,随着 Reactive WebFlux 模块的引入,一切都发生了变化。为了支持组件之间的反应式和非阻塞交互并以反应式方式提供访问,Spring Security 提供了一个全新的反应式堆栈的实现,它使用新的 WebFilter  ;基础设施并严重依赖 Project Reactor 的 上下文功能。

对 SecurityContext 的响应式访问

为了访问 SecurityContext 在新的响应式Spring安全模块,我们有一个名为 ReactiveSecurityContextHolder 的新类。

ReactiveSecurityContextHolder 通过静态 getContextSecurityContext 的访问> 方法,返回 Mono<SecurityContext>。这意味着我们可以编写以下代码来 为了访问应用程序中的SecurityContext :

@RestController                                                    // (1)
@RequestMapping("/api/v1")                                         //
public class SecuredProfileController {                            //

@GetMapping("/profiles")                                        // (2)
   @PreAuthorize("hasRole(USER)")                                  // (2.1)
public Mono<Profile> getProfile() {                             // (2.2)
return ReactiveSecurityContextHolder                         // (2.3)
.getContext()                                             // (2.4)
         .map(SecurityContext::getAuthentication)                  //
         .flatMap(auth ->                                          //
            profileService.getByUser(auth.getName())               // (2.5)
         );                                                        //
}                                                               //
}

前面的例子可以解释如下:

  1. 这是 REST 控制器类的声明,请求映射等于 "/api/v1"

  2. 这是 getProfile handler 方法声明。正如我们所见,这个方法返回 Mono reactive 类型, 它允许对数据进行响应式访问,如点 (2.2) 。然后,为了访问当前的SecurityContext,我们调用 ReactiveSecurityContextHolder.getContext(),如点(2.3)(2.4)。 最后,如果 SecurityContext 存在, flatMap 被处理,我们可以访问用户的个人资料,如第 2.5 点所示。此外,此方法使用 @PreAuthorize 进行注释,在这种情况下,它会检查可用的 Authentication 是否具有所需的角色。请注意,如果我们有一个响应式返回类型,则该方法的调用将被推迟,直到所需的 Authentication 得到解决并且存在所需的权限。

正如我们所看到的,新的反应式上下文持有者的 API 有点类似于我们在 API 的同步对应物中所拥有的 API。此外,在新一代 Spring Security 中,我们可以使用相同的注解来检查所需的权限。

在内部,ReactiveSecurityContextHolder 依赖于 Reactor Context API。关于登录用户的当前信息保存在 Context 接口的实例中。以下示例显示了 ReactiveSecurityContextHolder 如何在后台工作:

static final Class<?> SECURITY_CONTEXT_KEY = SecurityContext.class;
...
public static Mono<SecurityContext> getContext() {
return Mono.subscriberContext()
      .filter(c -> c.hasKey(SECURITY_CONTEXT_KEY))
      .flatMap(c -> c.<Mono<SecurityContext>>get(SECURITY_CONTEXT_KEY));
}

我们可能记得在 第 4 章中,Project Reactor - 响应式应用程序的基础< /em>,为了访问内部的Reactor Context,我们可以使用Mono反应类型的专用操作符,称为 subscriberContext。然后,一旦访问上下文,我们就过滤 当前的Context 并检查它是否包含特定的键。隐藏在该键中的值是来自SecurityContextMono,这意味着我们可以访问当前的SecurityContext 以一种被动的方式。执行与从例如数据库中检索存储的 SecurityContext 相关,该数据库仅在有人订阅给定的 Mono< 时执行/代码>。

尽管 ReactiveSecurityContextHolder 的 API 看起来很熟悉,但它隐藏了很多陷阱。例如,我们可能会错误地遵循我们在使用 SecurityContextHolder时习惯的做法。 因此,我们可能会盲目地实现以下代码中描述的常见交互样本:

ReactiveSecurityContextHolder
   .getContext()
   .map(SecurityContext::getAuthentication)
   .block();

就像我们过去从 ThreadLocal中检索 SecurityContext一样,我们可能会尝试用 ReactiveSecurityContextHolder,如上例所示。不幸的是,当我们调用 getContext 并使用 block 方法订阅流时,将配置一个空上下文在溪流中。因此,一旦  ReactiveSecurityContextHodler 类试图访问内部  Context,没有可用的 SecurityContext 将在那里找到。

所以,问题是,当我们正确连接流时,如何设置 Context 并使其可访问,如本节开头所示?答案就在第五代Spring< /a> 安全模块。在调用期间,ReactorContextWebFilter 使用 subscriberContext 提供一个 Reactor Context方法。另外, SecurityContext的解析是使用 ServerSecurityContextRepository进行的。 ServerSecurityContextRepository 有两个方法,分别是save 和load

interface ServerSecurityContextRepository {

Mono<Void> save(ServerWebExchange exchange, SecurityContext context);

Mono<SecurityContext> load(ServerWebExchange exchange);
}

正如我们在前面的代码中看到的, save 方法 允许 将 SecurityContext 与一个特定的ServerWebExchange 然后 使用load方法从附加到  ServerWebExchange。

正如我们所见,新一代Spring的主要优势在于对响应式访问的全面支持 <代码类="literal">SecurityContext。这里,响应式访问意味着实际的 SecurityContext 可能存储在数据库中,因此存储的 SecurityContext 的解析不需要阻塞操作。上下文解析的策略是惰性的,所以实际调用底层存储只有在我们订阅 ReactiveSecurityContextHolder.getContext()时才会执行。最后, SecurityContext传输的机制让我们可以轻松构建复杂的流式处理 不用关注常见的ThreadLocal Thread 实例之间的传播。

启用反应式安全性

我们尚未讨论的最后一部分是如何complex 在响应式 Web 应用程序中启用安全性。幸运的是,现代基于 WebFlux 的应用程序中的安全配置需要声明几个 bean。以下是我们如何执行此操作的参考示例:

@SpringBootConfiguration                                           // (1)
@EnableReactiveMethodSecurity                                      // (1.1)
public class SecurityConfiguration {                               //

@Bean                                                           // (2)
public SecurityWebFilterChain securityWebFilterChain(           //
ServerHttpSecurity http                                      // (2.1)
) {                                                             //
return http                                                  // (2.2)
         .formLogin()                                              //
         .and()                                                    //
         .authorizeExchange()                                      //
            .anyExchange().authenticated()                         //
         .and()                                                    //
         .build();                                                 // (2.3)
}                                                               //

   @Bean                                                           // (3)
   public ReactiveUserDetailsService userDetailsService() {        //
UserDetails user =                                           //
         User.withUsername("user")                                 // (3.1)
             .withDefaultPasswordEncoder()                         //
             .password("password")                                 //
             .roles("USER", "ADMIN")                               //
             .build();                                             //
      return new MapReactiveUserDetailsService(user);              // (3.2)
   }                                                               //
}

代码中前面的数字可以解释如下:

  1. 这是配置类的声明。在这里,为了启用特定的 注解的MethodInterceptor,我们必须添加@EnableReactiveMethodSecurity注解,它会导入配置 需要 为此,如 (1.1) 所示。
  2. 在这里,我们配置了 SecurityWebFilterChain bean。为了配置所需的bean,Spring Security为我们提供了ServerHttpSecurity,它是一个builder(见2.3) 使用流畅的 API(如 2.2 所示)。
  3. 这是 ReactiveUserDetailsS​​ervice bean 的配置。为了在默认的 Spring Security 设置中对用户进行身份验证,我们必须提供 ReactiveUserDetailsS​​ervice 的实现。出于演示目的,我们提供了接口的内存实现,如 (3.2) 处所示,并配置测试用户(在 3.1) 以登录系统。

正如我们在前面的代码中可能注意到的,Spring Security 的整体配置与我们之前看到的类似。这意味着迁移到这样的配置不会花费太多时间。

新一代 Spring Security 对响应式的支持使我们能够以最少的基础设施设置工作来构建高度受保护的 Web 应用程序。

与其他响应式库的交互

尽管 WebFlux 使用 Project Reactor 3 作为 central 构建块,但 WebFlux 也允许使用其他响应式库。为了实现跨库互操作性,WebFlux 中的大多数操作都基于 Reactive Streams 规范中的接口。通过这种方式,我们可以轻松地将 Reactor 3 中编写的代码替换为 RxJava 2 或 Akka Streams:

import io.reactivex.Observable;                                    // (1)
...                                                                //

@RestController                                                    // (2)
class AlbomsController {                                           // 
   final ReactiveAdapterRegistry adapterRegistry;                  // (2.1)
   ...                                                             //

   @GetMapping("/songs")                                           // (3)
public Observable<Song> findAlbomByArtists(                     // (3.1)
      Flux<Artist> artistsFlux                                     // (3.2)
   ) {                                                             
      Observable<Artist> observable = adapterRegistry              // (4)
         .getAdapter(Observable.class)                             //
         .fromPublisher(artistsFlux);                              //
      Observable<Song> albomsObservable = ...;                     // (4.1) 
                                                                   //
return albomsObservable;                                     // (4.2)
}
}

此代码在以下列表中进行了说明:

  1. 这是导入声明,它表明我们从 RxJava 2 导入了 Observable
  2. 这是 AlbomsController 类,使用 @RestController 注释进行注释。我们还声明了一个 ReactiveAdapterRegistry 类型的字段,这个例子后面会用到。
  3. 在这里,我们有一个名为 findAlbumByArtists 的请求处理程序方法的声明。正如我们所见, findAlbumByArtists 接受 Publisher 类型为 Flux< Artist>,如图点(3.2), 并返回Observable<Song>,如图在 (3.1) 点。
  4. 在这里,我们有将 artistsFlux 映射到Observable<Artist>的声明, 执行业务逻辑(在 4.1并将结果返回给调用者。

前面的示例展示了如何使用来自 RxJava 的反应类型以及 Project Reactor 反应类型来重写反应通信。我们可能还记得 第 5 章使用 Spring Boot 2 实现响应式, 反应式类型转换是 Spring Core 模块的一部分,并由 org.springframework.core.ReactiveAdapterRegistryorg 支持。 springframework.core.ReactiveAdapter。这些类允许在 Reactive Streams Publisher 类之间进行转换。因此,有了这个支持库,我们几乎可以使用任何响应式库,而不必将它与 Project Reactor 紧密耦合。

WebFlux 与 Web MVC


在前面的部分中,我们简要概述了新 Spring WebFlux 中包含的主要组件。我们还查看了 Spring WebFlux 模块中引入的新功能以及如何使用它们。

然而,尽管我们现在已经了解如何使用新的 API 来构建 Web 应用程序,但仍然不清楚为什么新的 WebFlux 比 Web MVC 好。了解 WebFlux 的主要优势会有所帮助。

为此,我们必须深入研究如何构建 Web 服务器的理论基础,了解快速 Web 服务器的关键特征是什么,并考虑哪些因素可能会改变 Web 服务器的性能。在接下来的部分中,我们将分析现代 Web 服务器的关键特性,了解可能导致性能下降的原因,并思考如何避免这种情况。

比较框架时,法律很重要

在我们继续之前,让我们尝试了解我们将用于比较的系统的特征。大多数 Web 应用程序的核心指标是吞吐量、延迟、CPU 和内存使用情况。网络现在与刚开始时有完全不同的要求。以前,计算机是顺序的。用户过去很乐意观察简单的静态内容,并且系统的整体负载很低。主要操作涉及生成 HTML 或简单的计算。计算适合一个处理器,我们不需要一台以上的服务器来运行 Web 应用程序。

随着时间的推移,游戏规则发生了变化。网络开始按十亿计算用户,内容开始变得动态甚至实时。对吞吐量和延迟的要求发生了很大变化。我们的 Web 应用程序已经开始高度分布在核心和集群上。了解如何扩展 Web 应用程序已变得至关重要。一个重要的问题是——并行工作者的数量如何改变延迟或吞吐量?

小法则

为了回答这个问题,利特尔定律来拯救。 这个定律解释如何计算请求的数量同时处理(或简单地说应该有多少并行工作人员)以在特定延迟级别处理预定义的吞吐量。换句话说,使用这个公式,我们可以计算出系统容量,或者说我们需要多少计算机、节点或 Web 应用程序并行运行 为了以稳定的响应时间处理每秒所需的用户数量:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

上述公式可以解释为: 系统或队列中驻留的平均请求数(或同时处理的请求数)( N)等于吞吐量(或每秒用户数)(X)乘以平均值响应时间或延迟(R)。

这意味着如果我们的系统具有平均响应时间 R 0.2 秒和吞吐量 X 的 100每秒请求,那么它应该能够同时处理 20 个请求,或者并行处理 20 个用户。我们要么需要 20 名工人在一台机器上,要么需要 20 台机器和一名工人。这是一个理想的情况,工作人员或同时请求之间没有交集。如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.3。理想的同时处理

正如我们在上图中看到的,系统中有 3 个工作人员,每秒可以处理 6 个参与者或请求。在这种情况下,所有actor在worker之间是平衡 这意味着它们之间不需要协调来选择worker .

但是,前面的情况其实不太现实,因为任何系统,比如web应用,都需要并发访问共享CPU 或内存等资源。因此,有一个 不断增长的 对整体吞吐量的修正列表, 这在阿姆达尔定律及其扩展中进行了描述, < span class="emphasis">通用可扩展性定律

阿姆达尔定律

这些定律中的第一个是关于 序列化访问对平均响应时间(或延迟)的影响,从而影响吞吐量。尽管我们可能总是希望并行化我们的工作,但可能会出现无法并行化的情况,而我们需要对工作进行序列化。如果我们在反应流中有一个 coordinator worker,或者 聚合或 reduction operator,这可能就是这种情况,这意味着我们必须加入所有执行。或者,它可能是一段仅在串行模式下工作的代码,因此不能并行执行。在大型微服务系统中,这可能是负载均衡器或 orchestration 系统。因此,我们可以参考阿姆达尔定律来使用以下公式计算吞吐量变化:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

在这个公式中,X(1) 是 < span class="emphasis">初始吞吐量N 是并行化或worker的数量,σ 是一个竞争系数(也称为 序列化系数),或者换句话说,执行不能并行处理的代码所花费的总时间的百分比。

如果我们做一个简单的计算并构建一个具有一些随机竞争系数的并行化吞吐量的依赖图, σ = 0,03 和初始吞吐量X(1)  = 在并行化范围内每秒 50 个请求 N = 0..500,那么我们实现了以下 曲线:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.4。吞吐量随并行化而变化

从上图中我们可以看出,随着并行化的增加,系统的吞吐量开始变得越来越慢。最后,吞吐量的整体增长结束,而是遵循渐近行为。 Amdahl 定律指出,整体工作并行化并不会带来吞吐量的线性增长,因为我们无法比代码的序列化部分更快地处理结果。从扩展普通 Web 应用程序的角度来看,这种说法意味着如果我们有一个无法更快工作的单点协调或处理,我们不会从增加系统中的核心或节点数量中获得任何好处。而且,我们通过支持冗余机器来赔钱,吞吐量的整体增加是不值得的。

从上图可以看出,吞吐量的变化依赖于并行化。但是,在许多情况下,我们必须了解延迟如何随着对并行化的依赖增加而发生变化。为此,我们可以结合利特尔定律和阿姆达尔定律的方程。我们可能记得,两个方程都包含吞吐量 (X )。因此,我们必须重写 Little's Law 以结合这两个公式:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

preceding 转换之后,我们可以替换 X(N) 在阿姆达尔定律中,并推导出以下内容:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

最后,为了导出延迟(R),我们必须进行以下转换:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信
读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

从前面的公式中,我们可以得出总体增长是线性的结论。下图显示了取决于并行化的延迟增长曲线:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.5。延迟线性增长取决于并行化

这意味着随着并行化的增加,响应时间会减少。

总而言之,正如阿姆达尔定律所描述的,具有并行执行的系统总是有序列化点,这会导致额外的开销并且不允许我们达到更高的吞吐量 只是通过提高并行化水平。下图显示了这个系统:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.6。使用阿姆达尔定律的同时处理示例

上图可以描述如下: 

  1. 这是工人代表。请注意,即使在这里,也不能将代码拆分为可以独立执行的较小的子任务,这也应该被视为序列化点。
  2. 这是队列或用户请求中参与者的表示。
  1. 这是在将参与者或用户请求分配给专用工作人员之前的队列。序列化点是协调和分配一个actor给worker。
  2. 可能需要双向协调演员。此时,协调器可能会执行一些操作以将响应发送回用户。

总而言之,阿姆达尔定律指出系统存在瓶颈,因此,我们无法为更多用户提供服务或降低延迟。

通用可扩展性法则

尽管阿姆达尔定律解释了任何系统的可扩展性,但真正的应用程序显示出完全不同的可扩展性结果。经过对该领域的一些研究,Neil Gunther 发现,尽管有序列化,还有另一个更关键的点, 被称为不连贯

笔记

Neil Gunther 是一位计算机信息系统研究员,他以开发开源性能建模软件 Pretty Damn Quick 和开发计算机容量规划和性能分析的游击方法而闻名于世。 欲了解更多信息,请访问 http://www.perfdynamics.com/Bio/njg.html

不连贯是具有共享资源的并发系统中的常见现象。例如,从标准 Java Web 应用程序的角度来看,这种不连贯性暴露在混乱的 线程 对 CPU 等资源的访问。 整个 Java 线程模型不是理想的。在 Thread 实例多于实际处理器的情况下,不同的 线程之间存在直接冲突  ;用于访问 CPU 和实现其计算周期的实例。这需要额外的努力来解决它们多余的协调和连贯性。  Thread 对共享内存的每次访问都可能需要额外的同步并降低应用程序的吞吐量和延迟。 

In order to explain such behavior in the system, the Universal Scalability Law (USL) extension of Amdahl's Law provides the following formula for calculating throughput changes depending on parallelization:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

The preceding formula introduces a coefficient of coherence (k). The most notable thing here is that, from now on, there will be quadratic backward throughput X(N) relation on the parallelization N.

In order to understand the fatal effect of this connection, let's take a look at the following diagram, where we have the same as before—the initial throughput X(1) = 50, the coefficient of contention σ = 0,03, and the coefficient of coherence 0,00007:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

Diagram 6.7 Throughput depending on parallelization. A comparison of Amdahl's Law (dotted line) versus USL (solid line).

从前面的图中,我们可以观察到存在一个危机点,之后 throughput 开始下降。此外,为了更好地表示实际系统的可伸缩性,该图显示了由 USL 建模的系统可伸缩性和由 Amdahl 定律建模的系统可伸缩性。平均响应时间退化曲线也改变了它的行为。下图显示了取决于并行化的延迟变化:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

Diagram 6.8 Throughput depending on parallelization. Comparison of Amdahl Law (dotted line) versus the USL (solid line)

Similarly, for the purpose of showing a contrast, the latency change curve modeled by the USL is compared with the same curve modeled by Amdahl's Law. As we can see from the previous plots, the system behaves differently when there are shared points of access, which may be incoherent and require additional synchronization. A schematic example of such a system is depicted in the following diagram:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.9。 USL的同时处理示例

正如我们所看到的,系统的整体情况可能比利特尔定律最初引入的 复杂得多。有很多隐藏的陷阱,可能会直接影响系统的可扩展性。

总结这三个部分,对这些规律的整体理解在对可扩展系统建模和规划系统容量方面起着重要作用。这些定律可能适用于复杂的高负载分布式系统,以及使用 Spring Framework 构建的 Web 应用程序的多处理器节点。此外,了解影响系统可扩展性的因素有助于正确设计系统并避免不连贯和争用等陷阱。它还可以从法律的角度正确分析 WebFlux 和 Web MVC 模块,并预测哪些规模将表现最佳。

彻底的分析和比较

根据我们对可扩展性的了解,我们知道了解框架的行为、架构和资源使用模型是至关重要的。此外,选择合适的框架来解决具体问题也很关键。在接下来的几个小节中,我们将从不同的角度比较 Web MVC 和 WebFlux,最后了解它们各自更适合哪些问题领域。

了解 WebFlux 和 Web MVC 中的处理模型

首先,为了了解不同处理模型对系统吞吐量和延迟的影响,我们将回顾一下传入请求在 Web MVC 和 WebFlux 中是如何处理的。

如前所述,Web MVC 建立在阻塞 I/O 之上。这意味着处理每个传入的请求的 线程 可以通过从 I/O 读取传入的主体来阻止:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.10。阻塞请求和响应处理

在前面的示例中,所有请求都由一个 Thread 依次排队和处理。黑条表示存在从/到 I/O 的阻塞读/写操作。此外,正如我们可能注意到的,实际处理时间(白条)远小于阻塞操作所花费的时间。从这个简单的图表中,我们可以得出 线程效率低下,在接受和处理队列中的请求时可能会共享等待时间。

相比之下,WebFlux 建立在非阻塞 API 之上,这意味着没有操作需要与 I/O 块 Thread 交互。下图描述了这种接受和处理请求的有效技术:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.11。异步非阻塞请求处理

正如我们在上图中看到的,我们有一个与前面阻塞 I/O 案例相同的案例。在图的左侧,有一个请求队列,在中间,有一个处理时间线。在这种情况下,处理时间线没有任何黑条,这意味着即使没有足够的来自网络的字节来继续处理请求,我们也可以随时切换到处理另一个 请求而不阻塞线程。将前面的异步、非阻塞请求处理与阻塞示例进行比较,我们可能会注意到,现在 request body 被收集,Thread 被有效地用于接受新的连接。然后,底层操作系统可能会通知我们,比如请求体已经被收集,处理器可以不阻塞地拿去处理。在这种情况下,我们有最佳的 CPU 利用率。类似地,写入响应不需要阻塞,并允许我们以非阻塞方式写入 I/O。唯一的区别是系统会在准备好将一部分数据写入 I/O 时通知我们,而不会阻塞。

前面的例子表明,WebFlux 使用一个 Thread 的效率比 Web MVC 高得多,因此可以在同一时间段内处理更多的请求。然而,仍有可能争辩说,我们仍然在 Java 中使用多线程,因此我们可以通过适当数量的 Thread 实例来利用真正的处理器。因此,为了更快地处理请求并在阻塞 Web MVC 的情况下达到相同的 CPU 利用率,我们可以使用多个工作线程,而不是一个 Thread,甚至是一个 Thread

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.12。每个连接的线程 Web MVC 模型

从上图中我们可以看出,多线程模型允许更快地处理排队的请求,并给人一种系统接受、处理和响应几乎相同数量的请求的错觉。

然而,这种设计有其缺陷。正如我们从通用可扩展性定律中了解到的,当系统具有共享资源(例如 CPU 或内存)时,扩展并行工作器的数量可能会降低系统的性能。在这种情况下,当用户请求的处理涉及到太多的Thread实例时,会因为它们之间的不连贯性而导致性能下降。

处理模型对吞吐量和延迟的影响

为了验证这个说法,让我们尝试做一个简单的负载测试。为此,我们打算使用带有Web MVC或WebFlux的简单Spring Boot 2.x应用程序(我们称之为中间件)。我们还将通过对第三方服务进行一些网络调用来模拟来自中间件的 I/O 活动,这将返回一个空的成功响应,并保证 200 毫秒的平均延迟。通信流程描述如下:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.13。基准通信流

为了启动我们的中间件并模拟客户端活动,我们将使用 Microsoft Azure 基础架构,并在每台机器上安装 Ubuntu Server 16.04。对于中间件,我们将使用 D12 v2 VM(4 个虚拟 CPU 和 28 GB RAM)。对于客户端,我们将使用 F4 v2 VM(4 个虚拟 CPU 和 8 GB RAM)。用户活动将逐步增加。我们将以 4 个并发用户开始负载测试,并以 20,000 个并发用户结束。这将为我们提供平滑的延迟曲线和吞吐量变化,并允许我们创建可理解的图形。为了在中间件上产生适当的负载并正确收集统计数据和测量特征,我们打算使用现代HTTP基准测试< span>tool 调用 wrkhttps://github.com/wg/wrk)。

笔记

请注意,这些基准 旨在显示趋势,而不是系统的稳定性 随着时间的推移,并衡量 WebFlux 框架的当前实现有多合适。以下测量显示了 WebFlux 中的非阻塞和异步通信相对于 Web MVC 中的阻塞同步和基于线程的通信的优势。

以下是用于测量的 Web MVC 中间件代码示例:

@RestController                                                    // (1)
@SpringBootApplication                                             // 
public class BlockingDemoApplication                               //
implements InitializingBean {                                   //
...                                                             // (1.1)
@GetMapping("/")                                                // (2)
public void get() {                                             //
restTemplate.getForObject(someUri, String.class);            // (2.1)
restTemplate.getForObject(someUri, String.class);            // (2.2)
}                                                               //
...                                                             //
}                                                                  //

上述代码可以描述如下:

  1. 这是类的声明,由@SpringBootApplication 注释。同时,这个类是一个用 @RestController注解的控制器。为了使这个例子尽可能简单,我们跳过了初始化过程并在这个类中声明了字段,如 (1.1) 处所示。
  2. 在这里,我们有一个带有 @GetMapping 声明的 get 方法。为了减少冗余的网络流量并只关注框架性能,我们不会在响应正文中返回任何内容。根据上图的流程,我们向远程服务器执行两次HTTP请求,分别在(2.1) (2.2)

从前面的示例和模式中可以看出,中间件的平均响应时间应该在 400 毫秒左右。

请注意,对于此测试,我们将使用 Tomcat Web 服务器,这是 Web MVC 的默认设置。此外,为了了解 Web MVC 的性能如何变化,我们将设置与并发用户一样多的 线程 实例。以下 sh 脚本显示了 Tomcat 的设置:

java -Xss512K -Xmx24G -Xms24G 
   -Dserver.tomcat.prestartmin-spare-threads=true
   -Dserver.tomcat.prestart-min-spare-threads=true 
   -Dserver.tomcat.max-threads=$1
   -Dserver.tomcat.min-spare-threads=$1
   -Dserver.tomcat.max-connections=100000
   -Dserver.tomcat.accept-count=100000
   -jar ...

从前面的脚本可以看出,max-threads 和min-spare-threads的值 参数是动态的,由测试中的并行用户数定义。

笔记

前面的设置不是生产就绪的,仅用于展示 Spring Web MVC 中使用的线程模型的缺点,特别是每个连接的线程模型。

通过针对我们的服务启动测试套件,我们将获得以下结果曲线:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.14。 Web MVC 吞吐量测量结果

 

上图显示,在某个时刻,我们开始失去吞吐量,这意味着我们的应用程序中存在争用或不一致。

为了比较 Web MVC 框架的性能结果,我们还必须对 WebFlux 运行相同的测试。以下是我们用来衡量基于 WebFlux 的应用程序性能的代码:

@RestController                                                   
@SpringBootApplication
public class ReactiveDemoApplication 
implements InitializingBean {
   ...
   @GetMapping("/")
public Mono<Void> get() {                                       // (1)
return                                                       // 
webClient                                                 //
.get()                                              // (2)
.uri(someUri)                                       // 
.retrieve()                                         // 
.bodyToMono(DataBuffer.class)                       //
.doOnNext(DataBufferUtils::release)                 //
.then(                                                    // (3)
webClient                                              //
.get()                                              // (4)
.uri(someUri)                                       //
.retrieve()                                         //
.bodyToMono(DataBuffer.class)                       //
.doOnNext(DataBufferUtils::release)                 //
.then()                                             //
)                                                         //
.then();                                                  // (5)
}
    ...
}

上面的代码表明我们现在正在积极地使用 Spring WebFlux 和 Project Reactor 特性来实现异步和非阻塞的请求和响应处理。就像在 Web MVC 案例中一样,在 (1) 点,我们返回一个Void结果,但它现在被包裹在反应类型, Mono。 然后,我们使用 WebClient API 执行 remote 调用,然后在 (3) 点,我们以相同的顺序方式执行第二次远程调用,显示在 (4) 。最后,我们跳过两次调用的执行结果并返回一个 Mono  通知订阅者两次执行都完成的结果。

 

笔记

请注意,使用 Reactor 技术,我们可以在不并行执行两个请求的情况下缩短执行时间。由于这两种执行都是非阻塞和异步的,因此我们不必为此分配额外的 Thread instances。但是,为了保持 图 6.13 中提到的系统行为,我们保持执行顺序,因此产生的延迟应该是 ~400 毫秒一般。

通过针对我们基于 WebFlux 的中间件启动测试套件,我们将获得 以下 结果曲线:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.15。 WebFlux 吞吐量测量结果

从上图中我们可以看出,WebFlux 曲线的趋势与 WebMVC 曲线有些相似。

 

为了比较两条曲线,我们将 它们 放在同一个图上:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.16。 WebFlux 与 Web MVC 吞吐量测量结果比较

在上图中,+ (plus) 符号的 行用于Web MVC 和 - (dash) 符号行用于 WebFlux。在这种情况下,越高意味着越好;正如我们所见,WebFlux 的吞吐量几乎是其两倍。

另外,这里应该注意的是,在 12,000 个并行用户之后没有 Web MVC 的测量值。问题是 Tomcat 的线程池占用了太多内存并且不适合给定的 28 GB。因此,每次 Tomcat 尝试使用超过 12,000 个Thread 实例时,Linux 内核都会终止该进程。这一点强调 thread-per-connection 模型不适合我们需要处理超过 10,000 个用户的情况。

笔记

前面的比较是 thread-per-connection 模型非阻塞异步的比较处理模型。在第一种情况下,在不显着影响延迟的情况下处理请求的唯一方法是为每个用户指定一个单独的 线程 。通过这种方式,我们最大限度地减少了用户在队列中等待可用 Thread 所花费的时间。相比之下,WebFlux 的配​​置不需要为每个用户分配单独的 Thread,因为我们使用的是非阻塞 I/O。 在实际场景中, Tomcat 服务器的常规配置对线程池的大小是有限的。

尽管如此,两条曲线都显示出相似的趋势并具有临界点,之后它们的吞吐量开始下降。这可能是因为许多系统在开放客户端连接方面存在局限性。另外,比较可能有点不公平,因为我们使用不同的 HTTP 客户端实现,具有不同的配置。例如, RestTemplate 的默认连接策略是在每次新调用时分配一个新的 HTTP 连接。相比之下,默认的 基于Netty的 WebClient 实现在底层使用连接池。在这种情况下,可以重用连接。即使系统可能被调整为重用打开的连接,这种比较可能是错误的。

因此,为了获得更好的比较,我们将通过提供 400 毫秒的延迟来模拟 网络活动。对于这两种情况,都使用以下代码:

Mono.empty()
    .delaySubscription(Duration.ofMillis(200))
    .then(Mono.empty()
              .delaySubscription(Duration.ofMillis(200)))
    .then()

对于WebFlux,返回类型是 Mono , 对于Web MVC,通过调用  结束执行流程.block() 操作, 所以 线程将被阻塞一段指定的延迟。在这里,我们使用相同的代码来获得相同的延迟调度行为。

我们还将使用类似的云设置。对于中间件,我们将使用 E4S V3 VM(四个虚拟 CPU 和 32 GB RAM)和客户端,B4MS VM(四个虚拟 CPU 和 16 GB RAM)。

通过针对服务运行我们的测试套件,可以观察到以下结果:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.17。 WebFlux 与 Web MVC 吞吐量测量结果比较,无需额外 I/O

在上图中, + (plus) 符号行用于Web MVC 和 - (dash) 符号行适用于 WebFlux。正如我们所看到的,总体结果高于实际外部调用。这意味着应用程序中的连接池或操作系统中的连接策略对系统性能有巨大的影响

尽管如此,WebFlux 仍然显示出两倍于 Web MVC 的吞吐量,这最终证明了我们关于每个连接线程模型效率低下的假设。 WebFlux 的行为仍然与阿姆达尔定律所建议的一样。但是,我们应该记住,除了应用限制之外,还有系统限制,这可能会改变我们对最终结果的解释。

我们还可以比较两个模块的延迟和 CPU 使用率,如图 6.18< em>6.19 分别为:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.18。 WebFlux 和 Web MVC 在没有额外 I/O 的情况下的延迟比较

在上图中, + (plus) 符号行用于Web MVC 和 - (dash) 符号行适用于 WebFlux。在这种情况下,结果越低越好。上图描述了 Web MVC 延迟的巨大下降。在 12,000 个并发用户的并行化水平上,WebFlux 的响应时间快了大约 2.1 倍。

从CPU使用率来看,我们有以下趋势:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.19。没有额外 I/O 的 WebFlux 和 Web MVC 的 CPU 使用比较

在上图中,实线表示 Web MVC,虚线表示 WebFlux。同样,在这种情况下,结果越低越好。 我们可以得出结论,WebFlux 在吞吐量、延迟和 CPU 使用率方面效率更高。 CPU 使用率的差异可以通过不同Thread 实例之间的冗余工作上下文切换来解释。

WebFlux 处理模型的挑战

WebFlux 与 Web MVC 显着不同。由于系统中没有阻塞 I/O ,我们可以只使用几个 Thread 实例来处理所有请求。处理事件同时不需要更多的线程 instances比系统中的处理器/内核。

笔记

这是因为 WebFlux 是建立在 Netty 之上的,其中 Thread instances 的默认数量是 the Runtime.getRuntime( ).availableProcessors()乘以二。

尽管使用非阻塞操作允许异步处理结果(参见 图 6.11),但我们可以更好地扩展,利用 CPU 更多高效,将 CPU 周期花费在实际处理上,并减少上下文切换的浪费,异步非阻塞处理模型有其自身的缺陷。首先,重要的是要了解 CPU 密集型任务应该安排在单独的 Thread  或 ThreadPool 实例。这个问题不适用于每连接线程模型或线程池具有大量工作人员的类似模型,因为在这种情况下,每个连接已经有一个专用工作人员。通常,大多数对此类模型有丰富经验的开发人员会忘记这一点,并在主线程上执行 CPU 密集型任务。像这样的错误会付出高昂的代价,并且会影响整体性能。在这种情况下,主线程忙于处理,没有时间接受或处理新连接:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.20。单处理器环境中的 CPU 密集型工作

从上图中我们可以看出,即使整个请求处理线由白条组成(这意味着没有阻塞 I/O),我们也可以通过运行硬计算来堆叠处理,从而窃取其他请求的处理时间。

为了解决这个问题,我们应该将长时间运行的工作委托给 separate 处理器池,或者在单个处理器的情况下-process 节点,将工作委托给不同的节点。例如,我们可以组织一个高效的事件循环(https://en.wikipedia.org /wiki/Event_loop),其中一个线程 接受连接,然后将实际处理委托给不同的工作/节点池:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.21。类 Netty 的非阻塞服务器架构

与异步、非阻塞编程有关的另一个常见错误是阻塞操作使用。 Web 应用程序开发的一个棘手部分是生成唯一的 UUID:

UUID requestUniqueId = java.util.UUID.randomUUID();

这里的问题是 #randomUUID() 使用 SecureRandom。典型的加密强度随机数生成器使用应用程序外部的熵源。它可能是硬件随机数生成器,但更常见的是累积随机性 由操作系统在正常运行中收集。

笔记

在这种情况下, 随机性 的概念是指诸如鼠标移动、电力变化和其他可能由系统在运行时收集的随机事件之类的事件。

问题是熵的来源有一个速率限制。如果在一段时间内超过这个值,对于某些系统,读取熵的系统调用将停止,直到有足够的熵可用。此外,线程的数量对 UUID 的生成性能有很大的影响。这可以通过查看 SecureRandom#nextBytes(byte[] bytes)的实现来解释, 它为 生成随机数UUID.randomUUID():

synchronized public void nextBytes(byte[] bytes) {
   secureRandomSpi.engineNextBytes(bytes);
}

正如我们所见, #nextBytes 是同步的,当被不同线程访问时,会导致显着的性能损失。

笔记

要了解有关 SecureRandom 的解析的更多信息,请参阅以下 Stack Overflow 答案:https://stackoverflow.com/questions/137212/how-to-solve-slow-java-securerandom

正如我们所了解的,WebFlux 使用几个线程以异步和非阻塞方式处理大量请求。我们必须小心使用那些乍一看似乎是无 I/O 操作但实际上隐藏了与操作系统的特定交互的方法。如果不适当注意这些方法,我们可能会显着降低整个系统的性能。因此,对 WebFlux 仅使用非阻塞操作至关重要。然而,这样的要求给反应式系统的开发带来了很多限制。例如, 整个 Java 开发工具包是为 Java 生态系统组件之间的命令式同步交互而设计的。因此, 很多阻塞操作没有非阻塞、异步的类比,这使得很多非阻塞、反应式系统开发变得复杂。虽然 WebFlux 为我们提供了更高的吞吐量和更低的延迟,但我们必须非常关注我们正在使用的所有操作和库。

此外,在复杂计算是我们服务的 central 操作的情况下,简单的基于线程的处理模型优于非阻塞、异步处理模型。此外,如果与 I/O 交互的所有操作都是阻塞的,那么我们不会像使用非阻塞 I/O 那样获得那么多好处。此外,用于事件处理的非阻塞和异步算法的复杂性可能是多余的,因此 Web MVC 中的简单线程模型将比 WebFlux 更有效。

尽管如此,对于没有此类限制或特定用例的情况,并且我们有大量的 I/O 交互,非阻塞和异步的 WebFlux 将大放异彩。

不同处理模型对内存消耗的影响

框架分析的另一个关键组成部分是比较 内存使用情况。回想一下我们在 Chapter 1 中对每个连接模型的 Thread 的讨论,  ;为什么选择 Reactive Spring?,我们知道,我们不是为微小事件的对象分配内存,而是分配一个巨大的专用 线程 对于每个新连接。我们应该记住的第一件事是 Thread 为其堆栈保留一些空间。实际堆栈大小取决于操作系统和 JVM 配置。默认情况下,对于大多数在 64 位上运行的常见服务器,VM 堆栈大小为 1 MB。

笔记

事件是指有关系统状态变化的信号,例如打开的连接或数据可用性。

对于高负载场景,使用这种技术,我们将有很高的内存消耗。最多,将整个 1 MB 堆栈与请求和响应正文一起保留会产生不合理的开销。如果专用线程池受到限制,将导致吞吐量和平均延迟下降。所以,在 Web MVC 中,我们必须平衡内存使用和系统吞吐量。相比之下,正如我们从上一节中了解到的,WebFlux 可以使用固定数量的 Thread 实例来处理更多的请求,同时 使用 一个可预测的数量的记忆。要全面了解在以前的测量中如何使用内存,请查看内存使用比较:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.22。 WebFlux 和 Web MVC 的内存使用对比 

在上图中,实线表示 Web MVC,虚线表示 WebFlux。在这种情况下, 越低越好。 需要注意的是,这两个应用程序都会被赋予额外的 JVM 参数——Xms26GB 和  Xmx26GB。这意味着两个应用程序都可以访问相同数量的专用内存。但是,对于 Web MVC,内存使用量随着并行化程度的提高而增长。如本节开头所述,通常的 线程 堆栈大小为 1 MB。在我们的例子中,Thread 堆栈大小设置为 -Xss512K,因此每个新线程需要额外的约 512 KB 内存。因此,对于 thread-per-connection 模型,我们的内存使用效率很低。 

相比之下,对于 WebFlux,尽管并行化,内存使用仍是稳定的。这意味着 WebFlux 更优化地消耗内存。换句话说,这意味着通过 WebFlux,我们可以使用更便宜的服务器。

为确保这是正确的假设,让我们尝试运行一个小型实验, 再次验证内存使用的可预测性以及它如何在不可预测的情况下帮助我们。对于这个测试,我们将尝试分析我们将在使用 Web MVC 和 WebFlux 的云基础设施上花费多少。

为了衡量系统的上限,我们将进行压力测试并验证我们的系统能够处理多少请求。在运行我们的 Web 应用程序时,我们将启动一个 Amazon EC2 t2.small 实例,它有一个虚拟 CPU 和 2 GB RAM。操作系统将是带有 JDK 1.8.0_144 和 VM 25.144-b01 的 Amazon Linux。对于第一轮测量,我们将使用 Spring Boot 2.0.x 和带有 Tomcat 的 Web MVC。此外,为了模拟网络调用和其他 I/O 活动,这是现代系统的常见组件,我们将使用 以下 天真的代码:

@RestController
@SpringBootApplication
public class BlockingDemoApplication {
   ...
   @GetMapping("/endpoint")
public String get() throws InterruptedException {
Thread.sleep(1000);
return "Hello";
}
}

要运行我们的应用程序,我们将使用以下命令:

java -Xmx2g 
     -Xms1g
     -Dserver.tomcat.max-threads=20000 
-Dserver.tomcat.max-connections=20000 
-Dserver.tomcat.accept-count=20000 
-jar blocking-demo-0.0.1-SNAPSHOT.jar

因此,通过上述配置,我们将检查我们的系统是否可以无故障处理多达 20,000 个用户。如果我们运行负载测试,我们将得到以下结果:

同时请求数

平均延迟(毫秒)

100

1,271

1,000

1,429

10,000

OutOfMemoryError/Killed

这些结果可能会随着时间而变化,但平均而言它们是相同的。正如我们所见,2 GB 的内存不足以处理每个连接 10,000 个独立线程。当然,通过 JVM 和 Tomcat 的具体配置调优玩玩,或许可以稍微改善一下我们的结果,但这并不能解决不合理的内存浪费问题。通过保持相同的应用程序服务器并仅切换到 Servlet 3.1 上的 WebFlux,我们可能会看到显着的改进。新的 Web 应用程序如下所示:

@RestController
@SpringBootApplication
public class TomcatNonBlockingDemoApplication {
   ...
   @GetMapping("/endpoint")
public Mono<String> get() {
return Mono.just("Hello")
                 .delaySubscription(Duration.ofSeconds(1));
}
}

在这种情况下,与 I/O 的交互模拟将是异步和非阻塞的,这很容易通过 fluent Reactor 3 API 获得。

笔记

请注意,WebFlux 的默认服务器引擎是 Reactor-Netty。因此,为了切换到 Tomcat Web 服务器,我们必须从 WebFlux 中排除 spring-boot-starter-reactor-netty 并提供对 < code class="literal">spring-boot-starter-tomcat 模块。

要运行新堆栈,我们将使用 以下 命令:

java -Xmx2g 
     -Xms1g
     -Dserver.tomcat.accept-count=20000 
-jar non-blocking-demo-tomcat-0.0.1-SNAPSHOT.jar

同样,我们为 Java 应用程序分配所有 RAM,但在本例中,我们使用默认线程池大小,即 200 个线程。通过运行相同的测试,我们将得到以下结果:

同时请求数

平均延迟(毫秒)

100

1,203

1,000

1,407

10,000

9,661

正如我们所观察到的,在这种情况下,我们的应用程序显示出更好的结果。我们的结果仍然不理想,因为一些高负载的用户将不得不等待相当长的时间。为了改善结果,让我们检查真正反应式服务器的吞吐量和延迟,即 Reactor-Netty。

由于运行新 Web 应用程序的代码和命令是相同的,我们只介绍基准测试结果:

同时请求数

平均延迟(毫秒)

1,000

1,370

10,000

2,699

20,000

6,310

 

正如我们所看到的,结果要好得多。首先,对于 Netty,我们选择了一次至少 1000 个连接的吞吐量。上限设置为 20,000。这足以表明,Netty 作为服务器在相同配置下的性能是 Tomcat 的两倍。仅此比较就表明,基于 WebFlux 的解决方案可能会降低基础设施成本,因为现在我们的应用程序可以安装在更便宜的服务器上并以更有效的方式消耗资源。

WebFlux 模块带来的另一个好处是能够更快地处理传入的请求正文,并且内存消耗更少。当传入的正文是 collection 元素并且我们的系统可以单独处理每个项目时,此功能会打开:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.23。 WebFlux 以小块处理大量数据

笔记

要了解有关反应式消息编码和解码的更多信息,请参阅此链接: https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-codecs .

从上图中我们可以看出,系统只需要一小部分请求体即可开始处理数据。当我们向客户端发送响应正文时,也可以实现同样的效果。我们不必等待整个响应体,而是可以开始将每个元素写入网络。下面展示了我们如何使用 WebFlux 实现这一点:

@RestController
@RequestMapping("/api/json")
class BigJSONProcessorController {

   @GetMapping(
      value = "/process-json",
produces = MediaType.APPLICATION_STREAM_JSON_VALUE
   )
public Flux<ProcessedItem> processOneByOne(Flux<Item> bodyFlux) {
return bodyFlux
.map(item -> processItem(item))
         .filter(processedItem -> filterItem(processedItem));
}
}

正如我们从前面的代码中看到的那样,这些惊人的功能无需破解 Spring WebFlux 模块的内部即可获得,并且可以通过使用可用的 API 来实现。此外,这种处理模型的使用使我们能够更快地返回第一个响应,因为从将第一个项目上传到网络和接收到响应之间的时间等于以下内容:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

笔记

请注意,streaming 数据处理技术不允许我们预测 内容长度 的响应体,这可能被认为是一个缺点。

相比之下,Web MVC 需要将整个 request 上传到内存中。只有在那之后它才能处理传入的正文:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.24。 Web MVC 一次处理大量数据

像在 WebFlux 中那样被动地处理数据是不可能的,因为  @Controller  的通常声明如下所示:

@RestController
@RequestMapping("/api/json")
class BigJSONProcessorController {

   @GetMapping("/process-json") 
public List<ProcessedItem> processOneByOne(
      List<Item> bodyList
   ) {
return bodyList
.stream()
         .map(item -> processItem(item))
         .filter(processedItem -> filterItem(processedItem))
         .collect(toList());
}
}

在这里,方法声明明确要求将完整的请求主体转换为特定项目的集合。从数学的角度来看,平均处理时间 等于 如下:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

 

同样,将第一个结果返回给用户需要处理整个请求正文并将结果聚合到集合中。只有在那之后,我们的系统才能向客户端发送响应。这意味着 WebFlux 使用的内存比 Web MVC 少得多。 WebFlux 将能够比 Web MVC 更快地返回第一个响应,并且能够处理无限的数据流。

处理模型对可用性的影响

我们对 Web MVC 和 WebFlux 的比较应该包含一些定性和定量指标。衡量的最常见的定性指标之一是学习曲线。 Web MVC 是一个著名的框架,在企业领域有十多年的活跃使用。它依赖于最简单的编程范式,即命令式编程范式。对于企业来说,这意味着如果我们开始一个基于普通 Spring 5 和 Web MVC 的新项目,找到熟练的开发人员会容易得多,教新人的成本也会低得多。相比之下,使用 WebFlux,情况就会大不相同。首先,WebFlux 是一项新技术,尚未充分证明自己,可能 潜在地 有很多错误和漏洞。底层的异步、非阻塞编程范式也可能是一个问题。首先,很难调试异步、非阻塞代码,Netflix 将 Zuul 迁移到新的编程模型的经验证明了这一点。

笔记

异步编程是基于回调的,由事件循环驱动。尝试跟踪请求时,事件循环的堆栈跟踪毫无意义。这是因为事件和回调被处理并且很少有工具可以帮助调试。边缘情况、未处理的异常和错误处理的状态更改会产生悬空资源,从而导致 ByteBuf 泄漏、文件描述符泄漏、丢失响应等。事实证明,这些类型的问题很难调试,因为很难知道哪个事件没有正确处理或没有正确清理。欲了解更多信息,请访问 https://medium.com/netflix-techblog/zuul-2-the-netflix-journey-to-asynchronous-non-blocking-systems-45947377fb5c

此外,从业务角度来看,寻找对异步和非阻塞编程有深入了解的高技能工程师可能是不合理的,尤其是使用 Netty 堆栈。从一开始就教新开发人员需要花费大量时间和金钱,并且不能保证他们会完全理解。幸运的是,这个问题的某些部分通过使用 Reactor 3 得到了解决,这使得构建有意义的转换流程更简单,并隐藏了异步编程中最困难的部分。不幸的是,Reactor 并不能解决所有问题,而且对于企业而言,对人员和风险技术进行如此不可预测的财务投资可能不值得。

关于定性分析的另一个要点是将现有解决方案迁移到新的反应式堆栈。尽管从框架开发之初,Spring 团队就一直在尽最大努力提供平滑迁移,但仍然很难预测所有迁移情况。例如,那些依赖 JSP、Apache Velocity 或类似服务器端渲染技术的人将需要迁移整个 UI 相关代码。此外,许多现代框架依赖 ThreadLocal, 这使得平滑移动到异步、非阻塞编程 具有挑战性。除此之外,还有很多与数据库相关的问题,在Chapter 7反应式数据库访问

WebFlux 的应用


在前面的部分中,我们了解了 WebFlux 的设计及其新功能的基础。我们还对 WebFlux 和 Web MVC 进行了细粒度的比较。我们从不同的角度了解了它们的优缺点。最后,在本节中,我们将尝试对 WebFlux 的应用有一个清晰的了解。

基于微服务的系统

WebFlux 的第一个明显应用是在微服务系统中。与单体相比,典型微服务系统最独特的特征是丰富的I/O通信。 I/O 的存在,尤其是阻塞 I/O,降低了整个系统的延迟和吞吐量。每个连接线程模型中的争用和一致性不会显着提高系统性能。这意味着对于服务间调用很重要的系统或特定服务,WebFlux 将是最有效的解决方案之一。这种服务的一个例子是支付流编排服务。

通常,一个简单的操作,比如账户间的转账,背后都有一个隐藏的、错综复杂的机制,其中包括一套检索、验证,然后是实际的传输执行操作。例如,当我们使用 PayPal 汇款时,第一步可能是检索汇款人和收款人的账户。然后,由于 PayPal 可以将资金从任何国家转移到任何国家,因此验证转移不会违反这些国家的法律很重要。每个帐户可能有自己的限制和限制。最后,收款人可能有内部 PayPal 账户或外部信用卡或借记卡,因此,根据账户类型,我们可能需要额外调用外部系统:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.25。 PayPal 支付流程的示例实现

 

通过在如此复杂的流程中配置非阻塞、异步通信,我们可以有效地处理其他请求并有效地利用计算机资源。

处理连接速度慢的客户端的系统

WebFlux 的第二个应用是构建旨在与网络连接缓慢或不稳定的客户端的移动设备一起工作的系统。为了理解为什么 WebFlux 在这方面很有用,让我们提醒自己在处理慢速连接时发生了什么。问题是 transfering 数据从客户端到服务器可能需要大量时间,并且相应的响应也可能需要很多时间。使用每连接线程模型,随着连接客户端数量的增加,系统崩溃的可能性更高。例如,使用 拒绝服务 (DoS) 攻击,黑客 很容易让 我们的服务器不可用。

相比之下,WebFlux 允许我们在不阻塞工作线程的情况下接受连接。这样,慢速连接不会导致任何问题。 WebFlux 将在等待传入的请求正文时继续接收其他连接而不会阻塞。 Reactive Streams 抽象允许我们在需要时使用数据。这意味着服务器可以根据网络的准备情况来控制事件消耗。

流媒体或实时系统

WebFlux 的另一个有用的应用是实时和流系统。要了解 WebFlux 为何能在这方面发挥作用,让我们提醒自己什么是实时和流系统。

首先,这些系统的特点是延迟和高吞吐量。对于流式系统,大部分数据都是从服务器端传出的,因此 客户端扮演消费者的角色。 通常来自客户端的事件少于来自服务器端的事件。但是,对于在线游戏等实时系统来说,传入的数据量等于传出的数据量。

使用非阻塞通信可以实现低延迟和高吞吐量。正如我们从前几节中了解到的,非阻塞、异步通信允许有效地利用资源。基于 Netty 或类似框架的系统显示了最高吞吐量和最低 延迟。然而,这种反应式框架有其自身的缺点,即使用通道和回调的复杂交互模型。

尽管如此,响应式编程是解决这两个问题的优雅解决方案。正如我们在 第 4 章, Project Reactor - Reactive 的基础应用程序,反应式编程,尤其是反应式库,如 Reactor 3,帮助我们构建一个异步的、非阻塞的流程,而底层代码库的复杂性和可接受的学习曲线只有很小的开销.两种解决方案都集成到 WebFlux 中。使用 Spring Framework 可以让我们轻松构建这样的系统。

WebFlux 在行动

为了了解我们如何在实际场景中使用 WebFlux,我们将构建一个简单的 Web 应用程序连接到使用 WebClient 的远程 Gitter Streams API,使用 Project Reactor API 转换数据,然后使用 SSE 将转换后的消息广播到世界。下图显示了系统的示意图:

读书笔记《hands-on-reactive-programming-in-spring-5》WebFlux 异步非阻塞通信

图 6.26。流式应用程序的示意图设计

上图可以这样描述:

  1. 这是与 Gitter API 的集成点。从上图中我们可以看出,我们的服务器和 Gitter 之间的通信是流式的。因此,反应式编程自然适合那里。
  2. 这是系统中我们需要处理传入消息并将它们转换为不同视图的点。
  3. 这是我们缓存接收到的消息并将它们广播到每个连接的客户端的地方。
  4. 这是连接的浏览器的表示。

正如我们所见,该系统中有四个核心组件。为了构建这个系统,我们将创建以下类和接口:

  • ChatServeice: This is the interface responsible for wiring communication with a remote server. It provides the ability to listen to messages from that server.
  • GitterService: This is the implementation of the ChatService interface that connects to the Gitter streaming API in order to listen to new messages.
  • InfoResource: This is the handler class that handles user requests and responds with a stream of messages.

实现系统的第一步是分析ChatService接口。以下示例显示了所需的方法:

interface ChatService<T> {

Flux<T> getMessagesStream();

Mono<List<T>> getMessagesAfter(String messageId);
}

前面的示例界面涵盖了与消息阅读和收听相关的最低要求功能。在这里,getMessagesStream 方法在聊天中返回无限的新消息流,而 getMessagesAfter 允许我们检索一个具有特定消息 ID 的消息列表。

在这两种情况下,Gitter 都通过 HTTP 提供对其消息的访问。这意味着我们可以使用普通的 WebClient。以下是我们如何实现 getMessagesAfter 并访问远程服务器的示例:

Mono<List<MessageResponse>> getMessagesAfter(                      //
   String messageId                                                //
) {                                                                //
   ...                                                             //
return webClient                                                // (1)
      .get()                                                       // (2)
      .uri(...)                                                    // (3)
      .retrieve()                                                  // (4)
      .bodyToMono(                                                 // (5)
         new ParameterizedTypeReference<List<MessageResponse>>() {}//
      )                                                            //
      .timeout(Duration.ofSeconds(1))                              // (6)
      .retryBackoff(Long.MAX_VALUE, Duration.ofMillis(500));       // (7)
}

前面的代码示例展示了我们如何组织与 Gitter 服务的普通请求-响应交互。在这里,在 (1) 处, 我们使用 WebClient 实例来执行 GET HTTP 方法调用 (2) 到远程 Gitter 服务器 (3)。然后 检索(4)点的信息,并使用WebClient DSL将其转换为MessageResponse ( 5) 。然后,为了提供与外部服务通信的弹性,我们为 (6)< /code>,如果出现错误,请在 (7) 点重试调用。

与流式 Gitter API 通信就这么简单。下面展示了我们如何连接到 Gitter 服务器的 JSON 流 (application/stream+json) 端点:

public Flux<MessageResponse> getMessagesStream() {                 //
return webClient                                                //
      .get()                                                       // (1)
      .uri(...)                                                    //
      .retrieve()                                                  //
      .bodyToFlux(MessageResponse.class)                           // (2)
      .retryBackoff(Long.MAX_VALUE, Duration.ofMillis(500));       //
}                                                                  //

正如我们在前面的代码中看到的,我们使用与前面相同的 API,如 (1) 处所示。我们所做的唯一更改是隐藏的 URI,以及我们映射到 Flux 而不是 Mono< /code>,如 (2) 处所示。 在底层,< code class="literal">WebClient 使用 Decoder 容器中可用的。如果我们有一个无限流,这允许我们动态转换元素,而无需等待流结束。

最后,为了将两个流合并为一个并缓存它们,我们可以实现以下代码,它提供了一个实现InfoResource 处理程序:

@RestController                                                    // (1)
@RequestMapping("/api/v1/info")                                    //
public class InfoResource {                                        //

   final ReplayProcessor<MessageVM> messagesStream                 // (2)
      = ReplayProcessor.create(50);                                //

public InfoResource(                                            // (3)
      ChatService<MessageResponse> chatService                     //
   ) {                                                             //
     Flux.mergeSequential(                                         // (3.1)
            chatService.getMessageAfter(null)                      // (3.2)
                       .flatMapIterable(Function.identity())       //
chatService.getMessagesStream()                        // (3.3)
         )                                                         //
         .map(...)                                                 // (3.4)
         .subscribe(messagesStream);                               // (3.5)
}

@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)       // (4)
public Flux<MessageResponse> stream() {                         //
return messagesStream;                                       // (4.1)
}                                                               //
}                                                                  //

前面的代码可以解释如下:

  1. 这是用 @RestController注释的类的声明。
  2. 这是 ReplayProcessor 字段声明。我们可能还记得 第 4 章, Project Reactor - 基金会对于反应式应用程序, ReplayProcessor 允许我们缓存预定义数量的元素并将最新元素重播给每个新订阅者。
  1. 在这里,我们声明了 InfoResource 类的构造函数。在构造函数中,我们构建了一个处理流程,它合并了来自 Gitter 的最新消息流(显示在 3.13.2 )。在 ID 为空的情况下,Gitter 返回最新的 30 条消息。处理流程还近乎实时地监听新消息流,如点 (3.3) 所示。然后,所有消息都映射到视图模型,如点 (3.4) 所示,并且流立即被 订阅重播处理器。这意味着一旦构造了 InfoResource bean,我们就连接到 Gitter 服务,缓存最新消息,并开始监听更新。请注意 mergeSequential 同时订阅两个流,但会开始发​​送 消息从第二个开始,但仅在第一个流完成时。由于第一个流是有限的,我们接收最新消息并开始从 getMessagesStream Flux。
  2. 这是一个处理程序方法声明,在到指定端点的每个新连接上都会调用它。在这里,我们可能只返回 ReplayProcessor 实例,显示在 (4.1) 点,因此它将共享最新缓存消息并在可用时发送新消息。

正如我们在前面的示例中所见,提供复杂的功能,例如以正确的顺序合并流,或缓存最新的 50 条消息并将它们动态广播给所有订阅者,不需要大量的工作或编写代码。 Reactor 和 WebFlux 涵盖了最难的部分,让我们 只需 编写业务逻辑。这可以实现与 I/O 的高效非阻塞交互。因此,我们可以通过使用这个强大的工具包来实现 一个高吞吐量和低延迟的系统。

 

概括


在本章中,我们了解到 WebFlux 是旧的 Web MVC 框架的有效替代品。我们还了解到 WebFlux 使用相同的技术来声明请求处理程序(使用众所周知的 @RestController@Controller)。除了标准的处理程序声明之外,WebFlux 还引入了一个使用 RouterFunction 的轻量级函数式端点声明。很长一段时间以来,Spring 框架的用户都无法使用现代的响应式 Web 服务器,例如 Netty,以及非阻塞的 Undertow 功能。借助 WebFlux Web 框架,这些技术可以使用相同的、熟悉的 API 来使用。由于 WebFlux 是基于异步非阻塞通信的,所以这个框架依赖于 Reactor 3,它是模块的核心组件。

我们还探索了新的 WebFlux 模块引入的变化。其中包括基于 Reactor 3 Reactive Types 对用户和服务器之间通信的更改;改变服务器和外部服务之间的通信,特别是使用新的 WebClient 技术;和一个新的 WebSocketClient,它允许客户端-服务器通过 WebSocket 进行通信。此外,WebFlux 是一个跨库框架,这意味着这里支持任何基于 Reactive Streams 的库,并且可以替换默认的 Reactor 3 库或任何其他首选库。

之后本章从不同的角度介绍了 WebFlux 和 Web MVC 的详细对比。总而言之,在大多数情况下,WebFlux 是高负载 Web 服务器的正确解决方案,并且在所有性能结果中,它的性能是 Web MVC 的两倍。我们研究了使用 WebFlux 模块的业务收益,并考虑了 WebFlux 如何简化工作。我们还研究了这项技术的缺陷。

最后,我们了解了 WebFlux 是最合适的解决方案的一些用例。这些案例是微服务系统、实时流系统、在线游戏和其他类似应用领域,其重要特征包括低延迟、高吞吐量、低内存占用和高效的 CPU 利用率。

虽然我们已经了解了 Web 应用程序的核心方面,但还有一个更重要的部分,那就是与数据库的交互。在下一章中,我们将介绍与数据库进行响应式通信的主要特性,哪些数据库支持响应式通信,以及在没有响应式支持的情况下我们应该做什么。