vlambda博客
学习文章列表

读书笔记《building-microservices-with-spring》实现反应式设计模式

Chapter 7. Implementing Reactive Design Patterns

在本章中,我们将探讨 Spring 5 框架最重要的特性之一,即响应式模式编程。 Spring 5 框架通过 Spring Web 反应模块引入了这个新特性。我们将在本章中讨论这个模块。在此之前,让我们看一下响应式模式。什么是响应式模式,为什么它现在越来越流行?我将从微软公司首席执行官 Satya Nadella 的以下声明开始我对反应模式的讨论:

现在的每个企业都是软件公司,是数字公司。

我们将在这里讨论的主题如下:

  • Why reactive pattern?
  • The reactive pattern principles
  • Blocking calls
  • Non-blocking calls
  • Back-pressure
  • Implementing the reactive pattern using the Spring Framework
  • The Spring web reactive module

Understanding application requirement over the years


如果您回到 10 到 15 年,与过去相比,互联网用户非常很少,面向最终用户的在线门户网站也少得多。我们今天所拥有的。如今,我们无法想象没有电脑或没有任何在线系统的生活。简而言之,我们已经变得非常依赖个人和商业用途的计算机和在线计算。每种商业模式都在朝着数字化方向发展。印度总理纳伦德拉·达莫达达斯·莫迪先生发起了一项数字印度运动,以确保通过改善在线基础设施、增加互联网连接以及使国家在技术领域获得数字化授权,以电子方式向公民提供政府服务.

这一切都意味着互联网用户的数量正在急剧增加。根据爱立信移动报告,

物联网 (IoT) 有望在 2018 年超越手机成为最大的联网设备类别。

移动互联网用户的增长势头迅猛,而且没有任何迹象表明这种增长很快会放缓。在这些领域,根据定义,服务器端必须同时处理数百万个连接的设备。下表将当今的基础架构和应用程序要求与 10 年前的要求进行了比较:

要求现在十年前

服务器节点

需要超过 1000 个节点。

十个节点就足够了。

响应时间

需要几毫秒来处理请求并发送回响应。

反应了几秒钟。

维护停机时间

目前,不需要或零维护停机时间。

需要数小时的维护停机时间。

数据量

当前应用程序的数据从 PB 增加到 TB。

数据以 GB 为单位。

您可以在上表中看到资源需求的差异。这些要求已经增加,因为我们现在期望在第二秒内立即得到响应。与此同时,交给计算机的任务的复杂性也增加了。这些任务不仅仅是数学意义上的纯粹计算,而且还要求从大量数据中提取响应。因此,现在我们必须通过设计多核 CPU 形式的单台计算机来关注此类系统的性能,并可能结合到多插槽服务器中。我们首先想到的是使系统响应。它是反应性特征中的第一个——反应性。我们将在本章中探讨更多内容,以及以下主题:

  • Why reactive pattern
  • Reactive pattern principles
  • Blocking calls
  • Non-blocking calls
  • Back-pressure
  • Implementing reactive pattern using the Spring Framework
  • Spring Web reactive module
  • Implementing reactive at server side
  • Implementing reactive at client side
  • Request and response body type conversion

本章将教您如何使系统在面对任何可变负载、部分中断、程序故障等时做出响应。如今,系统分布在不同的节点上以有效地服务请求。

让我们详细看一下上述主题。

Understanding the reactive pattern


如今,现代应用程序必须更多健壮、更有弹性、更灵活,并且能够更好地满足组织的要求,因为在最近几年,对应用程序的要求发生了巨大变化。正如我们在上表中看到的那样,10 到 15 年前,一个大型应用程序有 10 个服务器节点,服务请求所需的响应时间以秒为单位,我们需要几个小时的停机时间来维护和部署,而数据以千兆字节为单位。但是今天,一个应用程序需要数千个服务器节点,因为它可以通过多个渠道(例如移动设备)访问。服务器响应预计在毫秒内,部署和维护的停机时间接近 0%。数据已从 TB 增加到 PB。

十年前的系统无法满足当今应用的要求;我们需要一个能够在应用程序级别或系统级别满足所有用户需求的系统,这意味着我们需要一个响应式系统。响应性是响应式模式的属性之一。我们需要一个必须具有响应性、弹性、弹性和消息驱动的系统。我们将这些系统称为反应系统。这些系统更灵活、松耦合且可扩展。

系统必须对故障做出反应并保持可用,也就是说,它应该是弹性的,并且系统必须对可变负载条件做出反应,而不是过载。系统应该对事件做出反应——事件驱动或消息驱动。如果所有这些属性都与一个系统相关联,那么它就是响应式的,也就是说,如果一个系统对其用户做出反应,它就是响应式的。要创建反应式系统,我们必须关注系统级别和应用程序级别。让我们首先看看所有反应性特征。

The reactive pattern traits

