vlambda博客
学习文章列表

读书笔记《building-restful-web-services-with-spring-5-second-edition》春季的熔剂和单通道(反应堆支架)

Chapter 3. Flux and Mono (Reactor Support) in Spring

在本章中,我们将引导读者了解更多在 Spring 5 中支持 Reactor 的实用方法,包括 Flux 和 Mono。用户将获得 Flux 和 Mono 的实践经验,结果是简单的 JSON。

我们将在本章中介绍以下主题:

  • Reactive programming and benefits
  • Reactive Core and Streams
  • Flux and Mono in Spring REST
  • User classes with Reactive—REST

Benefits of Reactive programming


假设我们的应用程序中发生了 百万 用户事务。明年,它将增加到 1000 万,因此我们需要扩大规模。这样做的传统方法是添加足够的服务器(水平扩展)。

如果我们可以选择使用相同的服务器进行扩展,而不是进行水平扩展,该怎么办?是的,反应式编程将帮助我们做到这一点。反应式编程是关于同步和事件驱动的非阻塞应用程序,它不需要大量线程来垂直(在 JVM 内)而不是水平(通过集群)扩展。

响应式类型并非旨在更快地处理请求。但是,他们更关注请求并发,尤其是有效地从远程服务器请求数据。借助 Reactive 类型的支持,您将获得更高质量的服务。与在等待结果时阻塞当前线程的传统处理相比,Reactive API 仅请求可以消耗的数据量。反应式 API 处理数据流,而不仅仅是一一处理单个元素。

总体而言,反应式编程是关于非阻塞、事件驱动的应用程序,可以使用少量线程进行扩展,并以背压作为主要组件,以确保生产者(发射者)不会压倒消费者(接收者)。

Reactive Core and Streams

Java 8 引入了 Reactive Core,它实现了 Reactive 编程模型,并建立在 Reactive Streams 规范之上,这是一个构建 Reactive 应用程序的标准。由于 lambda 语法为事件驱动方法提供了更大的灵活性,Java 8 提供了支持 Reactive 的最佳方法。此外,Java 的 lambda 语法使我们能够创建和生成小型且独立的异步任务。 Reactive Streams 的主要目标之一是解决背压问题。我们将在本章后面的部分详细讨论背压。

Java 8 Streams 和 Reactive Streams 之间的主要区别在于 Reactive 是一种推送模型,而 Java 8 Streams 侧重于拉取。在 Reactive Streams 中,根据消费者的需求和数量,所有的事件都会被推送给消费者。

响应式编程模型支持是自上次发布以来 Spring 5 的最佳特性。此外,在 Akka 和 Play 框架的支持下,Java 8 为响应式应用程序提供了更好的平台。

Reactor 建立在 Reactive Streams 规范之上。 Reactive Streams 是四个 Java 接口的捆绑包:

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Publisher 将向在 Publisher 注册的订阅者发布数据项流。 Publisher 使用执行器将项目发布到 Subscriber。此外,Publisher 确保每个订阅的 Subscriber 方法调用是严格排序的。

Subscriber 仅在请求时才消费项目。您可以随时使用 Subscription 取消接收过程。

Subscription 充当 PublisherSubscriber 之间的消息中介。

Processor 表示一个处理阶段,可以包括 Subscriber 和一个 Publisher . Processor 也可以发起背压和取消订阅。

Note

Reactive Streams 是异步流处理的规范,这意味着所有事件都可以异步生成和消费。

Back pressures and Reactive Streams

背压是一种机制,它授权接收器定义它想要从发射器(数据提供者)获得多少数据。 Reactive Streams 的主要目标是处理背压。它允许:

  • The control to go to the receiver, to get data after it is ready to be processed
  • Defining and controlling the amount of data to be received
  • Efficient handling of the slow emitter / fast receiver or fast emitter / slow receiver scenarios

WebFlux

截至 2017 年 9 月,Spring 宣布 5 全面上市。Spring 5 引入了一个名为 Spring WebFlux 的响应式 Web 框架。它是一个使用 Reactor 来支持 Reactive Streams API 的非阻塞 Web 框架。

传统上,阻塞线程消耗资源,非阻塞异步编程有必要发挥更好的作用。 Spring 技术团队引入了一种非阻塞异步编程模型来处理大量并发请求,特别是对于延迟敏感的工作负载。这个概念将主要用于移动应用程序和微服务。此外,此 WebFlux 将是具有许多客户端和不均匀工作负载的场景的最佳解决方案。

Basic REST API

要了解 Reactive 组件(例如 Flux 和 Mono)的实际部分,我们必须创建自己的 REST API 并开始实现我们 API 中的 Flux 和 Mono 类。在本章中,我们将构建一个返回 Aloha 的简单 REST Web 服务。在进入实现部分之前,我们将关注创建 RESTful Web 服务所涉及的组件。

