读书笔记《building-restful-web-services-with-spring-5-second-edition》春季的熔剂和单通道(反应堆支架)
假设我们的应用程序中发生了 百万 用户事务。明年,它将增加到 1000 万,因此我们需要扩大规模。这样做的传统方法是添加足够的服务器(水平扩展)。
如果我们可以选择使用相同的服务器进行扩展,而不是进行水平扩展,该怎么办?是的,反应式编程将帮助我们做到这一点。反应式编程是关于同步和事件驱动的非阻塞应用程序,它不需要大量线程来垂直(在 JVM 内)而不是水平(通过集群)扩展。
响应式类型并非旨在更快地处理请求。但是,他们更关注请求并发,尤其是有效地从远程服务器请求数据。借助 Reactive 类型的支持,您将获得更高质量的服务。与在等待结果时阻塞当前线程的传统处理相比,Reactive API 仅请求可以消耗的数据量。反应式 API 处理数据流,而不仅仅是一一处理单个元素。
总体而言,反应式编程是关于非阻塞、事件驱动的应用程序,可以使用少量线程进行扩展,并以背压作为主要组件,以确保生产者(发射者)不会压倒消费者(接收者)。
Java 8 引入了 Reactive Core,它实现了 Reactive 编程模型,并建立在 Reactive Streams 规范之上,这是一个构建 Reactive 应用程序的标准。由于 lambda 语法为事件驱动方法提供了更大的灵活性,Java 8 提供了支持 Reactive 的最佳方法。此外,Java 的 lambda 语法使我们能够创建和生成小型且独立的异步任务。 Reactive Streams 的主要目标之一是解决背压问题。我们将在本章后面的部分详细讨论背压。
Java 8 Streams 和 Reactive Streams 之间的主要区别在于 Reactive 是一种推送模型,而 Java 8 Streams 侧重于拉取。在 Reactive Streams 中,根据消费者的需求和数量,所有的事件都会被推送给消费者。
响应式编程模型支持是自上次发布以来 Spring 5 的最佳特性。此外,在 Akka 和 Play 框架的支持下,Java 8 为响应式应用程序提供了更好的平台。
Reactor 建立在 Reactive Streams 规范之上。 Reactive Streams 是四个 Java 接口的捆绑包:
- Publisher
- Subscriber
- Subscription
- Processor
Publisher 将向在 Publisher 注册的订阅者发布数据项流。 Publisher 使用执行器将项目发布到 Subscriber。此外,Publisher 确保每个订阅的 Subscriber 方法调用是严格排序的。
Subscriber 仅在请求时才消费项目。您可以随时使用 Subscription 取消接收过程。
Subscription 充当 Publisher 和 Subscriber 之间的消息中介。
Processor 表示一个处理阶段,可以包括 Subscriber 和一个 Publisher . Processor 也可以发起背压和取消订阅。
背压是一种机制,它授权接收器定义它想要从发射器(数据提供者)获得多少数据。 Reactive Streams 的主要目标是处理背压。它允许:
- The control to go to the receiver, to get data after it is ready to be processed
- Defining and controlling the amount of data to be received
- Efficient handling of the slow emitter / fast receiver or fast emitter / slow receiver scenarios
截至 2017 年 9 月,Spring 宣布 5 全面上市。Spring 5 引入了一个名为 Spring WebFlux 的响应式 Web 框架。它是一个使用 Reactor 来支持 Reactive Streams API 的非阻塞 Web 框架。
传统上,阻塞线程消耗资源,非阻塞异步编程有必要发挥更好的作用。 Spring 技术团队引入了一种非阻塞异步编程模型来处理大量并发请求,特别是对于延迟敏感的工作负载。这个概念将主要用于移动应用程序和微服务。此外,此 WebFlux 将是具有许多客户端和不均匀工作负载的场景的最佳解决方案。
要了解 Reactive 组件(例如 Flux 和 Mono)的实际部分,我们必须创建自己的 REST API 并开始实现我们 API 中的 Flux 和 Mono 类。在本章中,我们将构建一个返回 Aloha 的简单 REST Web 服务。在进入实现部分之前,我们将关注创建 RESTful Web 服务所涉及的组件。
在本节中,我们将介绍以下主题:
- Flux and Mono—introduction of Spring 5: Functional Web Framework components
- Flux and Mono—in the REST API
Flux 是 Reactor 中的主要类型之一。 Flux 相当于 RxJava Observable,能够发射零个或多个项目,然后可选地完成或失败。
Flux 是实现 Publisher 接口的 Reactive 类型之一Reactive Streams 宣言。 Flux 的主要作用是处理数据流。 Flux 主要表示 N 个元素的流。
在第一章中,我们介绍了 Ticket 和 User,这两个与我们的 Web 服务相关的类。由于 Ticket 类比 User 类稍微复杂一些,我们将使用  User 类来理解 Reactive 组件。
由于 Spring 5 中的 Reactive 还没有完全稳定,我们将仅在几章中讨论 Reactive。因此,我们将为基于 Reactive 的 REST API 创建一个单独的包。此外,我们将在现有的 pom.xml 文件中添加基于响应式的依赖项。
首先,我们必须添加所有 Reactive 依赖项。在这里,我们将在现有的 pom.xml 文件中添加代码:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.packtpub.restapp</groupId>
  <artifactId>ticket-management</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>ticket-management</name>
  <description>Demo project for Spring Boot</description>  
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencyManagement>
   <dependencies>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-bom</artifactId>
      <version>Bismuth-RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
    </dependency>
        </dependencies>
  </dependencyManagement>
  <dependencies>
      <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>5.0.1.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <version>1.5.7.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-tomcat</artifactId>
      <version>1.5.7.RELEASE</version>
    </dependency>  
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.2</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>5.0.0.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>5.0.1.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
      <version>1.5.7.RELEASE</version> 
    </dependency>     
    <dependency>
      <groupId>org.reactivestreams</groupId>
      <artifactId>reactive-streams</artifactId>
    </dependency>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
      <groupId>io.projectreactor.ipc</groupId>
      <artifactId>reactor-netty</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.tomcat.embed</groupId>
      <artifactId>tomcat-embed-core</artifactId>
      <version>8.5.4</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>5.0.0.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webflux</artifactId>
      <version>5.0.0.RELEASE</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project> 
      Note