以下是响应式模式的原理

  • Responsive: This is the goal of each application today.
  • Resilient: This is required to make an application responsive.
  • Scalable: This is also required to make an application responsive; without resilience and scalability, it is impossible to achieve responsiveness.
  • Message-driven: A message-driven architecture is the base of a scalable and resilient application, and ultimately, it makes a system responsive. Message-driven either based on the event-driven or actor-based programming model.

前面提到的几点是响应式模式的核心原则。让我们详细探讨响应式模式的每个原理,并了解为什么必须将所有这些原理一起应用才能为现代上下文应用程序构建一个具有高质量软件的响应式系统,该系统能够在毫秒内处理数百万个并行请求,而无需任何失败。我们先通过下图来了解这些原理:

读书笔记《building-microservices-with-spring》实现反应式设计模式

正如您在上图中所见,要使系统具有响应性,我们需要可扩展性和弹性。为了使系统具有可扩展性和弹性,我们需要应用程序的事件驱动或消息驱动架构。最终,这些原则、可扩展性、弹性和事件驱动架构使系统能够响应客户端。让我们详细了解这些属性。

Responsiveness

当我们说一个系统或一个应用程序是响应式的,它意味着该应用程序或系统快速响应所有用户在一个在所有条件下都给定时间,无论是好还是坏。它确保了始终如一的积极用户体验。

系统的可用性和实用性需要响应能力。响应式系统意味着当系统发生故障时,无论是由于外部系统还是流量高峰,故障都会被快速检测到,并在短时间内有效处理,而用户不知道故障。最终用户必须能够通过提供快速且一致的响应时间与系统进行交互。用户在与系统交互期间不得遇到任何故障,并且必须为用户提供一致的服务质量。这种一致的行为解决了故障并建立了最终用户对系统的信心。各种条件下的快速和积极的用户体验使系统响应迅速。它取决于反应式应用程序或系统的另外两个特征,即弹性和可扩展性。另一个特征,即事件驱动或消息驱动架构,为响应式系统提供了整体基础。下图说明了一个响应式系统:

读书笔记《building-microservices-with-spring》实现反应式设计模式

如上图所示,响应式系统依赖于系统的弹性和可扩展性,而这些依赖于其事件驱动架构。让我们看看反应式应用程序的其他特征。

Resilience

当我们设计和开发一个系统时,我们考虑了所有条件——好的和坏的。如果我们只考虑好的条件,那么我们倾向于实施一个几天后可能会失败的系统。重大应用程序故障会导致停机和数据丢失,并损害您的应用程序在市场上的声誉。

因此,我们必须关注每一个条件,以确保 应用程序 在所有条件下的响应能力。这样的系统或应用程序被称为弹性系统。

每个系统都必须具有弹性以确保响应能力。如果一个系统没有弹性,它在发生故障后将无响应。因此,系统也必须在面对故障时做出响应。在整个系统中,应用程序或系统的任何组件都可能存在故障。因此,系统中的每个组件必须相互隔离,以便在某个组件发生故障时,我们可以在不损害整个系统的情况下恢复它。单个组件的恢复是通过复制来实现的。如果一个系统是有弹性的,那么它必须具有复制、遏制、隔离和委托。请看下图,它说明了反应式应用程序或系统的弹性特征:

读书笔记《building-microservices-with-spring》实现反应式设计模式

正如您在上图中所见,弹性是通过复制、包含、隔离和委派来实现的。让我们详细讨论这些要点:

  • Replication: This ensures high-availability, where necessary, at the time of component failure.
  • Isolation: This means that the failure of each component must be isolated, which is achieved by decoupling the components as much as possible. Isolation is needed for a system to self-heal. If your system has isolation in place, then you can easily measure the performance of each component, and check the memory and CPU usage. Moreover, the failure of one component won't impact the responsiveness of the overall system or application.
  • Containment: The result of decoupling is containment of the failure. It helps avoid failure in the system as a whole.
  • Delegation: After failure, the recovery of each component is delegated to another component. It is possible only when our system is composable.

现代应用程序不仅依赖于内部基础设施,而且还通过网络协议与其他 Web 服务集成。因此,我们的应用程序必须在其核心具有弹性,以便在相反条件下的各种现实世界中保持响应。我们的应用程序不仅必须在应用程序级别而且在系统级别具有弹性。

让我们看看反应式模式的另一个原理。

Scalable

弹性和可扩展性共同使系统始终响应。可扩展系统或弹性系统可以在变化的工作负载下轻松升级。通过增加和减少分配用于服务这些输入的资源,可以使反应系统按需扩展。它通过为应用程序的可扩展性提供相关的实时性能来支持多种扩展算法。我们可以通过使用具有成本效益的软件和廉价的商品硬件(例如,云)来实现可扩展性。