在本节中,我们将介绍以下主题:

  • Flux and Mono—introduction of Spring 5: Functional Web Framework components
  • Flux and Mono—in the REST API

Flux

Flux 是 Reactor 中的主要类型之一。 Flux 相当于 RxJava Observable,能够发射零个或多个项目,然后可选地完成或失败。

Flux 是实现 Publisher 接口的 Reactive 类型之一Reactive Streams 宣言。 Flux 的主要作用是处理数据流。 Flux 主要表示 N 个元素的流。

Note

Flux 是一个发布者,一个特定 Plain Old Java Object (POJO ) 类型。

Mono

Mono 是另一种类型的 Reactor 最多只能发出一个项目。只想发出完成信号的异步任务可以使用 Mono。 Mono 主要处理一个元素的流,而不是 Flux 的 N 元素。

Flux 和 Mono 在使用某些操作时通过强制转换为相关类型来利用这种语义。例如,将两个 Mono 连接在一起会产生一个 Flux;另一方面,在 Flux<T> 上调用 single() 将返回一个 Mono <T>

Flux 和 Mono 都是 Reactive Streams (RS) 发布者< span>implementations 并符合 Reactive-pull back pressure。

Note

Mono 用于特定场景,例如仅产生一个响应的 HTTP 请求。在这种情况下,使用 Mono 将是正确的选择。像前面提到的场景一样,为 HTTP 请求返回 Mono 比返回 Flux 更好,因为它仅提供与零项或一项上下文相关的运算符。 Mono 可以用来表示只有完成概念的无值异步进程。

User class with Reactive – REST


在第一章中,我们介绍了 TicketUser,这两个与我们的 Web 服务相关的类。由于 Ticket 类比 User 类稍微复杂一些,我们将使用 User 类来理解 Reactive 组件。

由于 Spring 5 中的 Reactive 还没有完全稳定,我们将仅在几章中讨论 Reactive。因此,我们将为基于 Reactive 的 REST API 创建一个单独的包。此外,我们将在现有的 pom.xml 文件中添加基于响应式的依赖项。

首先,我们必须添加所有 Reactive 依赖项。在这里,我们将在现有的 pom.xml 文件中添加代码:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.packtpub.restapp</groupId>
  <artifactId>ticket-management</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>ticket-management</name>
  <description>Demo project for Spring Boot</description>  
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencyManagement>
   <dependencies>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-bom</artifactId>
      <version>Bismuth-RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
    </dependency>
        </dependencies>
  </dependencyManagement>
  <dependencies>
      <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>5.0.1.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <version>1.5.7.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-tomcat</artifactId>
      <version>1.5.7.RELEASE</version>
    </dependency>  
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.2</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>5.0.0.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>5.0.1.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
      <version>1.5.7.RELEASE</version> 
    </dependency>     
    <dependency>
      <groupId>org.reactivestreams</groupId>
      <artifactId>reactive-streams</artifactId>
    </dependency>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
      <groupId>io.projectreactor.ipc</groupId>
      <artifactId>reactor-netty</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.tomcat.embed</groupId>
      <artifactId>tomcat-embed-core</artifactId>
      <version>8.5.4</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>5.0.0.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webflux</artifactId>
      <version>5.0.0.RELEASE</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

Note

对于 Reactive 相关的工作,您可以使用现有项目,也可以创建新项目以避免与 Non-Reactive (plain) REST API 冲突。可以使用https://start.spring.io获取基础项目,然后使用上述配置更新 Maven 文件。

在前面的 POM 配置中,我们在现有依赖项之上添加了 Reactor 依赖项(如下所述):

  • reactive-streams
  • reactor-core
  • reactor-netty
  • tomcat-embed-core
  • spring-webflux

这些是使用 Reactors 所需的库。

User 类组件如下:

  • userid
  • username
  • user_email
  • user_type (admin, general user, CSR)

在这里,我们有四个变量用于 User 类。为了更容易理解 Reactive 组件,我们只使用了两个变量(userid, username)。让我们创建一个只有 useridusername 的 POJO 类。

User POJO 类如下:

package com.packtpub.reactive;
public class User {
  private Integer userid;
  private String username;  
  public User(Integer userid, String username){
    this.userid = userid;
    this.username = username;
  }
  public Integer getUserid() {
    return userid;
  }
  public void setUserid(Integer userid) {
    this.userid = userid;
  }
  public String getUsername() {
    return username;
  }
  public void setUsername(String username) {
    this.username = username;
  } 
}