对于 Reactive 相关的工作,您可以使用现有项目,也可以创建新项目以避免与 Non-Reactive (plain) REST API 冲突。可以使用https://start.spring.io获取基础项目,然后使用上述配置更新 Maven 文件。
在前面的 POM 配置中,我们在现有依赖项之上添加了 Reactor 依赖项(如下所述):
- reactive-streams
- reactor-core
- reactor-netty
- tomcat-embed-core
- spring-webflux
这些是使用 Reactors 所需的库。
User 类组件如下:
- userid
- username
- user_email
- user_type(admin, general user, CSR)
在这里,我们有四个变量用于 User 类。为了更容易理解 Reactive 组件,我们只使用了两个变量(userid, username)。让我们创建一个只有 userid 和 username 的 POJO 类。
User POJO 类如下:
package com.packtpub.reactive;
public class User {
  private Integer userid;
  private String username;  
  public User(Integer userid, String username){
    this.userid = userid;
    this.username = username;
  }
  public Integer getUserid() {
    return userid;
  }
  public void setUserid(Integer userid) {
    this.userid = userid;
  }
  public String getUsername() {
    return username;
  }
  public void setUsername(String username) {
    this.username = username;
  } 
} 
      在前面的类中,我在实例化时使用了两个变量和一个构造函数来填充变量。此外,getter/setter 用于访问这些变量。
让我们为 User 类创建一个响应式存储库:
package com.packtpub.reactive;
import reactor.core.publisher.Flux;
public interface UserRepository {
  Flux<User> getAllUsers();
} 
      在前面的代码中,我们为 User 和只有一个方法的类,称为 getAllUsers。通过使用这种方法,我们应该能够检索到用户列表。现在不谈 Flux,后面会讲。