application 如果可以根据其使用情况进行扩展,则可以通过以下方式扩展:

  • scale-up: It makes use of parallelism in multi-core systems.
  • scale-out: It makes use of multi-server nodes. Location transparency and resilience are important for this.

最小化共享可变状态对于可扩展性非常重要。

Note

弹性和可扩展性都是一样的!可伸缩性是关于有效利用现有资源,而弹性是关于在系统需求发生变化时按需向应用程序添加新资源。因此,最终,系统无论如何都可以做出响应——通过使用系统的现有资源或通过向系统添加新资源。

让我们看看反应式模式的弹性和可扩展性的最终基础,即消息驱动架构。

Message-driven architecture

消息驱动架构是响应式应用程序的基础。消息驱动的应用程序 可以是事件驱动和基于参与者的应用程序。它也可以是两种架构的组合——事件驱动和基于参与者的架构。

在事件驱动架构中,事件和事件观察者扮演着主要角色。事件发生,但未定向到特定地址;事件监听器监听这些事件,并采取行动。但是在消息驱动的体系结构中,消息有一个正确的方向到达目的地。让我们看一下下图,它说明了消息驱动和事件驱动的架构:

读书笔记《building-microservices-with-spring》实现反应式设计模式

如上图所示,在事件驱动架构中,如果事件发生,那么侦听器会监听它。但是在消息驱动的通信中,一个生成的消息通信有一个可寻址的接收者和一个单一的目的。

异步消息驱动架构通过在组件之间建立限制来充当反应式系统的基础。它确保松散耦合、隔离和位置透明。组件之间的隔离完全依赖于它们之间的松耦合。隔离和松散耦合发展了弹性和弹性的基础。

一个大系统有多个组件。这些组件要么具有较小的应用程序,要么它们可能具有反应特性。这意味着响应式设计原则必须适用于规模的所有级别,以使大型系统可组合。

传统上,大型系统由多个线程组成,这些线程以共享的同步状态进行通信。它往往具有强耦合,难以组合,也容易阻塞舞台。但是,目前,所有大型系统都由松散耦合的事件处理程序组成。并且事件可以异步处理而不会阻塞。

让我们看看阻塞和非阻塞编程模型。

简单来说,反应式编程就是关于异步和事件驱动的非阻塞应用程序,并且需要少量线程来垂直而不是水平扩展。

Blocking calls


在一个系统中,一个调用可能持有 resources 而其他调用等待相同的资源。这些资源在其他资源完成使用时被释放。

让我们来看技术术语——实际上,阻塞调用是指应用程序或系统中的一些操作需要较长时间才能完成,例如文件 I/O 操作和使用阻塞驱动器访问数据库。下图是系统中 JDBC 操作的阻塞调用图:

读书笔记《building-microservices-with-spring》实现反应式设计模式

正如您在上图中所见,此处以红色显示的阻塞操作是用户调用 servlet 以获取数据的操作,然后移动到与 DB 服务器的 JDBC 和 DB 连接。在此之前,当前线程等待来自 DB 服务器的结果集。如果数据库服务器有延迟,那么这个等待时间可能会增加。这意味着线程执行取决于数据库服务器延迟。

让我们看看如何使它成为非阻塞执行。

Non-blocking calls


程序的非阻塞执行意味着线程竞争资源而不等待它。资源的非阻塞 API 允许调用 resources 而无需等待数据库访问和网络调用等阻塞调用。如果在调用时资源不可用,那么它会转移到其他工作而不是等待被阻塞的资源。当被阻止的资源可用时,系统会收到通知。

看一下下图,它显示了在没有阻塞线程执行的情况下访问数据的 JDBC 连接:

读书笔记《building-microservices-with-spring》实现反应式设计模式

如上图所示,线程执行不等待来自数据库服务器的结果集。线程为数据库服务器建立数据库连接和 SQL 语句。如果数据库服务器在响应中有延迟,那么线程会继续执行其他工作,而不是被阻塞等待资源变得可用。

Back-pressure


在过载情况下,反应式应用程序永远不会放弃。背压是反应式应用程序的一个关键方面。这是一种确保反应式应用程序不会压倒消费者的机制。它测试反应式应用程序的各个方面。它在任何负载下优雅地测试系统响应。

背压机制确保系统在负载下具有弹性。在背压条件下,系统通过应用其他 resources 来帮助分配负载,从而使其自身具有可扩展性。

到目前为止,我们已经看到了响应式模式的原理;这些是使系统在蓝天或灰色天空中响应所必需的。在接下来的部分中,让我们看看 Spring 5 如何实现响应式编程。

Implementing reactive with the Spring 5 Framework


Spring Framework 的最新 version 最突出的特性是新的响应式堆栈 Web 框架。反应式是带我们走向未来的更新。这一技术领域日新月异,这也是 Spring Framework 5.0 推出具有响应式编程能力的原因。此添加使最新版本的 Spring Framework 便于事件循环样式处理,从而可以使用少量线程进行扩展。

