读书笔记《building-restful-web-services-with-spring-5-second-edition》春季的熔剂和单通道(反应堆支架)
假设我们的应用程序中发生了 百万 用户事务。明年,它将增加到 1000 万,因此我们需要扩大规模。这样做的传统方法是添加足够的服务器(水平扩展)。
如果我们可以选择使用相同的服务器进行扩展,而不是进行水平扩展,该怎么办?是的,反应式编程将帮助我们做到这一点。反应式编程是关于同步和事件驱动的非阻塞应用程序,它不需要大量线程来垂直(在 JVM 内)而不是水平(通过集群)扩展。
响应式类型并非旨在更快地处理请求。但是,他们更关注请求并发,尤其是有效地从远程服务器请求数据。借助 Reactive 类型的支持,您将获得更高质量的服务。与在等待结果时阻塞当前线程的传统处理相比,Reactive API 仅请求可以消耗的数据量。反应式 API 处理数据流,而不仅仅是一一处理单个元素。
总体而言,反应式编程是关于非阻塞、事件驱动的应用程序,可以使用少量线程进行扩展,并以背压作为主要组件,以确保生产者(发射者)不会压倒消费者(接收者)。
Java 8 引入了 Reactive Core,它实现了 Reactive 编程模型,并建立在 Reactive Streams 规范之上,这是一个构建 Reactive 应用程序的标准。由于 lambda 语法为事件驱动方法提供了更大的灵活性,Java 8 提供了支持 Reactive 的最佳方法。此外,Java 的 lambda 语法使我们能够创建和生成小型且独立的异步任务。 Reactive Streams 的主要目标之一是解决背压问题。我们将在本章后面的部分详细讨论背压。
Java 8 Streams 和 Reactive Streams 之间的主要区别在于 Reactive 是一种推送模型,而 Java 8 Streams 侧重于拉取。在 Reactive Streams 中,根据消费者的需求和数量,所有的事件都会被推送给消费者。
响应式编程模型支持是自上次发布以来 Spring 5 的最佳特性。此外,在 Akka 和 Play 框架的支持下,Java 8 为响应式应用程序提供了更好的平台。
Reactor 建立在 Reactive Streams 规范之上。 Reactive Streams 是四个 Java 接口的捆绑包:
PublisherSubscriberSubscriptionProcessor
Publisher 将向在 Publisher 注册的订阅者发布数据项流。 Publisher 使用执行器将项目发布到 Subscriber。此外,Publisher 确保每个订阅的 Subscriber 方法调用是严格排序的。
Subscriber 仅在请求时才消费项目。您可以随时使用 Subscription 取消接收过程。
Subscription 充当 Publisher 和 Subscriber 之间的消息中介。
Processor 表示一个处理阶段,可以包括 Subscriber 和一个 Publisher . Processor 也可以发起背压和取消订阅。
背压是一种机制,它授权接收器定义它想要从发射器(数据提供者)获得多少数据。 Reactive Streams 的主要目标是处理背压。它允许:
- The control to go to the receiver, to get data after it is ready to be processed
- Defining and controlling the amount of data to be received
- Efficient handling of the slow emitter / fast receiver or fast emitter / slow receiver scenarios
截至 2017 年 9 月,Spring 宣布 5 全面上市。Spring 5 引入了一个名为 Spring WebFlux 的响应式 Web 框架。它是一个使用 Reactor 来支持 Reactive Streams API 的非阻塞 Web 框架。
传统上,阻塞线程消耗资源,非阻塞异步编程有必要发挥更好的作用。 Spring 技术团队引入了一种非阻塞异步编程模型来处理大量并发请求,特别是对于延迟敏感的工作负载。这个概念将主要用于移动应用程序和微服务。此外,此 WebFlux 将是具有许多客户端和不均匀工作负载的场景的最佳解决方案。
要了解 Reactive 组件(例如 Flux 和 Mono)的实际部分,我们必须创建自己的 REST API 并开始实现我们 API 中的 Flux 和 Mono 类。在本章中,我们将构建一个返回 Aloha 的简单 REST Web 服务。在进入实现部分之前,我们将关注创建 RESTful Web 服务所涉及的组件。
在本节中,我们将介绍以下主题:
- Flux and Mono—introduction of Spring 5: Functional Web Framework components
- Flux and Mono—in the REST API
Flux 是 Reactor 中的主要类型之一。 Flux 相当于 RxJava Observable,能够发射零个或多个项目,然后可选地完成或失败。
Flux 是实现 Publisher 接口的 Reactive 类型之一Reactive Streams 宣言。 Flux 的主要作用是处理数据流。 Flux 主要表示 N 个元素的流。
在第一章中,我们介绍了 Ticket 和 User,这两个与我们的 Web 服务相关的类。由于 Ticket 类比 User 类稍微复杂一些,我们将使用 User 类来理解 Reactive 组件。
由于 Spring 5 中的 Reactive 还没有完全稳定,我们将仅在几章中讨论 Reactive。因此,我们将为基于 Reactive 的 REST API 创建一个单独的包。此外,我们将在现有的 pom.xml 文件中添加基于响应式的依赖项。
首先,我们必须添加所有 Reactive 依赖项。在这里,我们将在现有的 pom.xml 文件中添加代码:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.packtpub.restapp</groupId>
<artifactId>ticket-management</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>ticket-management</name>
<description>Demo project for Spring Boot</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>1.5.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<version>1.5.7.RELEASE</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<version>1.5.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.ipc</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>8.5.4</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.0.0.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Note
对于 Reactive 相关的工作,您可以使用现有项目,也可以创建新项目以避免与 Non-Reactive (plain) REST API 冲突。可以使用https://start.spring.io获取基础项目,然后使用上述配置更新 Maven 文件。
在前面的 POM 配置中,我们在现有依赖项之上添加了 Reactor 依赖项(如下所述):
reactive-streamsreactor-corereactor-nettytomcat-embed-corespring-webflux
这些是使用 Reactors 所需的库。
User 类组件如下:
useridusernameuser_emailuser_type(admin, general user, CSR)
在这里,我们有四个变量用于 User 类。为了更容易理解 Reactive 组件,我们只使用了两个变量(userid, username)。让我们创建一个只有 userid 和 username 的 POJO 类。
User POJO 类如下:
package com.packtpub.reactive;
public class User {
private Integer userid;
private String username;
public User(Integer userid, String username){
this.userid = userid;
this.username = username;
}
public Integer getUserid() {
return userid;
}
public void setUserid(Integer userid) {
this.userid = userid;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
}
在前面的类中,我在实例化时使用了两个变量和一个构造函数来填充变量。此外,getter/setter 用于访问这些变量。
让我们为 User 类创建一个响应式存储库:
package com.packtpub.reactive;
import reactor.core.publisher.Flux;
public interface UserRepository {
Flux<User> getAllUsers();
}
在前面的代码中,我们为 User 和只有一个方法的类,称为 getAllUsers。通过使用这种方法,我们应该能够检索到用户列表。现在不谈 Flux,后面会讲。
可以看到这个UserRepository是一个接口。为了使用这个存储库,我们需要有一个具体的类来实现这个接口。让我们为这个 Reactive 存储库创建一个具体的类:
package com.packtpub.reactive;
import java.util.HashMap;
import java.util.Map;
import reactor.core.publisher.Flux;
public class UserRepositorySample implements UserRepository {
// initiate Users
private Map<Integer, User> users = null;
// fill dummy values for testing
public UserRepositorySample() {
// Java 9 Immutable map used
users = Map.of(
1, (new User(1, "David")),
2, (new User(2, "John")),
3, (new User(3, "Kevin"))
);
}
// this method will return all users
@Override
public Flux<User> getAllUsers() {
return Flux.fromIterable(this.users.values());
}
}
Note
由于 Java 9 提供了不可变映射,我们可以在代码中使用不可变映射。然而,这些不可变对象仅适用于本章,因为我们不对现有条目进行任何更新。在下一章中,我们将使用常规地图,因为我们需要在 CRUD 操作中对其进行编辑。
目前,我们能够从具体类中获取用户列表。现在我们需要一个 web 处理程序来检索控制器中的用户。现在让我们创建一个处理程序:
package com.packtpub.reactive;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class UserHandler {
private final UserRepository userRepository;
public UserHandler(UserRepository userRepository){
this.userRepository = userRepository;
}
public Mono<ServerResponse> getAllUsers(ServerRequest request){
Flux<User> users = this.userRepository.getAllUsers();
return ServerResponse.ok().contentType(APPLICATION_JSON).body(users, User.class);
}
}
最后,我们必须创建一个服务器来保存 REST API。在以下代码中,我们的 Server 类将创建一个 REST API 来获取用户:
package com.packtpub.reactive;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RequestPredicates.contentType;
import static org.springframework.web.reactive.function.server.RequestPredicates.method;
import static org.springframework.web.reactive.function.server.RequestPredicates.path;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler;
import java.io.IOException;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.ipc.netty.http.server.HttpServer;
public class Server {
public static final String HOST = "localhost";
public static final int PORT = 8081;
public static void main(String[] args) throws InterruptedException, IOException{
Server server = new Server();
server.startReactorServer();
System.out.println("Press ENTER to exit.");
System.in.read();
}
public void startReactorServer() throws InterruptedException {
RouterFunction<ServerResponse> route = routingFunction();
HttpHandler httpHandler = toHttpHandler(route);
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer server = HttpServer.create(HOST, PORT);
server.newHandler(adapter).block();
}
public RouterFunction<ServerResponse> routingFunction() {
UserRepository repository = new UserRepositorySample();
UserHandler handler = new UserHandler(repository);
return nest (
path("/user"),
nest(
accept(APPLICATION_JSON),
route(GET("/{id}"), handler::getAllUsers)
.andRoute(method(HttpMethod.GET), handler::getAllUsers)
).andRoute(POST("/").and(contentType(APPLICATION_JSON)), handler::getAllUsers));
}
}
我们将在接下来的章节中更多地讨论我们是如何做到这一点的。只需确保您能够理解代码正在运行,并且您可以通过访问 API 在浏览器上看到输出。
运行 Server.class 你会看到日志:
Press ENTER to exit.
现在您可以在浏览器/SoapUI/Postman 或任何其他客户端中访问 API:
http://localhost:8081/user/
由于我们为 Reactive 服务器使用了 8081 端口,因此我们只能访问 8081 而不是 8080:
[
{
"userid": 100,
"username": "David"
},
{
"userid": 101,
"username": "John"
},
{
"userid": 102,
"username": "Kevin"
},
]
