读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准
第 3 章反应式流 - 新流的标准
在本章中,我们将讨论上一章中提到的一些问题,以及多个响应式库在一个项目中相遇时出现的问题。我们还将深入研究反应系统中的背压控制。在这里,我们将监督 RxJava 提出的解决方案及其局限性。我们将探索 Reactive Streams 规范如何解决这些问题,学习该规范的要点。我们还将介绍新规范带来的反应性景观 变化 。最后,为了巩固我们的知识,我们将构建一个简单的应用程序并在其中组合几个反应式库。
本章涵盖以下主题:
- Common API problems
- Backpressure control problems
- Reactive Stream examples
- Technology compatibility problems
- Reactive Streams inside JDK 9
- Advanced concepts of Reactive Streams
- Reinforcement of reactive landscape
- Reactive Streams in action
每个人的反应
在前面的章节中,我们已经学到了很多关于Spring响应式编程的令人兴奋的东西,以及 RxJava的作用 ;在它的故事中播放。我们还研究了使用反应式编程来实现反应式系统的必要性。我们还看到了对响应式环境和 RxJava 可用替代方案的简要概述,这使得快速开始响应式编程成为可能。
API的不一致问题
一方面,extensive 的竞争库列表,例如 RxJava 和 Java Core 库的特性,例如 < code class="literal">CompletableStage,让我们可以选择编写代码的方式。 例如,我们可能依赖 RxJava 的 API 来编写 一个 流程正在处理的项目。因此,要构建一个简单的异步请求-响应交互,依赖 CompletableStage
就绰绰有余了。或者,我们可以使用特定于框架的类 例如 org.springframework.util.concurrent.ListenableFuture
来构建组件之间的异步交互并简化该框架的工作。
另一方面, 丰富的选择 可能很容易使系统过于复杂。例如,两个依赖于异步非阻塞通信概念但具有不同 API 的库的存在导致提供额外的实用程序类,以便将一个回调转换为另一个回调,反之亦然:
上述代码中的编号点解释如下:
- 这是
async
数据库客户端的接口声明,它是异步数据库访问可能的客户端接口的代表性示例。
- 这是
ListenableFuture
到CompletionStage
适配器方法的实现。在(2.1)
点,为了提供对CompletionStage
的手动控制,我们创建了它的直接实现,称为CompletableFuture
通过不带参数的构造函数。为了提供与ListenableFuture
的集成,我们必须添加回调(2.2)
, 这里我们直接复用CompletableFuture
的API。 - 这是
CompletionStage
到ListenableFuture
适配器方法实现。 点(3.1)
我们声明ListenableFuture
的具体实现叫做SettableListenableFuture
。这允许我们在(3.2)
点手动提供CompletionStage
执行的结果。 - 这是
RestController
的类声明。这里在(4.1)
这一点,我们声明了request handler方法,它异步执行并返回ListenableFuture
来处理ListenableFuture
的结果 以非阻塞方式执行。反过来,为了存储AsyncRestTemplate
的执行结果,我们必须将其适配到CompletionStage
(4.2)
。最后,为了满足 支持的API,我们不得不采用 为ListenableFuture
再次(4.3)
存储的结果ListenableFuture代码>。
从前面的示例中可以看出,没有与 Spring Framework 4.x 直接集成ListenableFuture
and CompletionStage
.此外,该示例并不是反应式编程的常见用法的例外情况。许多库和框架为组件之间的异步通信提供了自己的接口和类,其中包括普通的请求-响应通信以及流处理框架。在许多情况下,为了解决这个问题并使几个独立的库兼容,我们必须提供自己的适应并在一些地方重用它。此外,我们自己的改编可能包含错误并需要额外的维护。
笔记
在 Spring Framework 5.x 中,ListenableFuture
的 API 得到了扩展,并提供了一个名为 completable
的附加方法来解决那种不兼容。请查看以下链接以了解更多信息:https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/util/concurrent/ListenableFuture.html#completable --。
这里的核心问题在于,没有一种单一的方法可以让图书馆供应商构建他们的 对齐的 API。例如,我们可能在第 2 章, Reactive Programming in Spring - 基本概念,RxJava 被许多框架所重视,例如 Vert.x、Ratpack、Retrofit 等。
反过来,它们都为 RxJava 用户提供了支持并引入了额外的模块,从而可以轻松地集成现有项目。 乍一看,这很奇妙,因为引入 RxJava 1.x 的项目列表广泛 并包括用于 Web、桌面或移动开发的框架。然而,在对开发人员需求的支持背后,许多隐藏的陷阱影响着库供应商。当多个 RxJava 1.x 兼容库在一个地方相遇时,通常会发生的第一个问题是粗略的版本不兼容。由于 RxJava 1.x 随着时间的推移发展得非常快,许多库供应商没有机会将他们的依赖关系更新到新版本。有时,更新会带来许多 内部更改,最终导致某些版本不兼容。因此,拥有依赖于不同版本的 RxJava 1 的不同库和框架可能会导致一些不必要的问题。第二个问题与第一个类似。 RxJava 的定制是非标准化的。在这里, 自定义 指的是 提供Observable
的额外实现的能力或者特定的转换阶段,这在 RxJava 扩展开发过程中很常见。由于非标准化 API 和快速发展的内部结构,支持自定义实现是另一个挑战。
笔记
可以在以下链接中找到版本重大更改的一个很好的示例: https://github.com/ReactiveX/RxJava/issues/802。
拉与推
最后,要理解上一节描述的问题,我们还得回溯历史,分析一下最初的交互模型在源和它的订阅者之间。
在整个反应式格局演变的早期,所有库的设计都考虑到数据从源推送到订阅者。做出这个决定是因为在某些情况下纯拉模型效率不够高。这方面的一个例子是当网络上的通信出现在具有网络边界的系统中时。假设我们过滤一个巨大的 数据列表,但只从中提取前十个元素。通过采用 PULL
模型来解决这样的问题,我们得到以下代码:
注释代码再次解释如下:
- 这是
AsyncDatabaseClient
field 声明。在这里,使用该客户端,我们将异步、非阻塞通信与外部数据库连接起来。 - 这是
list
方法声明。这里我们通过返回CompletionStage
作为调用list
方法的结果来声明一个异步合约。反过来,为了聚合拉取结果并将其异步发送给调用者,我们声明Queue
和CompletableFuture
来存储接收到的值和然后手动发送收集到的Queue
稍后。在这里,在(2.1)
我们开始第一次调用pull
方法。
- 这是
pull
方法声明。在该方法中,我们调用AsyncDatabaseClient#getNextAfterId
来执行查询并异步接收结果。然后当接收到结果时,我们在点(3.1)
. 的情况下进行过滤的有效项目,我们将其聚合到队列中。此外,在(3.2)
点, 我们检查是否收集了足够的元素,将它们发送给调用者,然后退出拉取。如果上述任何一个if
分支被绕过,我们递归调用pull
方法再次(3.3)
。
从前面的代码可以看出,我们在服务和数据库之间使用了异步、非阻塞的交互。乍一看,这里并没有错。但是,如果我们看下图,我们会看到差距:
图 3.1。拉取处理流程示例
从上图中可以看出,逐个请求下一个元素会导致将请求从 Service 传递到数据库。从服务的角度来看,大部分的整体处理时间都浪费在空闲状态。即使那里没有使用资源,由于额外的网络活动,总处理时间也会增加一倍甚至三倍。而且,数据库不知道未来请求的数量,这意味着数据库无法提前生成数据,因此处于空闲状态。这意味着数据库正在等待新的请求,并且在将响应 传递给服务时效率低下,而服务正在处理传入的 响应 然后 然后 请求新的数据部分。
为了优化整体执行并保持拉取模型为一等公民,我们可以将拉取与批处理结合起来,如下中心示例的修改所示:
同样,以下键解释了代码:
- 这与前面示例中的
pull
方法声明相同。 - 这是
getNextBatchAfterId
执行。可能会注意到,AsyncDatabaseClient
方法 允许请求特定数量的元素,这些元素以List<Item> 的形式返回。 。反过来,当数据可用时,它们的处理方式几乎相同,只是创建了一个额外的
for-loop
用于分别处理批处理的每个元素(2.1)
。 - 这是递归的
pull
方法执行,其设计目的是在之前的 pull 中缺少项目的情况下检索额外的一批项目。
一方面,通过请求一批元素,我们可以显着提高 list
方法执行的性能 并减少整体处理时间。另一方面,交互模型还存在一些差距,可以通过分析下图来发现:
图 3.2。基于批处理的拉取处理流程示例
正如我们可能注意到的,我们的处理时间仍然有些低效。例如,当数据库查询数据时,客户端仍然处于空闲状态。反过来,发送一批元素比只发送一个元素需要更多的时间。最后,对整批元素的额外请求可能实际上是多余的。例如,如果 只有一个元素 剩下来完成处理并且下一批的第一个元素满足验证,那么其余的项目将被跳过并且完全冗余。
为了提供最终的优化,我们可能会请求一次数据,当它们可用时,源会异步推送它们。以下对代码的修改显示了如何实现这一点:
注释如下:
- 这是
list
方法声明。这里,Observable<Item>
返回类型标识元素正在被 推送。 - 这是查询流阶段。通过调用
AsyncDatabaseClient#getStreamOfItems
方法,我们订阅了一次数据库。在这里,在(2.1)
点我们过滤元素,并通过使用 运算符.take()
根据调用者的要求,获取特定数量的数据。
在这里,我们使用 RxJava 1.x 类作为一等公民来接收 pushed 元素。反过来,一旦满足所有要求,就会发送取消信号,并关闭与数据库的连接。当前的交互流程如下图所示:
图 3.3。 Push 处理流程示例
在上图中,再次优化了整体处理时间。在交互过程中,当服务等待第一个响应时,我们只有一个大空闲。在第一个元素到达后,数据库开始在后续元素到来时发送它们。反过来,即使处理可能比查询 the next 元素快一点,服务的整体空闲时间也很短。但是,一旦收集了 required 数量的元素,数据库仍可能生成 过多的元素,这些元素会被服务忽略。
流量控制问题
一方面, 前面的解释可能已经告诉我们 接受的核心原因 PUSH模型是通过将请求量降低到 最小化来优化 整体处理时间。这就是为什么 RxJava 1.x 和类似的库是为推送数据而设计的,这就是为什么流式传输成为分布式系统中组件之间通信的一种有价值的技术。
另一方面,仅结合 PUSH 模型,该技术有其局限性。我们可能记得第 1 章, 为什么选择 Reactive Spring?, 消息驱动通信的本质假设作为对请求的响应,服务可能会收到一个异步的、潜在的无限的消息流。这是棘手的部分,因为如果生产者不尊重消费者的吞吐量可能性,它可能会以以下两节所述的方式影响整个系统的稳定性。
慢生产者和快消费者
让我们从最简单的开始。假设我们有 生产者速度慢,消费者速度非常快。这种情况可能是由于生产者方面对未知消费者的一些精简假设而出现的。
一方面,这样的配置是一种特殊的商业假设。另一方面,实际运行时间可能不同,消费者的可能性可能会动态变化。例如,我们可能总是通过扩展生产者的数量来增加它们的数量,从而增加消费者的负载。
要解决这样的问题,我们最需要的是实际需求。不幸的是,纯 push 模型无法给我们这样的指标,因此动态增加系统的吞吐量是不可能的。
快生产者和慢消费者
第二个问题要复杂得多。假设我们有一个快速生产者和一个慢消费者。这里的问题是生产者发送的数据可能比消费者可以处理的多得多,这可能导致组件在压力下发生灾难性故障。
对于这种情况,一种直观的解决方案是将未处理的元素收集到队列中,这些元素可能位于生产者和消费者之间,甚至可能驻留在消费者端。即使消费者很忙,这种技术也可以通过处理前一个元素或部分数据来处理新数据。
使用队列处理推送数据的关键因素之一是选择具有适当特征的队列。一般来说,有三种常见的队列类型,在以下小节中进行讨论。
无界队列
第一个也是最明显的解决方案是提供一个queue,它的特点是无限大小,或者只是一个无界队列。在这种情况下,所有生成的元素首先存储在队列中,然后由实际订阅者排出。下面的大理石图描述了所提到的交互(图 3.4):
图 3.4. 无界队列示例
一方面,使用无界队列处理消息的主要好处是 消息的可传递性,这意味着消费者将在某个时间点处理所有存储的元素。
另一方面,通过成功实现消息的传递性,应用程序的弹性会降低,因为没有无限资源。例如,一旦达到内存限制,整个系统可能很容易崩溃。
有界丢弃队列
或者,为了避免内存溢出,我们可以使用一个队列,如果它已满,它可能会忽略传入的消息。下面的大理石图描绘了一个 queue,它的大小为 2 个元素,其特点是溢出时丢弃元素(图表 3.5 ):
图 3.5。具有两个项目的 容量 的丢弃队列示例
一般来说,这种技术尊重资源的限制,并且可以根据资源的容量来配置队列的容量。反过来,当消息的重要性较低时,采用这种队列是一种常见的做法。业务案例的一个示例可能是数据集更改事件流。反过来,每个事件都会触发一些使用整个数据集聚合的统计重新计算,并且与传入的事件数量相比会花费大量时间。在这种情况下,唯一重要的是数据集发生了变化。知道哪些数据受到影响并不重要。
笔记
前面提到的示例考虑了删除最新元素的最简单策略。一般来说,有一些策略可以选择要删除的元素。例如,按优先级丢弃、丢弃最旧的等等。
有界阻塞队列
另一方面,在每条消息都很重要的情况下,这种技术可能不可接受。例如,在支付系统中, 每个用户的 提交的 支付 必须被处理,并且不允许丢弃一些。因此,我们不是丢弃消息并保留有界 queue 作为处理推送数据的方法,而是一旦达到限制,可能会阻止生产者。以能够阻塞生产者为特征的队列通常称为阻塞队列。使用具有三个元素容量的阻塞队列进行交互的示例如下图所示:
图 3.6.三项容量阻塞队列示例
不幸的是,这种技术否定了系统的所有异步行为。一般来说,一旦生产者达到队列的限制,它将开始被阻塞并处于该状态,直到消费者耗尽一个元素 并且队列中的空闲空间变得可用。然后我们可以得出结论,最慢消费者的吞吐量限制了系统的整体吞吐量。随后,除了否定异步行为之外,该技术还否定了 有效的资源利用。 因此,如果我们想要实现所有三个:弹性、弹性和响应性,任何情况都是不可接受的。
此外,队列的存在可能使系统的整体设计复杂化,并增加了在上述解决方案之间寻找权衡的额外责任,这是另一个挑战。
一般来说,纯 push 模型的不受控制的语义可能会导致许多不希望的情况。这就是为什么 Reactive Manifesto 提到 机制的重要性,它允许系统 优雅地 响应负载,或者换句话说 需要 背压控制机制。
不幸的是,响应式库类似于 RxJava 1.x,并且不提供这样的标准化功能。 没有明确的 API 可以允许开箱即用地控制背压。
笔记
需要指出的是,在纯 push 模式下,可以使用批处理来稳定生产率。 RxJava 1.x 提供了诸如 .window
or .buffer
这样的操作符,使得在运行期间收集元素成为可能。指定时间段到相应的子流或集合。这种技术显示性能突增的一个示例是对数据库的批量插入或批量更新。不幸的是,并非所有服务都支持批处理操作。因此,这种技术的应用确实受到限制。
Reactive Streams 规范的基础知识
Reactive Streams 规范定义了四个 主要接口: Publisher
, < code class="literal">订阅者、订阅
和处理器
。由于该倡议独立于任何组织发展,它作为一个单独的 JAR 文件提供,其中所有接口都存在于 org.reactivestreams
包中。
通常,指定的接口与我们之前的类似(例如,在 RxJava 1.x 中)。在某种程度上,这些反映了 RxJava 中著名的类。前两个接口类似于 Observable
-Observer
,类似于经典的发布者-订阅者模型。因此,前两个被命名为 Publisher
和 Subscriber
。要检查这两个接口是否类似于 Observable
和 Observer
,让我们考虑它们的声明:
前面的代码描述了 Publisher
接口的内部结构。可能会注意到,只有一种方法可以注册 Subscriber
。与 Observable
(旨在提供有用的 DSL)相比,Publisher
代表一个简单的标准化入口点 Publisher
和 Subscriber
连接。与 Publisher
相比, Subscriber
端更像是一个冗长的 API,与我们在 中的几乎相同;来自 RxJava 的Observer
接口:
我们可能已经注意到,除了与 RxJava Observer
中的方法相同的三个方法外,规范还为我们提供了一个新的附加方法,称为 onSubscribe
。
onSubscribe
方法是一种概念上新的 API 方法,它为我们提供了一种通知订阅者
订阅成功。反过来,该方法的传入参数向我们介绍了一个名为 Subscription 的新合约。 为了理解这个想法,让我们仔细看看界面:
正如我们可能已经注意到的,Subscription
提供了控制元素生成的基础。类似于 RxJava 1.x 的Subscription#unsubscribe()
,这里我们有 cancel()
方法,允许我们取消订阅从流中,甚至完全取消发布。但是,取消功能带来的最显着改进在于新的request
方法。 Reactive Stream 规范引入了request
方法来扩展能力Publisher
和Subscriber
之间的交互。现在,为了通知 Publisher
应该推送多少数据,Subscriber
应该通知 的大小。 通过 request
方法的需求, 并且可以确保传入元素的数量不超过限制。让我们看一下下面的弹珠图来了解底层机制:
图 3.7。背压机制
从上图中可以看出,Publisher
现在 保证元素的新部分 只有在订户
要求他们。 Publisher
的整体实现取决于Publisher
,从纯粹的阻塞等待到复杂的仅根据订阅者
的请求生成数据的机制。但是,我们 现在 因为我们有上述保证,所以不必支付额外排队的费用。
此外,与纯 push模型相反,规范为我们提供了混合push- pull 模型,允许适当控制背压。
为了理解混合模型的威力,让我们回顾一下我们之前从数据库流式传输的示例,看看这种技术是否像以前一样高效:
关键如下:
- 这是列表方法声明。这里我们遵循 Reactive Streams 规范并返回
Publisher<>
接口作为通信的一等公民。 - 这是
AsyncDatabaseClient#getStreamOfItems
方法的执行。这里我们使用一个更新的方法,它返回Publisher<>
。在(2.1)
点, 我们实例化了Take
和Filter
运算符,它们接受应采用的元素数量。此外,我们传递 一个自定义Predicate
实现,这使得验证流中的传入项目成为可能。 - 此时,我们 返回之前创建的
TakeFilterOperator
实例。请记住,即使运算符具有不同的类型,它仍然扩展了Publisher
接口。
反过来,必须清楚地了解我们自定义的 TakeFilterOperator
的内部结构。以下代码扩展了该运算符的内部结构:
前面代码的关键点在下面的列表中解释:
- 这是
TakeFilterOperator
类声明。此类扩展Publisher<>
。另外,后面...
隐藏了类的构造函数及相关字段。 - 这是
Subscriber#subscribe
方法的实现。通过考虑实现,我们可以得出结论,要为流提供额外的逻辑,我们必须将实际的Subscriber
包装到扩展相同接口的适配器类中。 - 这是
TakeFilterOperator.TakeFilterInner
类声明。这个类实现了Subscriber
接口并扮演着最重要的角色,因为它作为实际的Subscriber
传递给主源。一旦在onNext
中接收到元素,它就会被过滤并传输到下游的Subscriber
。反过来,与Subscriber
接口一起,TakeFilterInner
类实现Subscription
接口,可以转移到下游Subscriber
从而控制所有下游需求。注意这里,Queue
是ArrayBlockingQueue
的实例 大小等于采取
。创建扩展Subscriber
和Subscription
接口的内部类的技术是实现中间转换阶段的经典方法。 - 这是构造函数声明。正如这里可能注意到的,除了
take
和predicate
参数,我们还有actual
通过调用subscribe()
方法订阅了TakeFilterOperator
的订阅者实例。 - 这是
Subscriber#onSubscribe
方法的实现。此处最有趣的元素位于(5.1)
处。这里我们执行了第一个Subscription#request
到远程数据库,这通常发生在第一个onSubscribe
方法期间调用。 - 这是
Subscriber#onNext
调用,它包含元素处理声明所需的有用参数列表。
- 这是元素声明的处理流程。在这里,我们在该处理中有四个关键点。一旦
remaining
应该取的元素个数大于零,实际的Subscriber
已经请求了数据,元素有效,队列中没有元素,那么我们可以直接将该元素发送到下游(7.1)
. 如果需求尚未显示,或者队列中有东西,我们必须将该元素排队(以保持顺序)并稍后交付它(7.2) 。在 元素无效的情况下,我们必须增加
过滤
元素的数量(7.3) . 最后,如果
remaining
元素个数为零,那么我们有到取消
,(7.4)
订阅
并完成直播. - 这是请求声明的附加数据的机制。在这里,如果
filtered
elements 的数量达到限制,我们会从数据库中请求额外的部分数据,而不会阻塞整个过程。 - 这是
Subscriber
和Subscriptions
方法的其余部分。
一般来说,当与数据库的连接是有线的并且TakeFilterOperator
实例已经收到 Subscription
, 第一个请求将指定数量的元素发送到数据库。紧接着,数据库开始生成指定数量的元素,并在它们到来时推送它们。反过来,TakeFilterOperator
的逻辑指定了应该请求额外数据部分的情况。一旦发生这种情况,下一部分数据的新非阻塞请求将从服务发送到数据库。这里需要注意的是 Reactive Streams规范直接指定Subscription#request
应该是非阻塞性执行,这意味着不推荐阻塞操作或任何在该方法中停止调用者执行线程的操作。
笔记
要获取有关上述行为的更多信息,请参阅以下链接: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md#3.4。
最后,下图描绘了服务与数据库之间的整体交互:
图 3.8。混合推挽处理流程
从上图中可能会注意到,由于 Reactive Streams 规范的原因,数据库中的第一个元素可能会稍晚到达Publisher
和 Subscriber
之间的交互合同。请求数据的新部分不需要 中断 或阻塞正在进行的元素处理。因此,整个处理时间几乎不受影响。
另一方面,在某些情况下,纯 push 模型更可取。幸运的是,Reactive Streams 足够灵活。除了动态 push-pull 模型外,规范还提供了单独的 push
和 pull 模型也是如此。根据文档,要实现纯 push 模型,我们可以考虑请求等于 to 263的需求- 1 (java.lang.Long.MAX_VALUE
)。
笔记
这个数字可能被认为是无限的,因为对于当前或预期的硬件, 在合理的时间内 实现 263-1的需求是不可行的 (每纳秒 1 个元素需要 292 年)。因此, 允许发布者在此之后停止跟踪需求: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md#3.17。
相反,要切换到纯 pull 模型,我们可能会在每次 Subscriber 时请求一个新元素#onNext
已被调用。
反应式流规范在行动
一般来说,正如我们在上一节中可能注意到的那样,尽管 Reactive Streams 规范中的接口很简单, 整体概念足够复杂。因此,我们将在一个日常示例中学习这三个接口的中心思想和概念行为。
让我们以新闻订阅为例,以及如何通过新的 Reactive Streams 接口使其变得更智能。考虑以下用于为新闻服务创建 Publisher
的代码:
现在让我们创建一个 Subscriber
并将其订阅到 NewsService
:
通过在 newsService
实例上调用subscribe()
,我们表现出获取最新消息的愿望。通常,在发送任何新闻摘要之前,高质量的服务会发送一封祝贺信,其中包含有关订阅和订阅取消的信息。此操作与我们的Subscriber#onSubscribe()
方法完全相同,该方法通知 Subscriber
订阅成功并赋予他们取消订阅的能力。由于我们的服务遵循 Reactive Streams 规范的规则,它允许客户端一次选择尽可能多的新闻文章。只有在客户端通过调用 Subscription#request
指定摘要的第一部分的数量后,新闻服务才会开始通过Subscriber#onNext
方法,然后订阅者可以阅读新闻。
在这里,e最终意味着在现实生活中我们可能会推迟阅读时事通讯 到晚上或周末,这意味着我们手动检查收件箱中的新闻。从订阅者的角度来看,该逻辑是在 NewsServiceSubscriber#eventuallyReadDigests()
的支持下实现的。一般来说,这种行为意味着用户的收件箱收集新闻摘要,而通常的服务订阅模型很容易溢出订阅者的收件箱。反过来,当新闻服务不经意地向订阅者发送消息而订阅者没有阅读它们时,通常会发生邮件服务提供商将新闻服务电子邮件地址列入黑名单的情况。此外,在这种情况下, Subscriber
可能会错过重要的摘要。即使这没有发生,订阅者也不会因为邮箱里满是来自新闻服务的未读摘要而感到高兴。因此,为了保持订阅者的幸福感,新闻服务需要提供传递新闻的策略。假设时事通讯的阅读状态可以确认服务。在这里,一旦我们确保所有消息都被读取,我们可能会提供一些特定的逻辑,以便仅在前一个消息已被读取时才发送新的新闻摘要。这种机制可以很容易地在规范中实现。下一段代码公开了整个提到的机制的示例:
关键如下:
- 这是实现
Subscriber
的NewsServiceSubscriber
类声明。在这里,除了普通的类定义,我们还有有用字段的列表(例如mailbox
由一个Queue
表示, 或代表当前订阅的subscription
field);换句话说,客户端和新闻服务之间的协议。 - 这是
NewsServiceSubscriber
构造函数声明。在这里,构造函数接受一个名为take
的参数,它表示用户可以 潜在地 一次或在不久的时间阅读的新闻摘要的大小。 - 这是
Subscriber#onSubscribe
方法实现。在这里,在(3.1)
点,连同存储接收到的Subscription
,我们发送较早的 偏好用户的新的读取吞吐量 到服务器。 - 这是
Subscriber#onSubscribe
方法实现。新摘要处理的整个逻辑很简单,只是将消息放入Queue
邮箱的过程。 - 这是
Subscriber#onError
和Subscriber#onComplete
方法声明。这些方法在订阅终止时调用。 - 这是公共
eventuallyReadDigest
方法声明。首先,要表明邮箱可能是空的,我们依赖Optional
。反过来,作为第一步,在(6.1)
点,我们尝试从邮箱中获取最新的未读新闻摘要。如果邮箱中没有可用的未读时事通讯,我们返回Optional.empty()
,(6.4)
。在有可用摘要的情况下,我们会减少计数器(6.2)
,它表示 以前 被新闻服务请求 的未读消息的数量。如果我们仍在等待一些消息,我们返回已完成的Optional
。否则,我们另外请求摘要的新部分并重置剩余新消息的计数器(6.3)
。
由于规范,第一次调用调用 onSubscribe()
,将 Subscription
存储在本地,然后通知 Publisher
关于他们准备通过 request()
方法接收新闻通讯的信息。反过来,当第一个摘要到来时,它被存储在 队列中 以供将来阅读,这通常发生在真实邮箱中。毕竟,当订阅者已经阅读了收件箱中的所有摘要时,Publisher
将被通知该事实并准备新的新闻部分。反过来,如果新闻服务更改订阅策略(在某些情况下意味着完成当前用户订阅),那么订阅者将通过onComplete
方法。然后将要求客户端接受新策略并自动重新订阅服务。当onError
可以被处理的一个例子是 (当然)一个包含用户偏好信息的意外删除的数据库。在这种情况下,它可能会被视为失败,然后订阅者会收到一封道歉信,并被要求重新订阅具有新偏好的服务。最后, eventuallyReadDigest
的实现无非是真实用户的操作,例如打开邮箱、查看新邮件、阅读信件并将其标记为已读,或者只是当没有新的交互时关闭邮箱。
正如我们所见,Reactive Streams 乍一看自然适用于并解决无关业务案例的问题。仅仅通过提供这样的机制,我们就可以让我们的订阅者满意,并且不会进入邮箱提供商的黑名单。
处理器概念的引入
我们已经了解了构成 Reactive Streams 规范的主要三个接口。我们还看到了提议的机制如何改进通过电子邮件发送新闻摘要的新闻服务。但是,在本节的开头,就提到了规范中有四个核心接口。最后一个是 Publisher
和 Subscriber
的组合,称为 Processor
。让我们看一下以下实现的代码:
对比 Publisher
和Subscriber
,它们是start< /em> 和 end 点按定义,Processor
是为了添加一些Publisher
和 Subscriber
之间的处理阶段。由于 Processor
可能代表一些转换逻辑,这使得流式管道行为和业务逻辑流更容易理解。 Processor
使用的光辉示例可能是可以在自定义操作符中描述的任何业务逻辑,也可能是提供流数据的额外缓存,等等.为了更好地理解 Processor
的概念应用,让我们考虑一下NewsServicePublisher
可以如何改进 ;处理器
接口。
可能隐藏在NewsServicePublisher
背后的最简单的逻辑是数据库访问以及时事通讯准备和随后对所有订阅者的多播:
图 3.9。新闻服务的邮件流示例
在此示例中,NewsServicePublisher
被拆分为四个附加组件:
- 这是
DBPublisher
阶段。在这里,Publisher
负责提供对数据库的访问并返回最新的帖子。 - 这是
NewsPreparationOperator
阶段。这个阶段是一个中间转换,负责聚合所有消息,然后,当从主源发出完成信号时,将所有消息组合到摘要中。请注意,由于聚合性质,此运算符始终产生一个元素。聚合假设存在存储,它可能在队列中或任何其他用于存储接收到的元素的集合中。 - 这是
ScheduledPublisher
阶段。这个阶段负责调度周期性任务。在前面提到的情况下,计划任务是 查询数据库(DBPublisher
),处理结果 并将接收到的数据合并到下游。请注意,ScheduledPublisher
实际上是一个无限流,合并后的Publisher
的完成被忽略。在缺少来自下游的请求的情况下,这个Publisher
通过< code class="literal">Subscriber#onError 方法。 - 这是
SmartMulticastProcessor
阶段。这个Processor
在流程中起着至关重要的作用。首先,它缓存最新的摘要。反过来,该阶段支持多播,这意味着无需为每个Subscriber
单独创建相同的流。此外,如前所述,SmartMulticastProcessor
包括一个智能邮件跟踪机制,并且 仅 为那些阅读过之前摘要的人发送新闻通讯。 - 这些是实际的订阅者,实际上是
NewsServiceSubscriber
。
一般来说,上图显示了简单的 NewsServicePublisher
背后可能隐藏的内容。反过来,该示例公开了 Processor
的实际应用。可能会注意到,我们有三个转换阶段,但只有其中一个是 the Processor
。
首先,在我们只需要从 A 到 B 的简单转换的情况下,我们不需要公开Publisher
和 订阅者
。 Subscriber
接口的存在意味着一旦Processor
订阅了upstream,元素就可以开始来了到 Subscriber#onNext
方法,并且 可能 由于缺少下游Subscriber
而可能丢失。反过来,使用这种技术,我们必须记住一个事实,即 处理器
应该在它订阅主出版商
。
然而,这使业务流程过于复杂,并且不允许我们轻松创建适合任何情况的可重用运算符。此外,Processor
实现的构造引入了对独立(来自主Publisher
)管理的额外努力;Subscriber
和适当的背压实现(例如,如果需要,使用队列)。随后,由于将处理器
作为普通运算符的实现过于复杂,这可能会导致性能下降或仅仅降低整个流的吞吐量。
由于我们知道我们只想将 A 转换为 B,因此我们只想在实际的 Subscriber
调用 Publisher# 时启动流程订阅
,我们不想过度复杂化内部实现。因此,多个 Publisher 实例的组合——接受上游作为构造函数的参数并简单地提供适配器逻辑——非常符合要求。
反过来,当我们需要多播元素时,无论是否有订阅者,Processor
都会发光。它还允许某种突变,因为它实现了 Subscriber
接口,该接口有效地允许诸如缓存之类的突变。
重视我们已经看到 TakeFilterOperator
运算符 and NewsServiceSubscriber
的实现这一事实,我们可以肯定 Publisher
、 Subscriber
、 和 Processor
的大多数实例的内部结构代码> 与上述示例类似。因此,我们不深入每个类的内部细节,只考虑所有组件的最终组成:
关键如下:
- 这是发布者、运营商和 处理器 声明。这里,
newsLetterSubscribersStream
表示订阅邮件列表的用户的无限流。反过来,在(1.1)
我们声明Supplier<?扩展 Publisher<NewsLetter>>
,它提供DBPublisher
NewsPreparationOperator
。 - 这是
SmartMulticastProcessor
到ScheduledPublisher<NewsLetter>
订阅。该操作立即启动调度程序,而后者又订阅内部Publisher
。 - 这是
newsLetterSubscribersStream
订阅。这里我们声明匿名类来实现Subscriber
。反过来,在(3.1)
我们将每个新传入的Subscriber
订阅到处理器,该处理器进行多播 所有订阅者的摘要。
在此示例中,我们将所有处理器组合在一个链中,按顺序将它们彼此包装或制作 组件 订阅 彼此。
笔记
一般来说,Publisher
/Processor
的实现是一个挑战。因此,我们跳过该章中提到的操作符或源代码的实现的详细解释。不过,要了解更多关于编写我们自己的 Publisher
所需的陷阱、模式和步骤,请参阅以下链接:https://medium.com/@olehdokuka/mastering-own-reactive-流实现部分 1-publisher-e8eaf928a78c。
总而言之,我们已经介绍了 Reactive Streams 标准的基础知识。我们已经看到了在 RxJava 等库中表达的反应式编程思想向标准接口集的转变。与此同时,我们看到 提到的接口 很容易 允许我们在系统内的组件之间定义一个异步和非阻塞的交互模型。最后,当采用 Reactive Streams 规范时,我们不仅能够在高级架构级别上构建响应式系统,而且还能够在较小组件级别上构建。
Reactive Streams 技术兼容性套件
乍一看,Reactive Streams 似乎并不棘手,但实际上它确实包含很多隐藏的陷阱。除了 Java 接口之外,该规范还包括许多记录在案的实现规则——也许这是最具挑战性的一点。这些规则严格限制了每个接口,并且保留规范中提到的所有行为至关重要。这允许进一步集成来自不同供应商的实现,这不会导致任何问题。这是形成这些规则的基本点。不幸的是,构建一个涵盖所有极端情况的适当测试套件可能比正确实现接口需要更多的时间。另一方面,开发人员需要一个通用工具来验证所有行为并确保反应式库是标准化的并且相互兼容。幸运的是,Konrad Malawski 已经为此实施了一个工具包,其名称为 Reactive Streams Technology Compatibility Kit - 或简称为 TCK。
笔记
要了解有关 TCK 的更多信息,请参阅以下链接:https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck。
TCK 保护所有 Reactive Streams 语句并根据指定规则测试相应的实现。本质上,TCK 是一堆 TestNG 测试用例,应该扩展并准备由相应的Publisher
或Subscriber
进行验证. TCK 包含一个完整的测试类列表,旨在涵盖 Reactive Streams 规范中定义的所有规则。实际上,所有测试都被命名为对应于指定的规则。例如,可能在 org.reactivestreams.tck.PublisherVerification
中找到的示例测试用例之一如下:
关键如下:
- 这是对经过测试的发布者的手动订阅。 Reactive Streams 的 TCK 提供了自己的测试类,允许验证特定行为。
- 这是期望的声明。从前面的代码中可能会注意到,这里我们根据规则 1.01 对给定
Publisher
的行为进行了特定验证。在这种情况下,我们会验证Publisher
无法发出比Subscriber
请求的元素更多的信号。 - 这是
Subscription
的取消阶段。一旦测试通过或失败,为了关闭打开的资源并完成交互,我们使用ManualSubscriber
取消订阅Publisher
代码> API。
上述测试的重要性隐藏在验证Publisher
的任何实现都应该提供的交互的基本保证之后。此外, PublisherVerification
中的所有测试用例 确保给定的Publisher
在某种程度上符合 Reactive Streams 规范。在这里,在某种程度上 意味着不可能在完整尺寸中验证所有规则。此类规则的示例是规则 3.04,它指出请求不应执行无法进行有意义测试的繁重计算。
发布者验证
随着对Reactive Streams TCK的importance的理解,还需要掌握工具包使用的基本知识。为了获得有关此工具包如何工作的基本知识,我们将验证我们的新闻服务的组件之一。由于Publisher
是我们系统的重要组成部分,我们将从分析开始。我们记得,TCK 提供 org.reactivestreams.tck.PublisherVerification
检查Publisher
的基本行为。一般来说,PublisherVerification
是一个抽象类,它要求我们只扩展两个方法。让我们看一下下面的例子,以了解如何编写之前开发的 NewsServicePublisher
的验证:
关键如下:
- 这是
NewsServicePublisherTest
类声明,它扩展了PublisherVerification
类。 - 这是无参数构造函数声明。需要注意的是
PublisherVerification
没有默认构造函数,并要求 实现它的人提供TestEnvironment< /code> 负责测试的具体配置,如超时配置、调试日志等。
- 这是
createPublisher
方法实现。此方法负责生成Publisher
,它会生成给定数量的元素。反过来,在我们的例子中,为了满足测试的要求,我们必须用一定数量的新闻条目填充数据库(3.1)
. - 这是
createFailedPublisher
方法实现。在这里,与createPublisher
方法相反,我们必须提供一个失败的NewsServicePublisher
实例。我们有一个失败的Publisher
的选项之一是当 数据源不可用时,这在我们的例子中会导致失败NewsServicePublisher
,(4.1)
。
上述测试扩展了运行 NewsServicePublisher
验证所需的基本配置。 假设 Publisher< /code> 足够灵活,能够提供给定数量的元素。 换句话说,测试可以告诉
Publisher
它应该产生多少元素 ;以及它是否应该失败或正常工作。 另一方面,有很多具体情况 Publisher
仅限于一个元素。例如,我们可能还记得,NewsPreparationOperator
只用一个元素响应 与来自上游的传入元素的数量无关。
通过简单地 按照上面提到的测试配置,我们无法检查那个Publisher
的准确性 因为许多测试用例假设 流中存在多个元素.幸运的是,Reactive Streams TCK 尊重这种极端情况,并允许设置一个名为 maxElementsFromPublisher()
的附加方法,该方法返回一个值,该值指示产生的元素的最大数量:
一方面,通过覆盖该方法,需要多个元素的测试将被跳过。另一方面,响应式流规则的覆盖范围减少了,可能需要实现自定义测试用例。
订阅者验证
上述配置是开始测试生产者行为所需的 minimum。然而,除了 Publisher
的实例,我们还有 Subscriber
的实例应该被测试以及。幸运的是, Reactive Stream 规范中的那组规则没有 Publisher
的规则复杂,但仍然需要满足所有要求。
有两种不同的测试套件来测试 NewsServiceSubscriber
。第一个称为 org.reactivestreams.tck.SubscriberBlackboxVerification
,它允许验证Subscriber
,而无需了解或修改其内部结构。当 Subscriber
来自外部代码库时,黑盒验证是一个有用的测试工具包,并且没有合法的方式来扩展该行为。另一方面,黑盒验证仅涵盖少数规则,并不能确保实现的完全正确性。要查看如何检查 NewsServiceSubscriber
,让我们先实现 Blackbox 验证测试:
关键如下:
- 这是
NewsServiceSubscriberTest
类声明,它扩展了SubscriberBlackboxVerification
测试套件。 - 这是默认的构造函数声明。在这里,与
PublisherVerification
相同,我们被要求提供特定的TestEnvironment
。 - 这是
createSubscriber
方法的实现。在这里,该方法返回NewsServiceSubscriber
实例,应根据规范对其进行测试。
- 这是
createElement
方法的实现。在这里,我们需要提供一个方法的实现,它扮演一个新元素工厂的角色,并根据需要生成一个新的NewsLetter
实例。 - 这是
triggerRequest
方法实现。由于黑盒测试假设无法访问内部,这意味着我们无法直接访问隐藏在Subscriber< 中的
Subscription
/代码>。随后,这意味着我们必须以某种方式通过手动使用给定的 API(5.1)
.
前面的示例显示了用于 Subscriber
验证的可用 API。除了两个必需的方法, createSubscriber
和 createElement
,还有一个额外的方法来处理 < code class="literal">Subscription#request 外部方法。在我们的例子中,这是一个有用的附加功能,可以让我们模拟真实的用户活动。
第二个测试套件称为 org.reactivestreams.tck.SubscriberWhiteboxVerification
。这是与 前一个类似的验证,但要通过 验证, 订阅者
应该提供与 WhiteboxSubscriberProbe
:
关键如下:
- 这是
NewsServiceSubscriberWhiteboxTest
类声明,它扩展了SubscriberWhiteboxVerification
测试套件。 - 这是
createSubscriber
方法的实现。此方法的工作原理与 Blackbox 验证相同,并返回Subscriber
实例,但这里有一个名为WhiteboxSubscriberProbe
的附加参数。 ;在这种情况下,WhiteboxSubscriberProbe
代表了一种机制,可以实现对输入信号的需求和捕获的嵌入式控制。与 Blackbox 验证相比,通过在NewsServiceSubscriber
,(2.2)
,(2.3)
,(2.4)
,(2.5)
,测试套件不仅能够发送需求,而且能够验证需求是否得到满足并且所有元素也已收到。反过来,需求调节机制也比以前更加透明。在这里,在(2.2)
点,我们实现了SubscriberPuppet
,它适应对接收到的订阅
。
我们可以看到,与黑盒验证相反,白盒需要扩展Subscriber
,在内部提供额外的钩子。虽然白盒测试涵盖了更广泛的规则,以确保被测试的 Subscriber
的正确行为, 当我们想让一个类最终成为防止它被扩展。
验证之旅的最后一部分是 处理器
的测试。为此,TCK 为我们提供了 org.reactivestreams.tck.IdentityProcessorVerification
。这个测试套件可以验证一个Processor
,它接收和产生相同类型的元素。在我们的示例中,只有 martMulticastProcessor
以这种方式运行。由于测试套件应该验证 Publisher
和Subscriber
的行为,IdentityProcessorVerification
继承了与Publisher
和 Subscriber
测试类似的配置。因此,我们不会深入了解整个测试的实现细节,而是考虑 SmartMulticastProcessor
验证所需的其他方法:
关键如下:
- 这是
SmartMulticastProcessorTest
类定义,它扩展了IdentityProcessorVerification
。 - 这是默认的构造函数定义。正如我们可能从代码中注意到的那样,(以及
TestEnvironment
配置,在该示例中被跳过)我们传递了一个附加参数,该参数指示 处理器必须缓冲而不丢弃。由于我们知道我们的Processor
只支持一个元素的 缓冲,我们必须在开始任何验证之前手动提供该数字。
前面的示例显示了 SmartMulticastProcessor
验证的基本配置。由于 IdentityProcessorVerification
扩展了 SubscriberWhiteboxVerification
和 PublisherVerification
,所以一般配置是从它们中的每一个合并而来的。
概括地说,我们概述了有助于验证实现的反应式运算符的指定行为的基本测试集。在这里,TCK 可以被认为是初始集成测试。尽管如此,我们应该记住,除了 TCK 验证之外,每个 operator 都应该仔细测试其所需的行为自己的。
笔记
要了解更多关于验证的信息,请访问原始 TCK 页面https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck。要查看更多 TCK 使用示例,请访问以下 Ratpack 存储库 https://github.com/ratpack/ratpack/tree/master/ratpack-exec/src/test/groovy/ratpack/stream/tck。在以下链接中还有用于验证 RxJava 2 的更广泛的 TCK 使用示例列表:https://github.com/ReactiveX/RxJava/tree/2.x/src/test/java/io/reactivex/tck .
JDK 9
同样,JDK 实施团队也看到了 的价值。规范首次发布后不久,Doug Lee 提出了在 JDK 9 中添加上述接口的提议。该提议得到了当前 Stream API 仅提供拉模型和 推送模型 是这里的缺失点:
"...t没有最好的流利度async/parallel API。 CompletableFuture/CompletionStage 最好地支持期货的延续风格编程, java.util.Stream < span class="emphasis">最好支持(多阶段,可能并行)对集合元素的“拉”式操作。到目前为止,缺少的一个类别是对项目的“推式”操作,因为它们变成可从活跃的来源获得。”
笔记
请注意,在底层,Java Stream API 使用 Spliterator
, 这只不过是 Iterator
的修改版本,能够并行执行。我们可能还记得, Iterator
不是为推动而设计的,而是为拉动 Iterator#next
方法。类似地, Spliterator
有 tryAdvance
方法,它是 Iterator
的 hasNext
和 next
方法。因此,我们可以得出结论,通常 Stream API 是基于拉取的。
该提案的主要目标是为 JDK 中的反应流指定接口。根据提案, Reactive Streams 规范中定义的所有接口都在 java.util.concurrent.Flow
类中作为静态子类提供。一方面,这种改进意义重大,因为 Reactive Streams 成为了 JDK 标准。另一方面,许多供应商已经依赖 org.reactivestreams.*
包中提供的规范。 因为大多数供应商(例如 RxJava)都支持JDK 的几个版本,不可能只实现这些接口和以前的接口。因此,这一改进体现了与 JDK 9+ 兼容并以某种方式将一种规范转换为另一种规范的额外要求。
幸运的是,Reactive Streams 规范为此提供了一个额外的模块,它允许将 Reactive Streams 类型转换为 JDK Flow
types:
关键如下:
- 这些是
import
定义。从导入语句中可能会注意到,我们从原始 Reactive Streams 库中导入了Publisher
和Flow
,它是 Reactive Streams 的所有接口的访问点,但移植到 JDK 9。 - 这是
Flow.Publisher
实例定义。在这里,我们从 JDK 9 定义Publisher
的实例。反过来,在(2.1)
点,我们使用FlowAdapters.toPublisher
原始 Reactive Streams 库中将Flow.Publisher
转换为org.reactivestreams.Publisher
。此外,出于演示目的,在(2.2)
行,我们使用FlowAdapters.toFlowPublisher
方法来转换org.reactivestreams.Publisher
返回Flow.Publisher
。
前面的示例展示了我们如何轻松地将 Flow.Publisher
转换为 org.reactivestreams.Publisher
。应该注意 该示例与实际业务用例无关,因为在本书出版时,没有在 JDK 9 Flow API 之上从头开始编写的知名响应式库。因此,没有必要从 Reactive Streams 规范迁移为支持 JDK 6 及更高版本的外部库。然而,在未来,一切都 很可能 改变,反应式库的新迭代肯定会在反应式流规范之上编写并移植到JDK 9。
笔记
请注意,适配器功能是作为单独的库提供的。要查看所有可用的库,请查看以下链接:http:// /www.reactive-streams.org/#jvm-interfaces-completed。
高级 - 反应式流中的异步和并行
在previous 部分中,我们讨论了概念性行为 的反应流。但是,没有提到反应管道的异步和非阻塞行为。因此,让我们深入研究 Reactive Streams 标准并分析这些行为。
一方面,Reactive Streams API 在规则 2.2 和 3.4 中规定,所有由Publisher
产生并由Subscriber
应该是非阻塞和非阻塞的。因此,我们可以确定我们可以有效地利用处理器的一个节点或一个内核,具体取决于执行环境。
另一方面,所有处理器或内核的有效利用需要并行化。对 Reactive Streams 规范中的并行化概念的通常理解可以解释为 Subscriber#onNext
方法的并行调用。不幸的是,规范在规则 1.3 中声明 on***
方法的调用 必须在线程中发出信号-安全 方式并且——如果由多个线程执行——使用 外部同步。这假定对所有 on***
方法进行序列化或简单的顺序调用。反过来,这意味着我们不能创建像 ParallelPublisher
这样的东西并对流中的元素进行并行处理。
因此,问题是:我们如何有效地利用资源?为了找到答案,我们不得不分析一下常见的流处理管道:
图 3.10。 Source 和 Destination 之间具有一些业务逻辑的处理流程示例
可能会注意到,通常的处理管道——连同数据源和最终目的地——包括一些处理或转换阶段。反过来,每个处理阶段可能会花费大量的处理时间并拖延其他执行。
在这种情况下,解决方案之一是在阶段之间传递异步消息。对于内存流处理,这意味着执行的一部分绑定到一个 Thread
,另一部分绑定到另一个 Thread
.例如,最终的元素消耗可能是一个 CPU 密集型任务,将在单独的 Thread
上进行合理处理:
图 3.11。 Source with Processing 和 Destination 之间的异步边界示例
一般来说,通过在两个独立的 Thread
之间拆分处理,我们在阶段之间设置了异步边界。反过来,通过这样做,我们可以并行化元素的整体处理,因为两个 Thread
可以彼此独立地工作。为了实现并行化,我们必须应用一个数据结构,例如 Queue
,来适当地解耦处理。因此,线程 A 内的处理独立地提供项目 to,以及线程内的Subscriber
B 独立消费来自的项目,同样的Queue
。
在线程之间拆分处理会导致数据结构中的额外开销。当然, 由于 Reactive Streams 规范,这样的数据结构总是被绑定的。反过来,数据结构中的项目数通常等于 Subscriber
从它的 请求的批次的大小发布者
,以及这个 取决于系统的一般容量。
除此之外,针对 API 实现者和开发人员的主要问题是流处理部分应该附加到哪个异步边界?这里至少会出现三个简单的选择。第一种情况是处理流附加到源资源(图 3.11),并且所有操作都发生在与源相同的边界内。在这种情况下,所有数据都被一个一个地同步处理,因此一个项目在被发送到另一个线程
上的处理之前要经过所有处理阶段的转换。与 first 情况的异步边界的第二种和相反配置是当处理附加到 Destination,或 Consumer 的 Thread
,并且可以在 元素的生产 是 CPU-密集的任务。
第三种情况发生在生产和消费是 CPU 密集型任务时。因此,运行 中间转换的最有效方式是在单独的线程
对象上运行它:
图 3.12。管道各组件之间的异步边界示例
正如我们在上图中看到的,每个处理阶段都可能绑定到一个单独的线程。一般来说,有很多方法可以配置数据流的处理。每个案例都与其最适合的条件相关。例如,当源资源的负载低于目标资源时,第一个示例有效。因此,将转换操作放在源边界内是有利可图的,反之亦然,当目标消耗的资源少于源时,处理目标边界中的所有数据是合乎逻辑的。此外,有时转换可能是资源消耗最高的操作。在这种情况下,最好将转换与 Source 和 Destination 分开。
尽管如此,重要的是要记住,在不同线程之间拆分处理不是免费的,应该在合理的资源消耗以实现边界(线程
和额外的数据结构)和高效之间取得平衡。元素处理。反过来,实现这种平衡是另一个挑战,如果没有库的 有用的 API,很难克服其实现和管理。
幸运的是,RxJava 和 Project Reactor 等反应式库提供了这样的 API。我们现在不会详细介绍提议的功能,但会在第 4 章, Project Reactor - 响应式应用程序的基础。
反应式景观的转变
JDK 9 包含 Reactive Streams 规范这一事实加强了它的重要性,并开始改变行业。开源软件行业的领导者(例如 Netflix、Red Hat、Lightbend、MongoDB、Amazon 等)已经开始在他们的产品中采用这种出色的解决方案。
RxJava 变身
通过这种方式,RxJava 提供了一个额外的模块,允许我们轻松地将一个 reactive 类型转换为另一个。下面看看如何将Observable<T>
转换为Publisher<T>
并采用rx .Subscriber
到org.reactivestreams.Subscriber
假设我们有一个应用程序使用 RxJava 1.x 和 Observable
作为组件之间的中心通信类型,如下例所示:
然而,随着 Reactive Streams 规范的发布,我们决定遵循标准并从以下特定依赖项中抽象出我们的接口:
可能会注意到,我们很容易将 Observable
替换为 Publisher
。但是,实现的重构可能比仅仅替换返回类型要花费更多的时间。幸运的是,我们可以随时 轻松地 将现有的 Observable
适配到 Publisher
,如下例所示:
关键如下:
- 这是
RxLogService
类声明。该类代表旧的基于 Rx 的实现。在(1.1)
我们使用 RxNettyHttpClient
,它允许与外部服务异步交互, 使用包装在基于 RxJava 的 API 中的 Netty 客户端来阻止时尚。 - 这是外部请求执行。在这里,使用创建的
HttpClient
实例, 我们从外部服务请求日志流,将传入的元素转换为String< /代码> 实例。
- 这是
rxStream
对Publisher
的改编 使用RxReactiveStreams
图书馆。
正如可能注意到的那样,RxJava 的开发人员很关心我们,并提供了一个额外的 RxReactiveStreams
类,从而可以转换 Observable< /code> 进入 Reactive Streams 的
Publisher
。此外,随着 Reactive Streams 规范的出现,RxJava 开发人员还提供了对背压的非标准化支持,这使得转换后的 Observable
能够兼容 Reactive Streams规范。
随着Observable
到Publisher
的转换,我们也可以转换rx.Subscriber< /code> 到
org.reactivestreams.Subscriber
。例如,日志流 以前 存储在文件中。为此,我们有自定义的Subscriber
,它负责 I/O 交互。反过来,迁移到 Reactive Streams 规范的代码变形如下所示:
关键如下:
- 这是
RxFileService
类声明。 - 这是
writeTo
方法实现,它接受Publisher
作为组件之间交互的中心类型。 - 这是基于 RxJava 的
AsyncFileSubscriber
实例声明。 - 这是
内容
订阅。为了重用基于 RxJava 的Subscriber
,我们使用相同的RxReactiveStreams
实用程序类对其进行调整。
从前面的示例中我们可以看到, RxReactiveStreams
提供了广泛的转换器列表,使得将 RxJava API 转换为 Reactive Streams API 成为可能。
同样,任何 Publisher
Observable
:
一般来说,RxJava 开始以某种方式遵循 Reactive Streams 规范。不幸的是,由于向后兼容,无法实施规范,并且没有计划扩展 反应式流规范 用于 RxJava 1.x 未来。此外,从 2018 年 3 月 31st 开始,不再有支持 RxJava 1.x 的计划。
幸运的是,RxJava 的第二次迭代带来了新的希望。第二版库之父 Dávid Karnok 显着改进了整个库的设计,并引入了一种符合 Reactive Streams 规范的附加类型。除了 Observable
(由于向后兼容性而保持不变)之外,RxJava 2 还提供了称为 Flowable
的新反应类型。
Flowable
reactive 类型提供与 Observable
相同的 API,但扩展了 org.reactivestreams。 Publisher
从一开始。如下一个示例所示,它与 fluent API 合并, Flowable
可以转换为任何常见的 RxJava 类型并返回到与 Reactive Streams 兼容的类型:
正如我们可能注意到的,将 Flowable
转换为 Observable
是一个运算符的简单应用。但是,要将 Observable
back 转换为 Flowable
,需要提供一些可用的背压策略。在 RxJava 2 中,Observable
被设计为push-only流。因此,保持转换后的Observable
符合 Reactive Streams 规范至关重要。
笔记
BackpressureStrategy
指的是生产者不尊重消费者需求时所采取的策略。换句话说,BackpressureStrategy
定义了当我们有一个快速生产者和一个慢消费者时流的行为。我们可能还记得,在本章的开头,我们讨论了相同的案例并考虑了三个中心策略。这些策略包括元素的无限缓冲、溢出时丢弃元素或基于消费者需求不足而阻塞生产者。一般来说,BackapressureStrategy
以某种方式反映了所有描述的策略,除了阻塞生产者的策略。它还提供了诸如 BackapressureStrategy.ERROR
之类的策略,当需求不足时,它会向消费者发送错误并自动断开连接。我们不会在本章中详细介绍每个策略,但会在第 4 章, Project Reactor - 响应式应用程序的基础。
Vert.x 调整
随着 RxJava 的转型,其余的 reactive 库和框架供应商也开始采用 Reactive Streams 规范。遵循规范,Vert.x
包含一个额外的模块,该模块提供对 Reactive Streams API 的支持。以下示例演示了此添加:
关键如下:
- 这是请求处理程序声明。这是一个通用请求处理程序,允许处理发送到服务器的任何请求。
- 这是
Subscriber
和 HTTP 响应声明。这里ReactiveReadStream
实现了org.reactivestreams.Subscriber
和ReadStream
,它允许将任何Publisher
转换为与Vert.x
API 兼容的数据源。 - 这是处理流程声明。在该示例中,我们引用了新的基于 Reactive Streams 的
LogsService
接口,并使用Flowable
API。 - 这是订阅阶段。一旦声明了处理流程,我们就可以订阅
ReactiveReadStream
到Flowable
。 - 这是一个响应准备阶段。
- 这是发送给客户端的最终响应。在这里,
Pump
类在复杂的背压控制机制中发挥着重要作用,以防止底层WriteStream
buffer变得太满了。
正如我们所见,Vert.x
没有提供流畅的 API 来编写元素处理流。但是,它提供了一个 API,允许将任何 Publisher
转换为 Vert.x
API,从而保持来自 Reactive Streams 的复杂背压管理到位。
Ratpack 改进
除了 Vert.x,另一个著名的 Web 框架 Ratpack 也提供对 Reactive Streams 的支持。与 Vert.x
相比,Ratpack 直接支持 Reactive Streams .例如,在 Ratpack 案例中发送日志流如下所示:
关键如下:
- 这是服务器启动的动作和请求处理程序声明。
- 这是日志流声明。
- 这是
ServerSentEvents
的准备工作。在这里,提到的类在映射阶段发挥作用,它将Publisher
中的元素转换为服务器发送事件的代表。我们可能已经注意到,ServerSentEvents
要求映射器函数声明,它描述了如何将元素映射到特定的Event
字段。 - 这是流到 I/O 的渲染。
从示例中我们可以看出,Ratpack 在核心中提供了对 Reactive Streams 的支持。现在,可以重用相同的 LogService#stream
方法,而无需提供额外的类型转换或额外模块的要求来添加对特定反应库的支持。
此外,与仅提供对 Reactive Streams 规范的简单支持的 Vert.x
相比,Ratpack 提供了自己的规范接口实现。此功能在 ratpack.stream.Streams
类中可用,它类似于 RxJava API:
在这里, Ratpack 提供了一个静态工厂来将任何 Publisher
to TransformablePublisher
转换为使用熟悉的运算符和转换阶段灵活地处理事件流。
MongoDB 反应式流驱动程序
在前面的部分中,我们从响应式库和框架的角度概述了对 Reactive 流的支持。但是,该规范的应用领域不仅限于框架或反应式库。生产者和消费者之间的相同交互规则可以应用于通过数据库驱动程序与数据库进行通信。
通过这种方式,MongoDB 提供了一个基于 Reactive Streams 的驱动程序以及基于回调的驱动程序和 RxJava 1.x 驱动程序。反过来,MongoDB 提供了额外的、 流畅的 API 实现,它提供了某种基于书面转换的查询。例如,我们可能在新闻服务示例中看到的 DBPublisher
的内部实现可能通过以下方式实现:
关键如下:
- 这是
DBPublisher
类和相关字段声明。这里,publishedOnFrom
字段 指的是应该搜索新闻帖子的日期。 - 这是构造函数声明。在这里,
DBPublisher
的构造函数中接受的参数之一是配置的 MongoDB 客户端,即com.mongodb.reactivestreams。客户端.MongoClient
。 - 这是
Publisher#subscriber
方法的实现。在这里,我们在FindPublisher
简化了DBPublisher
的实现文字">(3.1) 和订阅 到给定的Subscriber
在点(3.3)
。 我们可能已经注意到,FindPublisher
公开了一个流畅的 API,允许使用函数式编程风格构建可执行查询。
除了对 Reactive Streams 标准的支持之外,基于 Reactive Streams 的 MongoDB 驱动程序还提供了一种简化的数据查询方式。我们不会详细介绍该驱动程序的实现和行为。相反,我们将在 第 7 章 中介绍这一点, 反应式数据库访问。
反应式技术的组合在行动
要了解有关这些技术的可组合性的更多信息,让我们尝试在一个基于 Spring Framework 4 的应用程序中组合几个 reactive 库。反过来,我们的应用程序基于重新访问的新闻服务功能,并通过一个普通的 REST 端点访问它。该端点负责从数据库和外部服务中查找新闻:
图 3.13。一个应用程序中的跨库通信示例
前面的图表向我们的系统介绍了三个反应式库。在这里,我们使用 Ratpack 作为 Web 服务器。使用 TransfromablePublisher
,这使我们能够 轻松地 组合和处理来自多个来源的结果。反过来,其中一个来源是 MongoDB,它 返回 FindPublisher
作为查询的结果。最后,这里我们可以访问外部的新服务并使用RxNetty HTTP客户端抓取一部分数据,返回< code class="literal">Observable 并因此适应 org.reactivestreams.Publisher
。
总结一下,我们在系统中有四个组件,第一个是Spring Framework 4,第二个是Retrofit,它起到了web框架的作用。最后,第三和第四是 RxNetty 和 MongoDB,用于提供对新闻的访问。我们不会详细介绍负责与外部服务通信的组件的实现,但我们将介绍端点的实现。这突出了 Reactive Streams 规范作为独立框架和库的可组合性标准的价值:
关键如下:
- 这是
NewsServiceApp
类声明。此类使用@SpringBootApplication
注释进行注释,该注释假定使用 Spring Boot 功能。反过来,在(1.1)
点有一个额外的@EnableRatpack
注释,它是ratpack-spring-boot
模块并为 Ratpack 服务器启用自动配置。 - 这是常见的 bean 声明。在这里,在
(2.1)
我们配置MongoClient
豆。在(2.2)
和(2.3)
这两个点都有新闻检索和查找的服务配置。 - 这是请求的处理程序声明。在这里,要创建一个 Ratpack 请求处理程序,我们必须用
Action<Chain>
声明一个Bean
类型,它允许在(3.1)
点提供处理程序的配置。 - 这是服务调用和结果聚合。在这里,我们执行服务的方法并使用 Ratpack
Streams
API(4.1)
合并返回的流。 - 这是合并流阶段的渲染。在这里,我们将所有元素异步缩减为一个列表,然后将该列表转换为特定的渲染视图,例如 JSON
(5.1)
。 - 这是主要方法的实现。在这里,我们使用一种常用技术来使 Spring Boot 应用程序栩栩如生。
前面的示例展示了 Reactive Streams 标准的强大功能。在这里,使用几个不相关的库的 API,我们可以轻松地构建一个处理流程并将结果返回给最终用户,而无需任何额外的努力来使一个库适应另一个库。该规则的唯一排除是 HttpNewsService
,它在 retrieveNews
方法执行的结果中 returns Observable
。尽管如此,我们可能还记得, RxReactiveStreams
为我们提供了一系列有用的方法,让我们能够 轻松 转换 RxJava 1.xObservable
到Publisher
。
概括
正如我们从前面的示例中了解到的,Reactive Streams 极大地提高了反应式库的可组合性。我们还了解到 验证Publisher
兼容性的最有用的方法就是应用技术兼容性测试工具包,它与 Reactive Streams规范一起提供.
同时,规范为Reactive Streams带来了pull-push通信模型。此添加解决了背压控制问题,同时通过提供使用哪种模型的选择来增强通信灵活性。
在 Reactive Streams Specification 被包含在 JDK9 中之后,它的重要性直线上升。然而,正如我们所了解的,这种改进带来了在规范的两个变体之间进行类型转换的一些开销。
正如我们从前几节中了解到的,Reactive Streams 规范允许操作员之间的多种通信方式。这种灵活性允许以不同的方式放置异步边界。但是,由于业务需求必须证明此类决策的合理性,因此它对响应式库的供应商提出了重要责任。此外,提供的解决方案应该足够灵活,可以从 API 端对其进行配置。
通过改变反应式流的行为,规范也改变了反应式格局。 Netflix、Redhead、Lightbend、Pivotal 等开源行业的领导者已经在他们的响应式库中实施了该规范。然而,对于 Spring Framework 用户而言,反应式世界中发生的最重要的变化是引入了名为 Project Reactor 的新反应式库.
Project Reactor 发挥着重要作用,因为它是新的响应式 Spring 生态系统的构建块。因此,在深入了解 新反应式 Spring 的 内部实现 之前,我们应该探索 Project Reactor 并熟悉其角色的重要性。在下一章中,我们将通过查看示例来了解 Project Reactor 的概念构成及其应用。