Spring 5 框架通过在内部使用反应器来实现反应式编程模式,以获得自己的反应式支持。 reactor 是一个 Reactive Stream 实现,它扩展了基本的 Reactive Streams。 Twitter 已通过使用 Reactive Streams 实现为响应式传递。

Reactive Streams

Reactive Streams 提供了一个 protocol 或规则,用于具有非阻塞背压的异步流处理。 Java 9 也以 java.util.concurrent.Flow 的形式采用了该标准。 Reactive Streams 由四个简单的 Java 接口组成。这些接口是 PublisherSubscriberSubscription处理器 。但是 Reactive Streams 的主要目标是处理背压。如前所述,背压是一个允许接收器询问来自发射器的数据量的过程。

您可以使用以下 Maven 依赖项在应用程序开发中添加 Reactive Streams:

    <dependency> 
      <groupId>org.reactivestreams</groupId> 
      <artifactId>reactive-streams</artifactId> 
      <version>1.0.1</version> 
   </dependency> 
   <dependency> 
      <groupId>org.reactivestreams</groupId> 
      <artifactId>reactive-streams-tck</artifactId> 
      <version>1.0.1</version> 
   </dependency> 

前面的 Maven 依赖代码为您的应用程序中的 Reactive Streams 添加了所需的库。在接下来的部分中,我们将看到 Spring 如何在 Spring 的 web 模块和 Spring MVC 框架中实现 Reactive Streams。

Spring Web reactive module


从 Spring 5.0 Framework 开始,Spring 引入了一个新的响应式编程模块——spring-web-reactive 模块。它基于反应式流。基本上,这个模块使用带有响应式编程的 Spring MVC 模块,因此,您仍然可以将 Spring MVC 模块单独或与 spring-web-reactive 模块一起用于您的 Web 应用程序。

Spring 5.0 框架中的这个新模块包含对基于响应式 Web 功能的编程模型的支持。它还支持基于注解的编程模型。 Spring-web-reactive 模块包含对响应式 HTTP 和 WebSocket 客户端调用响应式服务器应用程序的支持。它还使响应式 Web 客户端能够通过响应式 HTTP 连接与响应式 Web 应用程序建立连接。

下图显示了一个 Spring-web-reactive 模块及其组件,这些组件为 Spring Web 应用程序提供响应式行为:

读书笔记《building-microservices-with-spring》实现反应式设计模式

如上图所示,有两个并行模块——一个用于传统的 Spring MVC 框架,另一个用于 Spring-reactive Web 模块。图中左侧是 Spring-MVC 相关组件,例如 @MVC 控制器、spring-web-mvc 模块Servlet API 模块,以及 Servlet Container。 在右侧图中是 spring-web-reactive 相关的组件,如 Router Functions、spring-web-reactive 模块、HTTP/Reactive Streams、Reactive 版本的 Tomcat 等。 Spring-web-reactive 相关组件,例如 Router Functions , spring-web-reactive 模块, HTTP/Reactive Streams , 反应版的Tomcat,等等。

在上图中,您必须关注模块的放置。同一级别的每个模块都有传统 Spring MVC 和 Spring-web-reactive 模块之间的比较。这些比较如下:

  • In the Spring web reactive modules, the Router functions are similar to the @MVC controllers in the Spring MVC modules such as the @Controller, @RestController, and @RequestMapping annotations.
  • The Spring-web-reactive module is parallel to the Spring-web-MVC modules.
  • In the traditional Spring MVC Framework, we use the Servlet API for the HttpServletRequest and HttpServletResponse in the servlet container. But in the Spring-web-reactive framework, we use HTTP/Reactive Streams, which creates HttpServerRequest and HttpServerResponse under the reactive support of the tomcat server.
  • We can user Servlet Container for the traditional Spring MVC Framework, but a reactive-supported server is required for the Spring-web-reactive application. Spring provides support for Tomcat, Jetty, Netty, and Undertow.

现在让我们看看如何使用 Spring Web 反应模块来实现反应式 Web 应用程序。

Implementing a reactive web application at the server side

Spring 响应式 Web 模块 支持 两种编程模型——基于注解或基于函数的编程模型。让我们看看这些模型在服务器端是如何工作的:

  • Annotations-based programming model: It is based on MVC annotations such as @Controller, @RestController, @RequestMapping, and many more. Annotations are supported by the Spring MVC framework for server-side programming for a web application.
  • Functional programming model: It is a new paradigm of programming supported by the Spring 5 Framework. It is based on the Java 8 Lambda style routing and handling. Scala also provides the functional programming paradigm.