可以看到这个UserRepository是一个接口。为了使用这个存储库,我们需要有一个具体的类来实现这个接口。让我们为这个 Reactive 存储库创建一个具体的类:
package com.packtpub.reactive;
import java.util.HashMap;
import java.util.Map;
import reactor.core.publisher.Flux;
public class UserRepositorySample implements UserRepository {  
  // initiate Users
  private Map<Integer, User> users = null;  
  // fill dummy values for testing
  public UserRepositorySample() {
    // Java 9 Immutable map used
    users = Map.of(
      1, (new User(1, "David")),
      2, (new User(2, "John")),
      3, (new User(3, "Kevin"))
    ); 
  }
  // this method will return all users
  @Override
  public Flux<User> getAllUsers() {
    return Flux.fromIterable(this.users.values());
  }
} 
      Note
由于 Java 9 提供了不可变映射,我们可以在代码中使用不可变映射。然而,这些不可变对象仅适用于本章,因为我们不对现有条目进行任何更新。在下一章中,我们将使用常规地图,因为我们需要在 CRUD 操作中对其进行编辑。
目前,我们能够从具体类中获取用户列表。现在我们需要一个 web 处理程序来检索控制器中的用户。现在让我们创建一个处理程序:
package com.packtpub.reactive;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class UserHandler {
  private final UserRepository userRepository;  
  public UserHandler(UserRepository userRepository){
    this.userRepository = userRepository;
  }  
  public Mono<ServerResponse> getAllUsers(ServerRequest request){
    Flux<User> users = this.userRepository.getAllUsers();
    return ServerResponse.ok().contentType(APPLICATION_JSON).body(users, User.class); 
  }
} 
      最后,我们必须创建一个服务器来保存 REST API。在以下代码中,我们的 Server 类将创建一个 REST API 来获取用户:
package com.packtpub.reactive;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RequestPredicates.contentType;
import static org.springframework.web.reactive.function.server.RequestPredicates.method;
import static org.springframework.web.reactive.function.server.RequestPredicates.path;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler;
import java.io.IOException;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.ipc.netty.http.server.HttpServer;
public class Server {
  public static final String HOST = "localhost";
  public static final int PORT = 8081;
  public static void main(String[] args) throws InterruptedException, IOException{
    Server server = new Server(); 
    server.startReactorServer();
    System.out.println("Press ENTER to exit.");
    System.in.read();
  }  
  public void startReactorServer() throws InterruptedException {
    RouterFunction<ServerResponse> route = routingFunction();
    HttpHandler httpHandler = toHttpHandler(route);
    ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
    HttpServer server = HttpServer.create(HOST, PORT);
    server.newHandler(adapter).block();
  }
  public RouterFunction<ServerResponse> routingFunction() {
    UserRepository repository = new UserRepositorySample();
    UserHandler handler = new UserHandler(repository);
    return nest (
        path("/user"),
        nest(
          accept(APPLICATION_JSON),
          route(GET("/{id}"), handler::getAllUsers)
          .andRoute(method(HttpMethod.GET), handler::getAllUsers)
        ).andRoute(POST("/").and(contentType(APPLICATION_JSON)), handler::getAllUsers));
  }
} 
      我们将在接下来的章节中更多地讨论我们是如何做到这一点的。只需确保您能够理解代码正在运行,并且您可以通过访问 API 在浏览器上看到输出。
运行 Server.class 你会看到日志:
Press ENTER to exit. 
      现在您可以在浏览器/SoapUI/Postman 或任何其他客户端中访问 API:
http://localhost:8081/user/
由于我们为 Reactive 服务器使用了 8081 端口,因此我们只能访问 8081 而不是 8080:
[ 
  { 
    "userid": 100, 
    "username": "David" 
  },
  { 
    "userid": 101, 
    "username": "John" 
  },
  { 
    "userid": 102, 
    "username": "Kevin" 
  }, 
] 
     