在前面的类中,我在实例化时使用了两个变量和一个构造函数来填充变量。此外,getter/setter 用于访问这些变量。

让我们为 User 类创建一个响应式存储库:

package com.packtpub.reactive;
import reactor.core.publisher.Flux;
public interface UserRepository {
  Flux<User> getAllUsers();
}

在前面的代码中,我们为 User 和只有一个方法的类,称为 getAllUsers。通过使用这种方法,我们应该能够检索到用户列表。现在不谈 Flux,后面会讲。

可以看到这个UserRepository是一个接口。为了使用这个存储库,我们需要有一个具体的类来实现这个接口。让我们为这个 Reactive 存储库创建一个具体的类:

package com.packtpub.reactive;
import java.util.HashMap;
import java.util.Map;
import reactor.core.publisher.Flux;
public class UserRepositorySample implements UserRepository {  
  // initiate Users
  private Map<Integer, User> users = null;  
  // fill dummy values for testing
  public UserRepositorySample() {
    // Java 9 Immutable map used
    users = Map.of(
      1, (new User(1, "David")),
      2, (new User(2, "John")),
      3, (new User(3, "Kevin"))
    ); 
  }
  // this method will return all users
  @Override
  public Flux<User> getAllUsers() {
    return Flux.fromIterable(this.users.values());
  }
}

Note

由于 Java 9 提供了不可变映射,我们可以在代码中使用不可变映射。然而,这些不可变对象仅适用于本章,因为我们不对现有条目进行任何更新。在下一章中,我们将使用常规地图,因为我们需要在 CRUD 操作中对其进行编辑。

目前,我们能够从具体类中获取用户列表。现在我们需要一个 web 处理程序来检索控制器中的用户。现在让我们创建一个处理程序:

package com.packtpub.reactive;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class UserHandler {
  private final UserRepository userRepository;  
  public UserHandler(UserRepository userRepository){
    this.userRepository = userRepository;
  }  
  public Mono<ServerResponse> getAllUsers(ServerRequest request){
    Flux<User> users = this.userRepository.getAllUsers();
    return ServerResponse.ok().contentType(APPLICATION_JSON).body(users, User.class); 
  }
}

最后,我们必须创建一个服务器来保存 REST API。在以下代码中,我们的 Server 类将创建一个 REST API 来获取用户:

package com.packtpub.reactive;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RequestPredicates.contentType;
import static org.springframework.web.reactive.function.server.RequestPredicates.method;
import static org.springframework.web.reactive.function.server.RequestPredicates.path;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler;
import java.io.IOException;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.ipc.netty.http.server.HttpServer;
public class Server {
  public static final String HOST = "localhost";
  public static final int PORT = 8081;
  public static void main(String[] args) throws InterruptedException, IOException{
    Server server = new Server(); 
    server.startReactorServer();
    System.out.println("Press ENTER to exit.");
    System.in.read();
  }  
  public void startReactorServer() throws InterruptedException {
    RouterFunction<ServerResponse> route = routingFunction();
    HttpHandler httpHandler = toHttpHandler(route);
    ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
    HttpServer server = HttpServer.create(HOST, PORT);
    server.newHandler(adapter).block();
  }
  public RouterFunction<ServerResponse> routingFunction() {
    UserRepository repository = new UserRepositorySample();
    UserHandler handler = new UserHandler(repository);
    return nest (
        path("/user"),
        nest(
          accept(APPLICATION_JSON),
          route(GET("/{id}"), handler::getAllUsers)
          .andRoute(method(HttpMethod.GET), handler::getAllUsers)
        ).andRoute(POST("/").and(contentType(APPLICATION_JSON)), handler::getAllUsers));
  }
}

我们将在接下来的章节中更多地讨论我们是如何做到这一点的。只需确保您能够理解代码正在运行,并且您可以通过访问 API 在浏览器上看到输出。

运行 Server.class 你会看到日志:

Press ENTER to exit.

现在您可以在浏览器/SoapUI/Postman 或任何其他客户端中访问 API:

http://localhost:8081/user/

由于我们为 Reactive 服务器使用了 8081 端口,因此我们只能访问 8081 而不是 8080:

[ 
  { 
    "userid": 100, 
    "username": "David" 
  },
  { 
    "userid": 101, 
    "username": "John" 
  },
  { 
    "userid": 102, 
    "username": "Kevin" 
  }, 
]

Summary


到目前为止,我们已经了解了如何设置 Maven 构建来支持我们的 Web 服务的基本实现。此外,我们了解了 Maven 如何在第三方库管理、Spring Boot 和一个基本的 Spring REST 项目中发挥作用。在接下来的章节中,我们将讨论更多关于 Spring REST 端点和 Reactor 支持的信息。