以下是我们必须为基于 Spring Boot 的响应式 Web 应用程序添加的 Maven 依赖项:

    <parent> 
       <groupId>org.springframework.boot</groupId> 
       <artifactId>spring-boot-starter-parent</artifactId> 
       <version>2.0.0.M3</version> 
       <relativePath/> <!-- lookup parent from repository --> 
    </parent> 
 
    <properties> 
       <project.build.sourceEncoding>UTF-
        8</project.build.sourceEncoding> 
       <project.reporting.outputEncoding>UTF
        -8</project.reporting.outputEncoding> 
       <java.version>1.8</java.version> 
    </properties> 
 
    <dependencies> 
       <dependency> 
          <groupId>org.springframework.boot</groupId> 
          <artifactId>spring-boot-starter-webflux</artifactId> 
       </dependency> 
 
       <dependency> 
          <groupId>org.springframework.boot</groupId> 
          <artifactId>spring-boot-starter-test</artifactId> 
          <scope>test</scope> 
       </dependency> 
       <dependency> 
          <groupId>io.projectreactor</groupId> 
          <artifactId>reactor-test</artifactId> 
          <scope>test</scope> 
       </dependency> 
    </dependencies> 

正如您在前面的 Maven 依赖项配置文件中看到的那样,我们添加了 spring-boot-starter-webfluxreactor-test 应用程序的依赖项。

让我们基于 Annotation-based 编程模型创建一个响应式 Web 应用程序。

The Annotation-based programming model

响应端支持 Spring MVC 的 @Controller@RestController 等注解。到目前为止,传统的 Spring MVC 和带有响应式模块的 Spring web 之间没有区别。实际区别在@Controller注解配置声明之后开始,也就是我们进入Spring MVC内部工作时,从HandlerMapping开始HandlerAdapter

传统 Spring MVC 和 Spring Web 响应式之间的主要区别在于请求处理机制。没有响应式的 Spring MVC 使用 Servlet API 的阻塞 HttpServletRequestHttpServletResponse 接口处理请求,但是 Spring web 响应式框架是非阻塞的,并且在响应式 ServerHttpRequestServerHttpResponse 而不是 HttpServletRequest< /code> 和 HttpServletResponse

让我们看看下面这个带有反应式控制器的例子:

    package com.packt.patterninspring.chapter11.
      reactivewebapp.controller; 
 
    import org.reactivestreams.Publisher; 
    import org.springframework.beans.factory.annotation.Autowired; 
    import org.springframework.web.bind.annotation.GetMapping; 
    import org.springframework.web.bind.annotation.PathVariable; 
    import org.springframework.web.bind.annotation.PostMapping; 
    import org.springframework.web.bind.annotation.RequestBody; 
    import org.springframework.web.bind.annotation.RestController; 
 
    import com.packt.patterninspring.chapter11.
      reactivewebapp.model.Account; 
    import  com.packt.patterninspring.chapter11.
      reactivewebapp.repository.AccountRepository; 
 
    import reactor.core.publisher.Flux; 
    import reactor.core.publisher.Mono; 
 
    @RestController 
    public class AccountController { 
    
      @Autowired 
      private AccountRepository repository; 
  
      @GetMapping(value = "/account") 
      public Flux<Account> findAll() { 
        return repository.findAll().map(a -> new 
          Account(a.getId(), a.getName(),
           a.getBalance(), a.getBranch())); 
      } 
  
      @GetMapping(value = "/account/{id}") 
      public Mono<Account> findById(@PathVariable("id") Long id) { 
        return repository.findById(id) 
         .map(a -> new Account(a.getId(), a.getName(), a.getBalance(),
            a.getBranch())); 
      } 
  
      @PostMapping("/account") 
      public Mono<Account> create(@RequestBody 
        Publisher<Account> accountStream) { 
        return repository 
          .save(Mono.from(accountStream) 
          .map(a -> new Account(a.getId(), a.getName(), a.getBalance(),
             a.getBranch()))) 
          .map(a -> new Account(a.getId(), a.getName(), a.getBalance(),
             a.getBranch())); 
      } 
    } 
 

在前面AccountController.java的Controller代码中可以看到,我使用了相同的Spring MVC注解,例如@RestController声明一个控制器类,@GetMapping@PostMapping用于为GETPOST 分别请求方法。

让我们关注处理方法的 return 类型。这些方法 return 值作为 MonoFlux 类型。这些是反应器框架提供的反应蒸汽类型。此外,处理程序方法使用 Publisher 类型获取请求正文。

Reactor 是来自 Pivotal 开源团队的 Java 框架。它直接构建在 Reactive Streams 之上,因此不需要桥接器。 Reactor IO 项目为 Netty 和 Aeron 等低级网络运行时提供了包装器。 Reactor 是根据 David Karnok 的 Generations of Reactive 分类的“第 4 代”库。

让我们看一下使用函数式编程模型处理请求的同一个控制器类。

The functional programming model

