读书笔记《building-restful-web-services-with-spring-5-second-edition》春季的熔剂和单通道(反应堆支架)
假设我们的应用程序中发生了 百万 用户事务。明年,它将增加到 1000 万,因此我们需要扩大规模。这样做的传统方法是添加足够的服务器(水平扩展)。
如果我们可以选择使用相同的服务器进行扩展,而不是进行水平扩展,该怎么办?是的,反应式编程将帮助我们做到这一点。反应式编程是关于同步和事件驱动的非阻塞应用程序,它不需要大量线程来垂直(在 JVM 内)而不是水平(通过集群)扩展。
响应式类型并非旨在更快地处理请求。但是,他们更关注请求并发,尤其是有效地从远程服务器请求数据。借助 Reactive 类型的支持,您将获得更高质量的服务。与在等待结果时阻塞当前线程的传统处理相比,Reactive API 仅请求可以消耗的数据量。反应式 API 处理数据流,而不仅仅是一一处理单个元素。
总体而言,反应式编程是关于非阻塞、事件驱动的应用程序,可以使用少量线程进行扩展,并以背压作为主要组件,以确保生产者(发射者)不会压倒消费者(接收者)。
Java 8 引入了 Reactive Core,它实现了 Reactive 编程模型,并建立在 Reactive Streams 规范之上,这是一个构建 Reactive 应用程序的标准。由于 lambda 语法为事件驱动方法提供了更大的灵活性,Java 8 提供了支持 Reactive 的最佳方法。此外,Java 的 lambda 语法使我们能够创建和生成小型且独立的异步任务。 Reactive Streams 的主要目标之一是解决背压问题。我们将在本章后面的部分详细讨论背压。
Java 8 Streams 和 Reactive Streams 之间的主要区别在于 Reactive 是一种推送模型,而 Java 8 Streams 侧重于拉取。在 Reactive Streams 中,根据消费者的需求和数量,所有的事件都会被推送给消费者。
响应式编程模型支持是自上次发布以来 Spring 5 的最佳特性。此外,在 Akka 和 Play 框架的支持下,Java 8 为响应式应用程序提供了更好的平台。
Reactor 建立在 Reactive Streams 规范之上。 Reactive Streams 是四个 Java 接口的捆绑包:
Publisher
Subscriber
Subscription
Processor
Publisher
将向在 Publisher
注册的订阅者发布数据项流。 Publisher
使用执行器将项目发布到 Subscriber
。此外,Publisher
确保每个订阅的 Subscriber
方法调用是严格排序的。
Subscriber
仅在请求时才消费项目。您可以随时使用 Subscription
取消接收过程。
Subscription
充当 Publisher
和 Subscriber
之间的消息中介。
Processor
表示一个处理阶段,可以包括 Subscriber
和一个 Publisher
. Processor
也可以发起背压和取消订阅。
背压是一种机制,它授权接收器定义它想要从发射器(数据提供者)获得多少数据。 Reactive Streams 的主要目标是处理背压。它允许:
- The control to go to the receiver, to get data after it is ready to be processed
- Defining and controlling the amount of data to be received
- Efficient handling of the slow emitter / fast receiver or fast emitter / slow receiver scenarios
截至 2017 年 9 月,Spring 宣布 5 全面上市。Spring 5 引入了一个名为 Spring WebFlux 的响应式 Web 框架。它是一个使用 Reactor 来支持 Reactive Streams API 的非阻塞 Web 框架。
传统上,阻塞线程消耗资源,非阻塞异步编程有必要发挥更好的作用。 Spring 技术团队引入了一种非阻塞异步编程模型来处理大量并发请求,特别是对于延迟敏感的工作负载。这个概念将主要用于移动应用程序和微服务。此外,此 WebFlux 将是具有许多客户端和不均匀工作负载的场景的最佳解决方案。
要了解 Reactive 组件(例如 Flux 和 Mono)的实际部分,我们必须创建自己的 REST API 并开始实现我们 API 中的 Flux 和 Mono 类。在本章中,我们将构建一个返回 Aloha
的简单 REST Web 服务。在进入实现部分之前,我们将关注创建 RESTful Web 服务所涉及的组件。
在本节中,我们将介绍以下主题:
- Flux and Mono—introduction of Spring 5: Functional Web Framework components
- Flux and Mono—in the REST API
Flux 是 Reactor 中的主要类型之一。 Flux 相当于 RxJava Observable,能够发射零个或多个项目,然后可选地完成或失败。
Flux 是实现 Publisher
接口的 Reactive 类型之一Reactive Streams 宣言。 Flux 的主要作用是处理数据流。 Flux 主要表示 N 个元素的流。
在第一章中,我们介绍了 Ticket
和 User
,这两个与我们的 Web 服务相关的类。由于 Ticket
类比 User
类稍微复杂一些,我们将使用 User
类来理解 Reactive 组件。
由于 Spring 5 中的 Reactive 还没有完全稳定,我们将仅在几章中讨论 Reactive。因此,我们将为基于 Reactive 的 REST API 创建一个单独的包。此外,我们将在现有的 pom.xml
文件中添加基于响应式的依赖项。
首先,我们必须添加所有 Reactive 依赖项。在这里,我们将在现有的 pom.xml
文件中添加代码:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.packtpub.restapp</groupId> <artifactId>ticket-management</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>ticket-management</name> <description>Demo project for Spring Boot</description> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Bismuth-RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>5.0.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>1.5.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <version>1.5.7.RELEASE</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>5.0.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>5.0.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <version>1.5.7.RELEASE</version> </dependency> <dependency> <groupId>org.reactivestreams</groupId> <artifactId>reactive-streams</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor.ipc</groupId> <artifactId>reactor-netty</artifactId> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-core</artifactId> <version>8.5.4</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.0.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webflux</artifactId> <version>5.0.0.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Note
对于 Reactive 相关的工作,您可以使用现有项目,也可以创建新项目以避免与 Non-Reactive (plain) REST API 冲突。可以使用https://start.spring.io获取基础项目,然后使用上述配置更新 Maven 文件。
在前面的 POM 配置中,我们在现有依赖项之上添加了 Reactor 依赖项(如下所述):
reactive-streams
reactor-core
reactor-netty
tomcat-embed-core
spring-webflux
这些是使用 Reactors 所需的库。
User
类组件如下:
userid
username
user_email
user_type
(admin, general user, CSR)
在这里,我们有四个变量用于 User
类。为了更容易理解 Reactive 组件,我们只使用了两个变量(userid
, username
)。让我们创建一个只有 userid
和 username
的 POJO 类。
User
POJO 类如下:
package com.packtpub.reactive; public class User { private Integer userid; private String username; public User(Integer userid, String username){ this.userid = userid; this.username = username; } public Integer getUserid() { return userid; } public void setUserid(Integer userid) { this.userid = userid; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } }
在前面的类中,我在实例化时使用了两个变量和一个构造函数来填充变量。此外,getter/setter 用于访问这些变量。
让我们为 User
类创建一个响应式存储库:
package com.packtpub.reactive; import reactor.core.publisher.Flux; public interface UserRepository { Flux<User> getAllUsers(); }
在前面的代码中,我们为 User
和只有一个方法的类,称为 getAllUsers
。通过使用这种方法,我们应该能够检索到用户列表。现在不谈 Flux,后面会讲。
可以看到这个UserRepository
是一个接口。为了使用这个存储库,我们需要有一个具体的类来实现这个接口。让我们为这个 Reactive 存储库创建一个具体的类:
package com.packtpub.reactive; import java.util.HashMap; import java.util.Map; import reactor.core.publisher.Flux; public class UserRepositorySample implements UserRepository { // initiate Users private Map<Integer, User> users = null; // fill dummy values for testing public UserRepositorySample() { // Java 9 Immutable map used users = Map.of( 1, (new User(1, "David")), 2, (new User(2, "John")), 3, (new User(3, "Kevin")) ); } // this method will return all users @Override public Flux<User> getAllUsers() { return Flux.fromIterable(this.users.values()); } }
Note
由于 Java 9 提供了不可变映射,我们可以在代码中使用不可变映射。然而,这些不可变对象仅适用于本章,因为我们不对现有条目进行任何更新。在下一章中,我们将使用常规地图,因为我们需要在 CRUD 操作中对其进行编辑。
目前,我们能够从具体类中获取用户列表。现在我们需要一个 web 处理程序来检索控制器中的用户。现在让我们创建一个处理程序:
package com.packtpub.reactive; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import static org.springframework.http.MediaType.APPLICATION_JSON; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class UserHandler { private final UserRepository userRepository; public UserHandler(UserRepository userRepository){ this.userRepository = userRepository; } public Mono<ServerResponse> getAllUsers(ServerRequest request){ Flux<User> users = this.userRepository.getAllUsers(); return ServerResponse.ok().contentType(APPLICATION_JSON).body(users, User.class); } }
最后,我们必须创建一个服务器来保存 REST API。在以下代码中,我们的 Server
类将创建一个 REST API 来获取用户:
package com.packtpub.reactive; import static org.springframework.http.MediaType.APPLICATION_JSON; import static org.springframework.web.reactive.function.server.RequestPredicates.GET; import static org.springframework.web.reactive.function.server.RequestPredicates.POST; import static org.springframework.web.reactive.function.server.RequestPredicates.accept; import static org.springframework.web.reactive.function.server.RequestPredicates.contentType; import static org.springframework.web.reactive.function.server.RequestPredicates.method; import static org.springframework.web.reactive.function.server.RequestPredicates.path; import static org.springframework.web.reactive.function.server.RouterFunctions.nest; import static org.springframework.web.reactive.function.server.RouterFunctions.route; import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler; import java.io.IOException; import org.springframework.http.HttpMethod; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.ipc.netty.http.server.HttpServer; public class Server { public static final String HOST = "localhost"; public static final int PORT = 8081; public static void main(String[] args) throws InterruptedException, IOException{ Server server = new Server(); server.startReactorServer(); System.out.println("Press ENTER to exit."); System.in.read(); } public void startReactorServer() throws InterruptedException { RouterFunction<ServerResponse> route = routingFunction(); HttpHandler httpHandler = toHttpHandler(route); ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler); HttpServer server = HttpServer.create(HOST, PORT); server.newHandler(adapter).block(); } public RouterFunction<ServerResponse> routingFunction() { UserRepository repository = new UserRepositorySample(); UserHandler handler = new UserHandler(repository); return nest ( path("/user"), nest( accept(APPLICATION_JSON), route(GET("/{id}"), handler::getAllUsers) .andRoute(method(HttpMethod.GET), handler::getAllUsers) ).andRoute(POST("/").and(contentType(APPLICATION_JSON)), handler::getAllUsers)); } }
我们将在接下来的章节中更多地讨论我们是如何做到这一点的。只需确保您能够理解代码正在运行,并且您可以通过访问 API 在浏览器上看到输出。
运行 Server.class
你会看到日志:
Press ENTER to exit.
现在您可以在浏览器/SoapUI/Postman 或任何其他客户端中访问 API:
http://localhost:8081/user/
由于我们为 Reactive 服务器使用了 8081
端口,因此我们只能访问 8081
而不是 8080
:
[ { "userid": 100, "username": "David" }, { "userid": 101, "username": "John" }, { "userid": 102, "username": "Kevin" }, ]