函数式编程模型使用具有 functional 接口的 API,例如 RouterFunctionHandlerFunction。它使用带有路由和请求处理的 Java 8 Lambda 样式编程,而不是 Spring MVC 注释。它们是用于创建 Web 应用程序的简单但功能强大的构建块。

以下是功能请求处理的示例:

    package com.packt.patterninspring.chapter11.web.reactive.function; 
 
    import static org.springframework.http.MediaType.APPLICATION_JSON; 
    import static org.springframework.web.reactive.
      function.BodyInserters.fromObject; 
 
    import org.springframework.web.reactive.
      function.server.ServerRequest; 
    import org.springframework.web.reactive.
      function.server.ServerResponse; 
 
    import com.packt.patterninspring.chapter11.
      web.reactive.model.Account; 
    import com.packt.patterninspring.chapter11.
      web.reactive.repository.AccountRepository; 
 
    import reactor.core.publisher.Flux; 
    import reactor.core.publisher.Mono; 
 
    public class AccountHandler { 
 
      private final AccountRepository repository; 
 
      public AccountHandler(AccountRepository repository) { 
         this.repository = repository; 
      } 
 
      public Mono<ServerResponse> findById(ServerRequest request) { 
        Long accountId = Long.valueOf(request.pathVariable("id")); 
        Mono<ServerResponse> notFound = 
          ServerResponse.notFound().build(); 
        Mono<Account> accountMono =
         this.repository.findById(accountId); 
        return accountMono 
         .flatMap(account ->    ServerResponse.ok().contentType
         (APPLICATION_JSON).body(
            fromObject(account))) 
         .switchIfEmpty(notFound); 
      }  
    
      public Mono<ServerResponse> findAll(ServerRequest request) { 
       Flux<Account> accounts = this.repository.findAll(); 
       return ServerResponse.ok().contentType
       (APPLICATION_JSON).body(accounts, 
         Account.class); 
      } 
    
      public Mono<ServerResponse> create(ServerRequest request) { 
        Mono<Account> account = request.bodyToMono(Account.class); 
        return  ServerResponse.ok().build(this.
        repository.save(account)); 
      } 
    } 

在上述代码中,类文件 AccountHandler.java 基于函数式反应式编程模型。在这里,我使用了 reactor 框架来处理请求。 ServerRequestServerResponse 两个功能接口用于处理请求和生成响应。

让我们看看这个应用程序的 Repositories 类。以下 AccountRepositoryAccountRepositoryImpl 类对于基于注解和基于函数的编程模型的应用程序类型都是相同的。

让我们创建一个接口 AccountRepository.java 类,如下所示:

    package com.packt.patterninspring.chapter11.
      reactivewebapp.repository; 
  
    import com.packt.patterninspring.chapter11.
      reactivewebapp.model.Account; 
 
    import reactor.core.publisher.Flux; 
    import reactor.core.publisher.Mono; 
 
    public interface AccountRepository { 
    
      Mono<Account> findById(Long id); 
     
      Flux<Account> findAll(); 
     
      Mono<Void> save(Mono<Account> account); 
    }

前面的代码是一个接口,我们用AccountRepositoryImpl.java类来实现这个接口,如下:

    package com.packt.patterninspring.chapter11.
      web.reactive.repository; 
 
    import java.util.Map; 
    import java.util.concurrent.ConcurrentHashMap; 
 
    import org.springframework.stereotype.Repository; 
 
    import com.packt.patterninspring.chapter11.web.
      reactive.model.Account; 
 
    import reactor.core.publisher.Flux; 
    import reactor.core.publisher.Mono; 
 
    @Repository 
    public class AccountRepositoryImpl implements AccountRepository { 
     
      private final Map<Long, Account> accountMap = new 
      ConcurrentHashMap<>();  
     
      public AccountRepositoryImpl() { 
        this.accountMap.put(1000l, new Account(1000l,
        "Dinesh Rajput", 50000l,
          "Sector-1")); 
        this.accountMap.put(2000l, new Account(2000l, 
        "Anamika Rajput", 60000l,
          "Sector-2")); 
        this.accountMap.put(3000l, new Account(3000l, 
        "Arnav Rajput", 70000l,
           "Sector-3")); 
        this.accountMap.put(4000l, new Account(4000l,
       "Adesh Rajput", 80000l,
           "Sector-4")); 
      } 
    
      @Override 
      public Mono<Account> findById(Long id) { 
        return Mono.justOrEmpty(this.accountMap.get(id)); 
      } 
 
      @Override 
      public Flux<Account> findAll() { 
        return Flux.fromIterable(this.accountMap.values()); 
      } 
 
      @Override 
      public Mono<Void> save(Mono<Account> account) { 
        return account.doOnNext(a -> { 
          accountMap.put(a.getId(), a); 
          System.out.format("Saved %s with id %d%n", a, a.getId()); 
        }).thenEmpty(Mono.empty()); 
         // return accountMono; 
      } 
    } 

如您在前面的代码中所见,我们创建了 AccountRepository 类。这个类只有三个方法:findById()findAll()save( ) 。我们根据业务需求实现了这些方法。在这个存储库类中,我特别使用了 Flux 和 Mono 反应类型,使其成为基于反应的应用程序。

让我们为基于功能的编程模型创建服务器。在基于注解的编程中,我们使用简单的 tomcat 容器来部署 Web 应用程序。但是对于这种基于函数的编程,我们要创建一个Server类来启动Tomcat服务器或者Reactor服务器,如下:

    package com.packt.patterninspring.chapter11.web.reactive.function; 
 
    //Imports here 
 
    public class Server { 
 
      public static final String HOST = "localhost"; 
  
      public static final int TOMCAT_PORT = 8080; 
      public static final int REACTOR_PORT = 8181; 
    
      //main method here, download code for GITHUB 
    
      public RouterFunction<ServerResponse> routingFunction() { 
         AccountRepository repository = new AccountRepositoryImpl(); 
         AccountHandler handler = new AccountHandler(repository); 
 
         return nest(path("/account"), nest(accept(APPLICATION_JSON), 
           route(GET("/{id}"), handler::findById) 
           .andRoute(method(HttpMethod.GET), handler::findAll) 
           ).andRoute(POST("/").and(contentType
           (APPLICATION_JSON)), handler::create)); 
      } 
 
      public void startReactorServer() throws InterruptedException { 
         RouterFunction<ServerResponse> route = routingFunction(); 
         HttpHandler httpHandler = toHttpHandler(route); 
 
         ReactorHttpHandlerAdapter adapter = new
            ReactorHttpHandlerAdapter(httpHandler); 
         HttpServer server = HttpServer.create(HOST, REACTOR_PORT); 
         server.newHandler(adapter).block(); 
      } 
 
      public void startTomcatServer() throws LifecycleException { 
         RouterFunction<?> route = routingFunction(); 
         HttpHandler httpHandler = toHttpHandler(route); 
 
         Tomcat tomcatServer = new Tomcat(); 
         tomcatServer.setHostname(HOST); 
         tomcatServer.setPort(TOMCAT_PORT); 
         Context rootContext = tomcatServer.addContext("",
           System.getProperty("java.io.tmpdir")); 
         ServletHttpHandlerAdapter servlet = new
           ServletHttpHandlerAdapter(httpHandler); 
         Tomcat.addServlet(rootContext, "httpHandlerServlet", servlet); 
         rootContext.addServletMapping("/", "httpHandlerServlet"); 
         tomcatServer.start(); 
      } 
    } 

正如您在前面的 Server.java 类文件中看到的,我已经添加了 Tomcat 和 Reactor 服务器。 Tomcat 服务器使用端口 8080,但 Reactor 服务器使用端口 8181

这个 Server.java 类有三个方法。第一个方法 routingFunction() 负责使用 AccountHandler 类处理客户端请求。它取决于 AccountRepository 类。第二种方法,startReactorServer(),负责使用reactor服务器的ReactorHttpHandlerAdapter类来启动Reactor服务器。此类将 HttpHandler 类的对象作为构造函数参数来创建请求处理程序映射。同样,第三个方法startTomcatServer()负责启动Tomcat服务器。并且通过一个reactor适配器类绑定到HttpHandler对象, ServletHttpHandlerAdapter

您可以将此服务器类文件作为 Java 应用程序运行,并通过键入 URL http://localhost:8080/account/ 在浏览器上查看输出:

读书笔记《building-microservices-with-spring》实现反应式设计模式

您还可以为 Reactor 服务器键入带有端口 8181 的相同 URL,如下所示,您将获得相同的输出:

http://localhost:8181/account/

在本节中,您学习了如何使用 Spring-web-reactive 模块创建响应式 Web 应用程序。我们使用两种编程范式创建了 Web 应用程序:基于注释的和基于函数的。

在下一节中,我们将讨论客户端代码,以及客户端如何访问响应式 Web 应用程序。

Implementing a Reactive Client-Side application

Spring 5 框架引入了 functional 和响应式 WebClient。它是一个完全非阻塞和反应式的 Web 客户端,是 RestTemplate 的替代品。它以响应式 ClientHttpRequestClientHttpRespones 的形式创建网络输入和输出。它以 Flux<DataBuffer> 的形式创建请求和响应的主体,而不是 InputStream输出流

让我们看看 Web 客户端的代码,它创建了一个 Client.java 类:

    package com.packt.patterninspring.chapter11.web.reactive.function; 
 
    //Imports here 
 
    public class Client { 
 
      private ExchangeFunction exchange = ExchangeFunctions.create(new
        ReactorClientHttpConnector()); 
 
      public void findAllAccounts() { 
        URI uri = URI.create(String.format("http://%s:%d/account",
        Server.HOST,
          Server.TOMCAT_PORT)); 
        ClientRequest request = ClientRequest.method(HttpMethod.GET,  
        uri).build(); 
 
        Flux<Account> account = exchange.exchange(request) 
        .flatMapMany(response -> response.bodyToFlux(Account.class)); 
 
         Mono<List<Account>> accountList = account.collectList(); 
         System.out.println(accountList.block()); 
      } 
 
      public void createAccount() { 
        URI uri = URI.create(String.format("http://%s:%d/account",
        Server.HOST,
           Server.TOMCAT_PORT)); 
        Account jack = new Account(5000l, "Arnav Rajput", 500000l,
        "Sector-5"); 
 
        ClientRequest request = ClientRequest.method(HttpMethod.POST,
        uri) 
        .body(BodyInserters.fromObject(jack)).build(); 
 
        Mono<ClientResponse> response = exchange.exchange(request); 
 
        System.out.println(response.block().statusCode()); 
      } 
    }   

前面的类 Client.javaServer.java 的 Web 客户端类。它有两种方法。第一种方法是findAllAccounts()。它从帐户存储库中获取所有帐户。它使用 org.springframework.web.reactive.function.clientClientRequest 接口使用 http://localhost:8080/account/ URI 的请求"literal">GET http 方法。通过使用 org.springframework.web.reactive.function.clientExchangeFunction 接口,它调用服务器,并以 JSON 格式获取结果。类似地,另一个方法 createAccount() 通过使用带有 POST 方法的 URI 在服务器中创建一个新帐户 <代码类="literal">http://localhost:8080/account/。

让我们将 Client 类作为 Java 应用程序运行,然后在控制台上查看输出,如下所示:

读书笔记《building-microservices-with-spring》实现反应式设计模式

它创建一条新记录并以 JSON 列表的形式获取所有五条记录。

Note

AsyncRestTemplate 也支持非阻塞交互。主要区别在于它不支持非阻塞流,例如 Twitter 之一,因为从根本上说,它仍然基于并依赖于 InputStream输出流

在下一节中,我们将讨论响应式 Web 应用程序中的请求和响应正文参数。

Request and response body conversion


对于响应式 Web 应用程序,需要进行转换。 spring 核心模块提供反应式编码器和解码器,以实现字节流的序列化与类型化对象之间的序列化。

让我们看看以下请求 body 类型转换的示例。开发人员不需要强制进行类型转换——Spring Framework 会自动为您转换两种类型的方法:基于注释的编程和基于函数的编程。

  • Account account: This means that the account object is deserialized before the controller is called without blocking.
  • Mono<Account> account: This means that AccountController can use the Mono to declare logic. The account object is first deserialized, and then this logic is executed.
  • Flux<Account> accounts: This means that AccountController can use Flux in case of the input streaming scenario.
  • Single<Account> account: This is very similar to the Mono, but here the Controller uses RxJava.
  • Observable<Account> accounts: This is also very similar to Flux, but in this case, the Controller uses input streaming with RxJava.

在前面的列表中,您在反应式编程模型中看到了 conversion 类型的 Spring 框架。让我们在响应正文的示例中查看以下返回类型:

  • Account: This serializes without blocking the given Account; implies a synchronous, non-blocking controller method.
  • void: This is specific to the annotation-based programming model. Request handling completes when the method returns; this implies a synchronous, non-blocking controller method.
  • Mono<Account>: This serializes without blocking the given Account when the Mono completes.
  • Mono<Void>: This implies that request handling completes when the Mono completes.
  • Flux<Account>: This is used in the streaming scenario, possibly, the SSE depends on the requested content type.
  • Flux<ServerSentEvent>: This enables SSE streaming.
  • Single<Account>: The same, but uses RxJava.
  • Observable<Account>: The same, but uses the RxJava Observable type.
  • Flowable<Account>: The same, but uses the RxJava 2 Flowable type.

在前面的列表中,您已经看到了处理程序方法的返回类型。 Spring 框架在响应式编程模型中进行类型转换。

Summary


在本章中,您了解了响应式模式及其原理。它不是编程的新创新——它是一个非常古老的概念,但它非常适合现代应用程序的需求。

反应式编程有四个原则:响应性、弹性、弹性和消息驱动的架构。响应性意味着系统必须在所有条件下都响应:奇数条件和偶数条件。

Spring 5 框架通过使用 Reactor 框架和响应式流为响应式编程模型提供支持。 Spring 引入了新的响应式 Web 模块,即 spring-web-reactive。它通过使用 Spring MVC 的注解(例如 @Controller@RestController@RequestMapping,或者通过使用 Java 8 Lambda 表达式的函数式编程方法。

在本章中,我们使用 Spring Web 反应模块创建了一个 Web 应用程序。此应用程序的代码可在 GitHub 上找到。在下一章中,您将了解并发模式的实现。