vlambda博客
学习文章列表

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

第 4 章 Project Reactor - 响应式应用程序的基础

上一章中,我们查看了 Reactive Streams 规范的概述,以及它通过提供通用接口和用于数据交换的新拉-推模型来增强反应库的方式。

在本章中,我们将深入了解 Project Reactor,它是响应式领域中最著名的库,它已经成为 Spring Framework 生态系统的重要组成部分。我们将探索最重要、最常用的 Project Reactor API。该库功能多样且功能丰富,值得单独出一本书,不可能在一章中涵盖其整个 API。我们将深入了解 Reactor 库并使用它的强大功能来构建反应式应用程序。

在本章中,我们将讨论以下主题:

  • The history and motivation behind Project Reactor
  • Project Reactor's terminology and API
  • Advanced features of Project Reactor
  • The most crucial implementation details of Project Reactor
  • A comparison of the most frequently used reactive types
  • A business case implemented with the Reactor library

 

项目反应堆的简史


正如我们在上一章中看到的,Reactive Streams 规范 使得反应式库相互兼容,也解决了背压问题通过引入推拉数据交换模型。尽管 Reactive Streams 规范引入了重大改进,但它仍然只定义 API 和规则,并且不提供日常使用的库。本章介绍了 Reactive Streams 规范中最流行的实现之一,Project Reactor(或简称 Reactor)。然而,Reactor 库 自其早期版本以来已经发展了很多,现在已成为最先进的响应式库。让我们看看它的历史,看看 Reactive Streams 规范是如何塑造 API 和库的实现细节的。

项目反应堆版本 1.x

在处理 Reactive Streams 规范时,Spring Framework 团队的开发人员需要一个高吞吐量的数据处理框架,尤其是对于Spring XD 项目,其目标是简化大数据应用程序的开发。为了满足这一需求,Spring 团队启动了一个新项目。从一开始,它的设计就支持异步、非阻塞处理。该团队将其称为 Project Reactor。本质上,Reactor 版本 1.x 结合了消息处理的最佳实践,例如 Reactor 模式,以及函数式和响应式 programming 样式.

笔记

Reactor Pattern 是一种行为模式 它有助于异步事件处理和同步处理。这意味着所有事件都被排入队列,事件的实际处理稍后由单独的线程进行。事件被分派给所有相关方(事件处理程序)并同步处理。要了解有关反应堆模式的更多信息,请访问以下链接:http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf

 

通过采用这些技术,Project Reactor 版本 1.x 使我们能够编写简洁的代码,如下所示:

Environment env =newEnvironment();                               // (1)
Reactor reactor = Reactors.reactor()                               // (2).env(env)                                //.dispatcher(Environment.RING_BUFFER)     // (2.1).get();                                  //

reactor.on($("channel"),                                           // (3)
           event -> System.out.println(event.getData()));          // 

Executors.newSingleThreadScheduledExecutor()                       // (4).scheduleAtFixedRate(                                     //()-> reactor.notify("channel", Event.wrap("test")),  // 0,100, TimeUnit.MILLISECONDS                         //
         );                                                        //

在前面的代码中,有几个概念点:

  1. 在这里,我们创建一个 Environment 实例。  Environment instance 是一个执行上下文,负责创建特定的Dispatcher。这可能会提供不同类型的调度器,从进程间调度器到分布式调度器。
  2.  Reactor 的一个实例被创建,它是反应堆模式的直接实现。在前面的示例代码中,我们使用了 Reactors 类,它是具体Reactor instances的流畅构建器.在这一点(2.1)我们使用Dispatcher 基于 RingBuffer 结构。要详细了解基于 RingBufferDispatcher 的内部结构和整体设计,请访问以下链接:https://martinfowler.com/articles/lmax.html .
  3. 这里, 声明了一个通道Selector 和一个 事件 消费者。此时,我们注册了一个事件处理程序 (在这种情况下,一个将所有接收到的事件打印到 System.out的lambda)。事件过滤使用字符串选择器进行,该选择器指示事件通道的名称。 Selectors.$ 提供了更广泛的标准选择,因此事件选择的最终表达式 可能更复杂。
  4. 这里,我们以定时任务的形式配置 Event 的生产者。在这一点上,我们使用 Java 的 ScheduledExecutorService 来安排发送 Event 到先前实例化的 Reactor 实例中的特定通道。

 

在后台,事件由 Dispatcher 处理,然后发送到目标点。根据 Dispatcher 实现,可以同步或异步处理事件。这提供了功能分解,并且通常以类似于 Spring Framework 事件处理方法的方式工作。此外,Reactor 1.x 提供了一堆有用的包装器,允许我们以清晰的流程组合事件的 处理

...                                                                // (1)
Stream<String> stream = Streams.on(reactor, $("channel"));         // (2)
stream.map(s ->"Hello world "+ s)                                // (3).distinct()                                                  //.filter((Predicate<String>) s -> s.length()>2)             //.consume(System.out::println);                               // (3.1)

Deferred<String, Stream<String>> input = Streams.defer(env);       // (4)

Stream<String> compose = input.compose()                           // (5)compose.map(m -> m +" Hello World")                               // (6)
       .filter(m -> m.contains("1"))                               //
       .map(Event::wrap)                                           //
.consume(reactor.prepare("channel"));                       // (6.1)for(int i =0; i <1000; i++){                                   // (7)
   input.accept(UUID.randomUUID().toString());                     //}                                                                  //

让我们分解前面的代码:

  1. 在这一点上,我们有一个 Environment 和一个 Reactor 创建,如前面的示例所示。
  2. 这里我们有 Stream creation。 Stream 允许构建函数转换链。通过将 Streams.on 方法应用于 Reactor 指定的 Selector,我们收到一个 Stream 对象附加到给定Reactor实例中的指定通道。
  3. 在这里,创建了处理流程。我们应用了一些中间操作,例如 mapfilter和 consume< /代码>。最后一个是终端操作符(3.1)
  4. 在这里,创建了 DeferredStream 。  Deferred class 是一个特殊的包装器,可以为 Stream 提供手动事件。在我们的例子中, Stream.defer 方法创建了 Reactor 类的一个额外实例。
  5. 至此,我们创建了一个Stream 实例。在这里,我们使用 compose< 从 Deferred 实例中检索 Stream /code> 方法就可以了。
  1. 至此,我们创建了一个反应式处理流程。管道组成的这一部分类似于我们在 (3) 处的内容。在 (6.1) 处,我们使用 Reactor API 代码的快捷方式,如下——e -> reactor.notify("channel", e)
  2. 在这里,我们为 Deferred实例提供了一个随机元素。

在前面的示例中,我们订阅了频道,然后逐步处理所有传入的事件。相比之下,在该示例中,我们使用反应式编程技术来构建声明式处理流程。在这里,我们提供了两个独立的处理阶段。此外,代码看起来像众所周知的 RxJava API,让 RxJava 用户更熟悉它。在某些时候,Reactor 1.x 与 Spring Framework 有很好的集成。 除了消息处理库,Reactor 1.x 还提供了一堆附加组件,例如 Netty 的附加组件。

总而言之,当时的 Reactor 1.x 在高速处理事件方面已经足够出色了。通过与 Spring Framework 的出色集成以及与 Netty 的组合,它使得开发提供异步和非阻塞消息处理的高性能系统成为可能。

然而,Reactor 1.x 也有它的缺点。首先,库没有背压控制。不幸的是,Reactor 1.x 的事件驱动实现除了阻塞生产者线程或跳过事件之外,没有提供控制背压的方法。此外,错误处理相当复杂。 Reactor 1.x 提供了几种处理错误和故障的方法。尽管 Reactor 1.x 的边缘很粗糙,但它被流行的 Grails web 框架使用。当然,这极大地影响了响应式库的下一次迭代。

项目反应堆版本 2.x

在 Reactor 1.x 首次正式发布后不久, Stephane Maldini 受邀参加 Reactive Streams特别兴趣小组 作为高性能消息处理系统方面的专家和 Project Reactor 联合负责人。正如我们可能猜到的那样,这个小组致力于 Reactive Streams 规范。在更好地了解 Reactive Streams 的性质并向 Rector 团队介绍了新知识后,  Stephane Maldini 和 Jon Brisbin 于 2015 年初宣布 Reactor 2.x 。 引用 Stephane Maldini 的话:< em>“Reactor 2 是 Reactive Streams 的第一枪”

 

Reactor 设计中最显着的变化是将 EventBus 和 Stream 功能提取到单独的模块中。此外,深度重新设计使新的 Reactor Streams 库完全符合 Reactive Streams 规范。 Reactor 团队极大地改进了 Reactor 的 API。例如,新的 Reactor API 与 Java Collections API 有更好的集成。

在第二个版本中,Reactor 的 Streams API 变得更加类似于 RxJava API。除了创建和使用流的简单附加功能外,还有许多用于背压管理、线程调度和弹性支持的有用附加功能,如下所示:

stream
   .retry()                                                          // (1).onOverflowBuffer()                                               // (2).onOverflowDrop()                                                 //.dispatchOn(newRingBufferDispatcher("test"))                     // (3)

前面的示例展示了三种简单的技术:

  1. 使用单行操作符retry, 我们将弹性引入到流程中,如果出现错误,上游操作应该重新运行。
  2. 使用onOverflowBuffer and onOverflowDrop 方法,当发布者仅支持推送模型(且不受消费者需求控制)。
  3. 同时,通过应用dispatchOn 运算符,我们专门使用了一个新的 Dispatcher 来处理该反应流。这保留了异步处理消息的能力。

 Reactor EventBus 也得到了改进。首先,负责发送消息的Reactor对象被重命名为EventBus。该模块也经过重新设计以支持 Reactive Streams 规范。

大约在那个时候,Stephane Maldini 遇到了 David Karnok,他正在积极研究他的论文高分辨率和透明的生产信息学。论文报告 in-在响应式流、响应式编程和 RxJava 领域进行深入研究。在密切合作中,Maldini 和 Karnok 将他们对 RxJava 和 Project Reactor 的想法和经验浓缩到一个名为 reactive-stream-commons 的库中。稍后,该库成为 Reactor 2.5 的基础,最后是 Reactor 3.x。

 

笔记

reactive-streams-commons 库的源代码可在以下 GitHub 项目页面上进行探索:https://github.com/reactor/reactive-streams-commons

经过一年的努力,Reactor 3.0 发布了。与此同时,一个漂亮的 相同 RxJava 2.0 浮出水面。与它的前身 RxJava 1.x 相比,后者与 Reactor 3.x 有更多的相似之处。这些库之间最显着的区别是 RxJava 面向 Java 6(包括 Android 支持),而 Reactor 3 选择了 Java 8 作为基准。同时,Reactor 3.x 塑造了 Spring Framework 5 的反应式变形 这就是为什么 Project Reactor 在本文的其余章节中被广泛使用的原因书。现在,我们将学习 Project Reactor 3.x 的 API 以及如何有效地使用它。

项目反应堆要点


从一开始,Reactor 库就是旨在省略 回调地狱 和深层嵌套代码构建异步管道。我们在 第 1 章 , 为什么选择 Reactive Spring? 在寻求线性代码的过程中,该库的作者使用装配线进行了类比:“您可以将响应式应用程序处理的数据视为通过装配线移动。Reactor 既是传送带又是工作站。”

该库的主要目标是提高代码的 可读性并引入 可组合性< /strong> 到使用 Reactor 库定义的工作流。公共 API 被设计为高级但非常通用,但同时它不会牺牲性能。 API 提供了一组丰富的操作符(组装类比中的 "workstations"),它们提供了比“裸”反应式最大的附加值流规范。

Reactor API 鼓励操作符链接,这使我们能够构建复杂的、可能可重用的执行图。需要注意的是,这样的执行图只定义了执行流,但在订阅者真正创建订阅之前什么都不会发生,所以 只有 订阅会触发实际的数据流

该库专为高效的数据操作而设计,包括本地和检索数据,因为异步请求具有潜在的错误 IO。这就是为什么 Project Reactor 中的错误处理运算符非常通用的原因,并且正如我们将在后面看到的那样,鼓励编写 弹性 代码。

我们已经知道,backpressure 是一个必不可少的属性,Reactive Streams 规范鼓励响应式库拥有,并且因为 Reactor 实现了规范,所以背压是Reactor 本身的一个中心主题。因此,当谈到使用 Reactor 构建的 Reactive Streams 时,数据会从发布者到订阅者向下游传输。同时,订阅和需求控制信号向上游传播,从订阅者到发布者:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.1 通过反应流的数据流和订阅/需求信号传播

该库支持所有常见的背压传播模式,如下所示:

  • push only: When a subscriber asks for an effectively infinite number of elements withsubscription.request(Long.MAX_VALUE)
  • pull only: When a subscriber requests the next element only after receiving the previous one:subscription.request(1)
  • pull-push (sometimes called mixed): When a subscriber has real-time control over the demand and the publisher may adapt to the announced speed of data consumption.

 

此外,在适配不支持推拉操作模型的旧 API 时,Reactor 提供了许多 old-school backpressure 机制,即——缓冲、窗口化、消息丢弃、启动异常等等。所有这些技术将在本章后面介绍。 在某些情况下,上述策略允许在实际需求出现之前预取数据,从而提高系统的响应能力。此外,Reactor API 提供了足够的工具来消除用户活动的短暂峰值并防止系统过载。

Project Reactor 被设计为与并发无关,因此它不强制执行任何并发模型。同时,它提供了一组有用的调度程序来以几乎任何方式管理执行线程,如果所提出的调度程序都不符合要求,开发人员可以创建自己的调度程序,并具有完全的低级控制。本章后面还将介绍 Reactor 库中的线程管理。

现在,在对 Reactor 库进行简要概述之后,让我们将其添加到项目中并开始研究其到达 API。

将反应堆添加到项目中

在这里,我们假设读者已经熟悉 Reactive Streams 规范. 如果没有,在上一章里有简要介绍。 Reactive Streams 规范在当前上下文中是必不可少的,因为 Project Reactor 是在它之上构建的,并且 org.reactivestreams:reactive-streams 是 Project Reactor 的唯一强制性依赖项。

将 Project Reactor 作为依赖项添加到我们的应用程序就像在 build.gradle 文件中添加以下依赖项一样简单:

compile("io.projectreactor:reactor-core:3.2.0.RELEASE")

在撰写本文时,该库的最新版本是 3.2.0.RELEASE。此版本也用于 Spring Framework 5.1。

笔记

以下文章描述了将 Project Reactor 库添加到 Maven 项目的过程:https://projectreactor.io/docs/core/3.2.0.RELEASE/reference/#_maven_installation

通常还值得添加以下依赖项,因为它提供了测试响应式代码的必要工具集,显然,我们还需要通过单元测试来涵盖: 

testCompile("io.projectreactor:reactor-test:3.2.0.RELEASE")  

在本章中,我们将为 Reactive Streams 使用一些简单的测试技术。此外,第 9 章, 测试响应式应用程序,更详细地介绍了有关响应式代码测试的主题。

现在我们的应用程序类路径中有 Reactor,我们准备好试验 Reactor 的反应类型和操作符。

反应类型——通量和单声道

我们已经知道,Reactive Streams 规范只定义了四个接口:Publisher<T> 、 订阅者<T>、 订阅和 处理器<T,R>。或多或少,我们将按照这个列表并查看库提供的接口实现。

首先Project Reactor提供了 Publisher<T> 接口的两种实现:Flux<T> 和 单声道<T>。这种方法为响应式类型添加了额外的上下文含义。  在这里,研究响应式类型的行为(Flux and  Mono),我们将使用一些反应式操作符,而不会详细解释这些操作符的工作原理。本章稍后将介绍运算符。

通量

让我们用下面的 marble 图来描述数据如何流经 Flux 类:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.2 Flux 流转换为另一个 Flux 流的示例

Flux 定义 一个普通 可以产生零个、一个或多个元素;甚至可能是无限数量的元素。它有以下公式:

onNext x 0..N [onError | onComplete]

在命令式世界中使用无限数据容器并不常见,但在函数式编程中却很常见。以下代码可能会产生一个简单的 endless Reactive Stream:

Flux.range(1, 5).repeat()

此流重复生成从 15 的数字(序列看起来像 -1, 2, 3, 4, 5, 1, 2,...)。这不是问题,也不会破坏内存,因为每个元素都可以转换和使用,而无需完成整个流的创建。此外,订阅者可以随时取消订阅,并有效地将 endless stream 转换为 有限流。

注意:试图收集 endless stream 发出的所有元素可能会导致 OutOfMemoryException .不建议在生产应用程序中这样做,但重现此类行为的最简单方法可能是使用以下代码:

Flux.range(1, 100)                                                  // (1)
   .repeat()                                                        // (2).collectList()                                                   // (3).block();                                                        // (4)

在前面的代码中,我们执行以下操作:

  1. range 运算符创建从 1100(含)。
  2. repeat 操作符在源流完成后一次又一次地订阅源响应流。因此,repeat 运算符订阅流运算符的结果,接收元素 1 到 100 和 onComplete 信号,并且然后 再次订阅,接收元素1100,然后以此类推,不停歇。
  1. 使用 collectList 运算符,我们试图将所有生成的元素收集到一个列表中。当然,因为 repeat 操作符会生成 endless stream,元素到达并增加size 的列表,因此它会消耗所有内存并导致应用程序失败并出现以下错误 - java.lang.OutOfMemoryError: Java heap space。我们的应用程序刚刚用完了可用的堆内存。
  2. block 运算符触发实际订阅并阻塞正在运行的线程,直到最终结果到达,在当前情况下,由于反应流是无止境的,这不可能发生。

单核细胞增多症

现在,让我们看看 Mono 类型与 不同 与 < code class="literal">Flux 类型:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.3 Mono 流转换为另一个 Mono 流的示例

Flux相比,Mono 类型定义了一个可以产生最多一个元素,可以用下面的公式来描述:

onNext x 0..1 [onError | onComplete]

Flux 和Mono 之间的区别不仅使我们能够为方法签名引入额外的含义,而且还使Mono 由于跳过了冗余缓冲区和昂贵的同步,更有效的内部实现。

Mono<T> 在应用程序 API 最多返回一个元素的情况下可能很有用。因此,它可以轻松替换CompletableFuture<T>,提供非常相似的语义。当然,这两种类型有一些小的语义差异——CompletableFutureMono 不同,它不能在不发出值的情况下正常完成。此外,CompletableFuture 会立即开始处理,而 Mono 在订阅者出现之前什么都不做。 Mono 类型的好处在于提供了大量的响应式操作符,并且能够完美地整合到更大的响应式工作流程中。

此外,Mono 可以在需要通知客户端有关已完成操作的情况下使用。在这种情况下,我们可能会返回Mono  类型和信号onComplete() 处理完成时或onError() 如果失败。在这种情况下,我们不返回任何数据,而是发出通知,这反过来又可以用作进一步计算的触发器。

Mono 和Flux 不是分离的类型,可以很容易地相互“转换”。例如,Flux<T>.collectList() 返回Mono  和< code class="literal">Mono  returns Flux<T>。此外,该库足够智能,可以优化一些不改变语义的转换。例如,让我们考虑以下转换 ( Mono -> Flux -> Mono ):

Mono.from(Flux.from(mono))

当调用前面的代码时,它返回原始的 mono 实例,因为这在概念上是一个 no-ops 转换

RxJava 2 的响应式类型

尽管 RxJava 2.x 库和 Project Reactor 具有相同的基础,但 RxJava 2 有一组不同的 反应式发布者。由于这两个库实现了相同的想法,因此值得描述 RxJava 2 的不同之处,至少在反应类型方面。所有其他方面,包括反应式运算符、线程管理和错误处理,都非常相似。因此,或多或少熟悉其中一个库意味着熟悉它们。

 

第 2 章中所述, Spring 中的响应式编程 - 基本概念,RxJava 1.x 最初只有一种响应式类型:Observable。后来,添加了 SingleCompletable 类型。在版本 2 中,该库具有以下反应类型——Observable、 Flowable、 单个、 也许和 可完成。 让我们简要描述一下它们之间的区别,并将它们与 Flux/Mono 串联进行比较。

可观察的

RxJava 2 的 Observable 类型提供与 RxJava 1 中几乎相同的 semantics。但是,x 不再接受 null 值。另外,Observable 不支持背压,也没有实现 Publisher 接口。因此,它与 Reactive Streams 规范不直接兼容。因此,在将它用于具有许多元素(超过几千个)的流时要小心。另一方面,Observable 类型的开销比 Flowable 类型要少。它具有 toFlowable 方法,通过应用用户选择的背压策略将流转换为 Flowable 。

可流动的

Flowable 类型是 Reactor 的 字面量“通量” 类型。它实现了 Reactive Streams' Publisher。因此,它可以很容易地用于使用 Project Reactor 实现的反应式工作流,因为设计良好的 API 将使用 Publisher 类型的参数,而不是更特定于库的 通量。 

单身的

 Single 类型表示只产生一个元素的流。它不继承 Publisher 接口。它还有 toFlowable 方法,在这种情况下,不需要背压策略。 Single 更好地代表语义的 CompletableFuture 比 Reactor 的 Mono 类型。但是,它仍然不会在订阅发生之前开始处理。 

也许

为了实现与 Reactor 的 Mono 类型相同的语义,RxJava 2.x 提供了 Maybe 类型。但是,它不符合 Reactive Streams,因为 Maybe 没有实现 Publisher 接口。它具有 that 目的的 toFlowable 方法。

 

可完成

此外,RxJava 2.x 具有 Completable type that可能只会触发 onError 或 onComplete 信号, 但不能产生 onNext 信号。它也没有实现 Publisher 接口并且具有 toFlowable 方法。语义上对应Mono 类型,也不能生成 onNext信号。

总而言之,RxJava 2 在反应类型之间有更细粒度的语义区别。只有 Flowable 类型是一个 Reactive Streams 投诉。 Observable 做同样的工作,但没有背压支持。 Maybe<T> 类型对应 Reactor 的 Mono 和 RxJava 的 Completable< /code> 对应于 Reactor 的 Mono Single 类型的语义不能直接用 Project Reactor 表示,因为它的任何类型都不能保证产生的事件数量最少。要与其他 Reactive Streams 投诉代码集成,应将 RxJava 类型转换为 Flowable 类型。 

创建通量和单声道序列

Flux 和Mono 提供很多工厂方法 根据 已经可用的数据创建反应式流。例如,我们可以使用对象引用或从集合创建Flux ,或者我们甚至可以创建自己的惰性数字范围:

Flux<String>  stream1 = Flux.just("Hello","world");
Flux<Integer> stream2 = Flux.fromArray(newInteger[]{1,2,3});
Flux<Integer> stream3 = Flux.fromIterable(Arrays.asList(9, 8, 7));

使用range方法很容易生成整数流,其中2010 是起点,9 是序列中的元素个数:

Flux<Integer> stream4 = Flux.range(2010, 9);

这是生成最近几年的流的一种方便方法,因此前面的代码生成以下整数流:

2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018

Mono 提供类似的工厂方法,但主要针对一个元素。它也经常与可空和可选 类型一起使用:

Mono<String> stream5 = Mono.just("One");
Mono<String> stream6 = Mono.justOrEmpty(null);
Mono<String> stream7 = Mono.justOrEmpty(Optional.empty());

Mono 对于包装异步操作(例如 HTTP 请求或 DB 查询)可能非常有用。为此,Mono 提供了这些方法——fromCallable(Callable)fromRunnable( Runnable),fromSupplier(Supplier),fromFuture(CompletableFuture),fromCompletionStage (CompletionStage) 等。我们可以用以下代码行 Mono 包装长的 HTTP 请求:

Mono<String> stream8 = Mono.fromCallable(()->httpRequest());

或者,我们可以使用 Java 8 方法引用语法将前面的代码重写为更短:

Mono<String> stream8 = Mono.fromCallable(this::httpRequest);

请注意,前面的代码不仅异步发出 HTTP 请求(提供适当的 Scheduler),而且还处理可能作为 onError 传播的错误 信号。

FluxMono 都允许使用 from(Publisher<; T>p) 工厂方法。

两种反应类型都有创建方便且常用的空流的方法,以及只包含错误的流:

Flux<String> empty = Flux.empty();
Flux<String> never = Flux.never();
Mono<String> error = Mono.error(newRuntimeException("Unknown id"));

FluxMono 都有工厂方法称为 empty(),  ;分别生成 Flux 或Mono 的空instances< /跨度>。类似地,never() 方法创建一个永远不会发出完成、数据或错误信号的流。

 error(Throwable) 工厂方法创建一个序列,该序列总是通过onError(...)传播错误 每个订阅者订阅时的方法。错误是在 Flux orMono 声明期间创建的,因此每个订阅者都会收到相同的Throwable 实例。

defer factory 方法创建一个序列,该序列决定其在订阅时的行为,因此可能为不同的订阅者生成不同的数据:

Mono<User>requestUserData(String sessionId){return Mono.defer(()->isValidSession(sessionId)? Mono.fromCallable(()->requestUser(sessionId)): Mono.error(newRuntimeException("Invalid user session")));}

此代码将sessionId 验证推迟到实际订阅发生。相比之下,以下代码在调用 requestUserData(...) 方法时进行验证,这可能在实际订阅之前(也可能没有订阅)完全):

Mono<User>requestUserData(String sessionId){returnisValidSession(sessionId)? Mono.fromCallable(()->requestUser(sessionId)): Mono.error(new RuntimeException("Invalid user session"));}

第一个示例在每次有人订阅返回的 Mono 时验证会话。第二个示例执行会话验证,但仅在调用 requestUserData 方法时。但是,订阅时不会发生验证。

综上所述,Project Reactor 允许创建 Flux 和 Mono 序列仅通过枚举元素使用  ;只是 方法。 我们可以很容易 包装 可选 进入 Mono with  justOrEmpty,或包装 Supplier 进入 Mono with the fromSupplier 方法。 我们可以映射 Future with the fromFuture 方法或 Runnable with the fromRunnable 工厂方法。 此外,我们可以将一个数组或一个 Iterable 集合转换为 Flux 流 与 < code class="literal">fromArray 或 fromIterable 方法。除此之外,Project Reactor 还允许创建更复杂的反应序列,我们将在本章后面介绍。现在,让我们学习如何使用由反应流产生的元素。

订阅响应式流

正如我们可能猜到的那样, Flux and Mono 提供基于 lambda 的 重载  subscribe() 方法,大大简化了订阅流程:

subscribe();                                                         // (1)
subscribe(Consumer<T> dataConsumer);                                 // (2)
subscribe(Consumer<T> dataConsumer,                                  // (3)
          Consumer<Throwable> errorConsumer);
subscribe(Consumer<T> dataConsumer,                                  // (4)
          Consumer<Throwable> errorConsumer,
          Runnable completeConsumer);
subscribe(Consumer<T> dataConsumer,                                  // (5)
          Consumer<Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<Subscription> subscriptionConsumer);

subscribe(Subscriber<T> subscriber);                                 // (6)

让我们探索创建订阅者的选项。首先,所有 subscribe 方法的重写都返回一个 Disposable 接口的实例。这可用于取消基础订阅。在 (1)(4) 的情况下,订阅请求无限需求(Long .MAX_VALUE)。现在,让我们看看它们的区别:

  1. 这是订阅流的最简单方法,因为此方法忽略所有信号。通常,应该首选其他变体。但是,有时触发具有副作用的流处理可能很有用。
  2. dataConsumer 在每个值上调用(onNext 信号)。它不处理 onErroronComplete 信号。
  3. 同选项 (2);然而,这允许 处理 onError信号。 onComplete 信号被忽略。
  4. 同选项 (3);但是,这也允许处理 onComplete 信号。
  5. 允许使用 Reactive Stream 中的所有元素,包括句柄错误和完成信号。重要的是,此覆盖允许通过请求足够数量的数据来控制订阅,当然,我们仍然可以请求 Long.MAX_VALUE)。
  6. 订阅序列的最通用方式。在这里,我们可以为我们的 Subscriber 实现提供所需的行为。尽管此选项非常通用,但很少需要。 

让我们创建一个简单的 Reactive Stream 并订阅它:

Flux.just("A","B","C").subscribe(
      data -> log.info("onNext: {}", data),
      err ->{ /* ignored  */ },()-> log.info("onComplete"));

前面的代码产生以下控制台输出:

onNext: A
onNext: B
onNext: C
onComplete

 

再次值得注意的是,简单的订阅请求无绑定需求 options (Long.MAX_VALUE) 有时可能会迫使生产者做大量的工作来完成要求。因此,如果生产者更适合处理有界需求,建议使用订阅对象或应用请求限制运算符来控制需求,本章稍后会介绍。

让我们使用手动订阅控件订阅 Reactive Stream:

Flux.range(1, 100)                                                 // (1)
    .subscribe(                                                    // (2)
        data -> log.info("onNext: {}", data),
        err -> { /* ignore */ },
        () -> log.info("onComplete"),
        subscription -> {                                          // (3)
           subscription.request(4);                                // (3.1)
           subscription.cancel();                                  // (3.2)
        }
    );

前面的代码执行以下操作:

  1. 首先,我们使用 range 运算符生成 100 值。
  2. 我们以与上一个示例相同的方式订阅流。
  3. 但是,现在我们控制订阅。一开始我们请求4(3.1)然后立即取消订阅(3.2 ),因此根本不应该生成其他元素。

运行上述代码时,我们会收到以下输出:

onNext: 1
onNext: 2
onNext: 3
onNext: 4

 

请注意,我们没有收到 onComplete 信号,因为订阅者在流完成之前取消了订阅。同样重要的是要记住,反应式流可能由生产者完成(使用 onErroronComplete 信号)或取消由订阅者通过 Subscription 实例。 此外,Disposable 实例也可用于取消目的。通常,它不是由订阅者使用,而是由上面一层抽象的代码使用。例如,让我们通过调用 Disposable 来取消流处理:

Disposable disposable = Flux.interval(Duration.ofMillis(50))         // (1)
    .subscribe(                                                      // (2)
        data -> log.info("onNext: {}", data)
    );
Thread.sleep(200);                                                   // (3)
disposable.dispose();                                                // (4)

前面的代码执行以下操作:

  1. interval 工厂方法允许生成具有定义周期(每 50 毫秒)的事件。生成的流是无穷无尽的。
  2. 我们通过仅提供 onNext 信号的处理程序来订阅。
  3. 我们等待一段时间来接收几个事件(200/50 应该允许传递大约 4 个事件)。
  4. 调用内部取消订阅的 dispose 方法。

实现自定义订阅者

如果默认 subscribe(...) 方法不提供 所需的多功能性,我们可以实现我们自己的 Subscriber。 我们 可以 总是直接实现 Subscriber  ; 来自 Reactive Streams 规范的接口并将其订阅到流,如下所示:

Subscriber<String> subscriber =newSubscriber<String>(){volatile Subscription subscription;                             // (1)publicvoidonSubscribe(Subscription s){                       // (2)
      subscription = s;                                            // (2.1)
      log.info("initial request for 1 element");                   //
      subscription.request(1);                                     // (2.2)}publicvoidonNext(String s){                                  // (3)
      log.info("onNext: {}", s);                                   //
      log.info("requesting 1 more element");                       //
      subscription.request(1);                                     // (3.1)
   }publicvoidonComplete(){
      log.info("onComplete");}publicvoidonError(Throwable t){
      log.warn("onError: {}", t.getMessage());
   }};

Flux<String> stream = Flux.just("Hello","world","!");            // (4)
stream.subscribe(subscriber);                                      // (5)

在我们的自定义 Subscriber 实现中,我们执行以下操作:

  1. 我们的订阅者必须持有对一个 Subscription 的引用,它绑定了一个 Publisher 和我们的 订阅者。 由于订阅和数据处理可能发生在不同的线程中,我们使用 volatile 关键字来确保所有线程都能正确引用 Subscription 实例。
  2. 当订阅到达时,我们的 Subscriber 会收到 onSubscribe 回调。这里我们保存订阅(2.1),请求初始需求(2.2)。如果没有该请求,将不允许 TCK 投诉提供者发送数据,并且根本不会开始元素处理。
  3. onNext 回调中,我们记录接收到的数据并请求下一个元素。在这种情况下,我们使用简单的拉取模型(subscription.request(1))进行背压管理。
  4. 在这里,我们使用 just 工厂方法生成一个简单的流。
  5. 在这里,我们将自定义订阅者订阅到步骤 (4) 中定义的 Reactive Stream。

前面的代码应该产生以下控制台输出:

initial request for 1 element
onNext: Hello
requesting 1 more element
onNext: world
requesting 1 more element
onNext: !
requesting 1 more element
onComplete

 

但是,所描述的用于定义订阅的方法是不正确的。它打破了线性 代码流并且 也 容易出错。最困难的部分是我们需要自己管理背压并正确实现订阅者的所有 TCK 要求。此外,在前面的示例中,我们打破了一些关于订阅验证和取消的 TCK 要求。

相反,建议扩展 Project Reactor 提供的 BaseSubscriber 类。在这种情况下,我们的订阅者可能如下所示:

classMySubscriber<T>extendsBaseSubscriber<T>{publicvoidhookOnSubscribe(Subscription subscription){
      log.info("initial request for 1 element");request(1);}publicvoidhookOnNext(T value){
      log.info("onNext: {}", value);
      log.info("requesting 1 more element");
      request(1);}}

除了 hookOnSubscribe(Subscription) and hookOnNext(T) 方法之外,我们还可以重写方法 这样的  ;as hookOnError(Throwable)hookOnCancel()hookOnComplete( ) 和其他一些。  BaseSubscriber 类通过这些方法——request(long)和<代码类="literal">requestUnbounded()。此外, 使用BaseSubscriber 类,实现符合 TCK 的订阅者要容易得多。当订户本身通过周到的生命周期管理拥有宝贵的资源时,可能需要这种方法。例如,订阅者可以 包装 文件处理程序或WebSocket连接到第三方服务。

使用运算符转换反应序列

使用反应序列时,除了创建和使用流之外,具有能力 完美地转换和操作它们至关重要.只有这样,反应式编程才会成为一种有用的技术。 Project Reactor 为几乎所有需要的响应式转换提供了工具(方法和工厂方法),一般来说,我们可以将库的功能分类如下:

  • Transforming existing sequences
  • Methods for peeking at the sequences' processing
  • Splitting and joiningFlux sequences
  • Working with time
  • Returning data synchronously

在这里,我们无法描述所有 Reactor 的操作符和工厂方法,因为它会占用太多页面,并且几乎不可能记住所有它们。鉴于 Project Reactor 提供了出色的文档,包括选择适当运算符的指南,这也是不必要的:http://projectreactor.io/docs/core/release/reference/#which-operator。尽管如此,在本节中,我们将通过一些代码示例来介绍最常用的运算符。

请注意,大多数运算符都有许多带有不同选项的覆盖来增强基本行为。此外,随着每个版本,Project Reactor 接收到越来越多有用的运算符。因此,请参阅 Reactor 的文档以获取有关运算符的最新更新。

映射反应序列的元素

转换序列最自然的方法是映射 每个元素到某个新值。 Flux and Mono 给出 map 操作符,其行为类似于 < code class="literal">map 运算符 来自 Java Stream API。带有map(Function<T, R>) signature 的函数允许逐个处理元素。当然,当它把元素的类型从 T 更改为 R 时,整个序列就改变了它的类型,所以 地图运算符 Flux<T> 变成 Flux<R> Mono<T> 变成 Mono<R>。  Flux.map() 的弹珠图如下:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.4 运算符:map

当然,Mono 类的 map 操作符 行为类似 。  cast(Class c) 运算符 将流的元素转换为目标类。实现 cast(Class c) 运算符的最简单方法是使用 map()  运营商。我们可以查看 Flux 类的源码,找到如下代码,证明了我们的假设:

publicfinal<E> Flux<E>cast(Class<E> clazz){returnmap(clazz::cast);}

 

 

 

 

index 运算符允许枚举序列中的元素。该方法具有以下签名——Flux >索引() 。所以,现在我们必须使用 Tuple2 类。这代表了标准 Java 库中不存在的元组数据结构。该库提供 Tuple2Tuple8 类,这些类经常被库操作员使用。 timestamp 运算符的行为类似于 index 运算符,但添加了当前时间戳而不是索引。因此,下面的代码应该枚举元素并为序列中的每个元素附加时间戳:

Flux.range(2018, 5)                                                // (1)
    .timestamp()                                                   // (2)
    .index()                                                       // (3)
    .subscribe(e -> log.info("index: {}, ts: {}, value: {}",       // (4)
        e.getT1(),                                                 // (4.1)
        Instant.ofEpochMilli(e.getT2().getT1()),                   // (4.2)
        e.getT2().getT2()));                                       // (4.3)

前面的代码执行以下操作:

  1. 在这里,我们使用 range 运算符(2018 年到 2022 年)生成一些数据。该运算符返回 Flux 类型的序列。
  2. 使用 timestamp 运算符,我们附加当前时间戳。现在,该序列具有 Flux > 类型。
  3. 在这里,我们使用索引运算符应用枚举。 现在,序列具有 Flux<Tuple2<Long, Tuple2<Long, Integer>>> 类型.
  4. 在这里,我们订阅了序列和日志元素。 e.getT1() 调用返回索引 (4.1), e .getT2().getT1() 调用返回一个时间戳,我们使用 Instant(4.2),而 e.getT2().getT2()调用返回一个实际值(4.3)< /代码>。

运行前面的代码片段后,我们应该会收到以下输出:

index: 0, ts: 2018-09-24T03:00:52.041Z, value: 2018
index: 1, ts: 2018-09-24T03:00:52.061Z, value: 2019
index: 2, ts: 2018-09-24T03:00:52.061Z, value: 2020
index: 3, ts: 2018-09-24T03:00:52.061Z, value: 2021
index: 4, ts: 2018-09-24T03:00:52.062Z, value: 2022

过滤反应序列

当然,Project Reactor 包含用于过滤 元素的各种运算符,例如:

  • The filter operator passes only elements that satisfy the condition.
  • The ignoreElements operator returnsMono<T>and filters out all elements. The resulting sequence ends only after the original ends.
  • The library allows for the limiting of taken elements with the take(n)method, which ignores all elements except the firstn.
  • takeLast returns only the last element of the stream.
  • takeUntil(Predicate) passes an element until some condition is satisfied.
  • elementAt(n)allows the taking of only thenth element of the sequence.
  • The single operator emits a single item from the source and signals the NoSuchElementException error for an empty source or IndexOutOfBoundsException for a source with more than one element.
  • It is possible to take or skip an element not only by an amount but also by Durationwith the skip(Duration) ortake(Duration)operators.
  • Also, we may skip or take an element until some message arrives from another stream—takeUntilOther(Publisher)orskipUntilOther(Publisher).

让我们考虑一个工作流程,我们必须启动然后停止流处理,作为对来自其他流的某些事件的反应。代码可能如下所示:

Mono<?> startCommand =... Mono<?> stopCommand =...
Flux<UserEvent> streamOfData =...

streamOfData
    .skipUntilOther(startCommand).takeUntilOther(stopCommand).subscribe(System.out::println);

在这种情况下,我们可以开始然后停止元素处理,但只能进行一次。此用例的弹珠图如下:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.5 起停命令之间的窥视元素

 

收集反应序列

可以收集列表中的所有元素并将 resulting 集合作为一个 单声道 流与 Flux.collectList() 和 Flux.collectSortedList()。最后一个不仅收集元素,还对它们进行排序。考虑以下代码:

Flux.just(1,6,2,8,3,1,5,1).collectSortedList(Comparator.reverseOrder()).subscribe(System.out::println);

这将产生以下输出,其中一个包含已排序数字的集合:

[8, 6, 5, 3, 2, 1, 1, 1]

笔记

请注意,收集集合中的序列元素可能需要大量资源,尤其是当序列具有许多元素时。此外,当尝试在无尽的流上收集时,可能会消耗所有可用内存。

Project Reactor 不仅可以将 Flux 元素集合到 List,还可以到以下内容:

  • Map ( Map<K, T>) with the collectMap operator
  • Multi-map (Map<K, Collection<T>>) with the collectMultimap operator
  • Any data structure with a custom java.util.stream.Collector and the Flux.collect(Collector) operator

 Flux 和 Mono 都有 repeat()  and repeat(times) 方法,允许循环输入序列。我们已经在上一节中使用了这些。

还有一种更方便的方法,称为 defaultIfEmpty(T), 允许为空的 Flux提供默认值  ;或 单声道

Flux.distinct() 只传递之前在流中没有遇到过的元素。但是,此方法会跟踪所有唯一元素,因此请谨慎使用,尤其是对于高基数数据流。 distinct 方法具有允许为重复跟踪提供自定义算法的覆盖。因此,有时可以手动优化 distinct 运算符的资源使用。

笔记

高基数指的是具有非常罕见或独特元素的数据。例如,识别号或用户名通常是非常重要的。同时,枚举值或来自小型固定字典的值不是。 

 Flux.distinctUntilChanged() 算子 没有这样的限制,可用于无尽的流以删除出现在不间断行中的重复项。下面的弹珠图显示了它的行为:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.6。运算符——在改变之前是不同的

减少流元素

Project Reactor 可以 count() elements 在流中,或检查所有元素是否具有必需的属性 with Flux.all(Predicate)。使用 Flux.any(Predicate) 运算符还可以轻松检查至少一个元素是否具有所需的属性。

我们可以使用 hasElements 操作符检查流是否有任何元素,或者使用 hasElement 来检查流是否包含所需的元素操作员。后者实现短路逻辑,并在元素与值匹配时以 true 完成。此外,any 运算符不仅可以检查元素的相等性,还可以通过提供自定义 Predicate 实例来检查任何其他属性.让我们检查一个序列中是否有偶数:

Flux.just(3, 5, 7, 9, 11, 15, 16, 17)
    .any(e -> e % 2 == 0)
    .subscribe(hasEvens -> log.info("Has evens: {}", hasEvens));

sort 运算符允许在后台对元素进行排序,然后在原始序列完成后发出排序后的序列。

 

 

Flux 类允许使用自定义逻辑减少序列(有时,该过程称为折叠)。 reduce 运算符通常需要一个初始值和一个将上一步的结果与当前步骤的元素相结合的函数。让我们对 1 到 10 之间的整数求和:

Flux.range(1, 5)
    .reduce(0, (acc, elem) -> acc + elem)
    .subscribe(result -> log.info("Result: {}", result));

结果将是 15。 reduce 运算符只生成一个元素和最终结果。但是,在进行聚合时,有时向下游发送中间结果会很方便。  Flux.scan() 操作符就是这样做的。让我们使用 scan 运算符对 1 到 10 之间的整数求和:

Flux.range(1, 5)
    .scan(0, (acc, elem) -> acc + elem)
    .subscribe(result -> log.info("Result: {}", result));

前面的代码产生以下输出:

Result: 0
Result: 1
Result: 3
Result: 6
Result: 10
Result: 15

正如我们所看到的,最终结果是相同的(15)。但是,我们也收到了所有中间结果。话虽如此,scan 运算符对于许多需要有关正在进行的事件的信息的应用程序可能很有用。例如,我们可以计算流上的移动平均线:

int bucketSize = 5;                                                // (1)
Flux.range(1, 500)                                                 // (2)
    .index()                                                       // (3)
    .scan(                                                         // (4)
        new int[bucketSize],                                       // (4.1)
        (acc, elem) -> {                                           //
           acc[(int)(elem.getT1() % bucketSize)] = elem.getT2();   // (4.2)
           return acc;                                             // (4.3)
        })
    .skip(bucketSize)                                              // (5)
    .map(array -> Arrays.stream(array).sum() * 1.0 / bucketSize)   // (6)
    .subscribe(av -> log.info("Running average: {}", av));         // (7)

 

让我们描述一下这段代码:

  1. 在这里,我们定义移动平均窗口的大小(假设我们对最近的五个事件感兴趣)。
  2. 让我们使用 range 运算符生成一些数据。
  3. 使用 index 操作符,我们可以为每个元素附加一个索引。
  4. 使用 scan 运算符,我们将最新的五个元素收集到容器 (4.1) 中,其中元素的索引用于计算容器 (4.2) 中的位置。在每一步,我们都会返回包含更新内容的同一个容器。
  5. 在这里,我们在流的开头跳过了一些元素,以便为移动平均收集足够的数据。
  6. 计算移动平均线的值, 我们将容器内容的总和除以它的大小。
  7. 当然,我们必须订阅数据才能接收值。

MonoFlux 流具有 thenthenManythenEmpty 运算符,当上游完成时完成。运算符 忽略传入的元素,只重播完成或错误信号。这些运算符可用于在上游完成处理后立即触发新流: 

Flux.just(1, 2, 3)
    .thenMany(Flux.just(4, 5))
    .subscribe(e -> log.info("onNext: {}", e));

subscribe 方法中的 lambda 只接收 45,即使1, 2 和 3 由流生成和处理。

结合反应式流

当然,Project Reactor 允许将许多传入流合并为一个传出流。命名运算符有许多覆盖,但执行以下转换:

  • The concat operator concatenates all sources by forwarding received elements downstream. When the operator concatenates two streams, at first, it consumes and resends all elements of the first stream, then does the same for the second.
  • The merge operator merges data from upstream sequences into one downstream sequence. Unlike  the concat operator, upstream sources are subscribed to eagerly (at the same time).
  • The zip operator subscribes to all upstreams, waits for all sources to emit one element and then combines received elements into an output element. In Chapter 2, Reactive Programming in Spring - Basic Concepts, we described how zip works in detail. In Reactor, the zip operator may operate not only with reactive publishers but also with an Iterable container. For that purpose, we can use the zipWithIterable operator.
  • The combineLatest operator works similarly to the zip operator. However, it generates a new value as soon as at least one upstream source emits a value.

让我们连接几个流:

Flux.concat(
    Flux.range(1, 3),
    Flux.range(4, 2),
    Flux.range(6, 5)
).subscribe(e -> log.info("onNext: {}", e));

显然,结果中的上述代码生成的值从 110 [1, 2, 3] + [4, 5] + [6, 7, 8, 9, 10])。

批处理流元素

Project Reactor 支持流的批处理 元素 (Flux<T> )有几种方式:

  • Buffering elements into containers such as List, the result stream has the Flux<List<T>>type.
  • Windowing elements into a stream of streams such as Flux<Flux<T>>. Note that, now, the stream signals not values but sub-streams, which we can process.
  • Grouping elements by some key into a stream that has the type Flux<GroupedFlux<K, T>>. Each new key triggers a new GroupedFlux instance and all elements with that key are pushed through that instance of the GroupFlux class.

缓冲和窗口化可能基于以下情况发生:

  • The number of processed elements; let's say every 10 elements
  • Some time-span; let's say every 5 minutes
  • Based on some predicate; let's say cutting before each new even number
  • Based on an event arrival from some other Flux, which controls the execution

让我们在大小为 4 的列表中缓冲整数元素:

Flux.range(1, 13)
    .buffer(4)
    .subscribe(e -> log.info("onNext: {}", e));

上述代码生成以下输出:

onNext: [1, 2, 3, 4]
onNext: [5, 6, 7, 8]
onNext: [9, 10, 11, 12]
onNext: [13]

在程序的输出中,我们可以看到除了最后一个元素之外的所有元素都是大小为 4 的列表。最后一个元素是大小为 1 的集合,因为它是 13 除以 4 的模数。缓冲区 运算符将许多事件收集到一个事件集合中。该集合本身成为下游操作员的事件。当需要使用元素集合发出少量请求而不是仅使用一个元素的许多小请求时,缓冲区运算符对于批处理很方便。例如,我们不是将元素一个一个地插入到数据库中,而是将项目缓冲几秒钟并进行批量插入。当然,这只有在一致性要求允许我们这样做的情况下。

为了练习 window 运算符,让我们在每次元素是素数时将数字序列拆分为窗口。为此,我们可以使用 window 运算符的 windowUntil 变体。它使用谓词来确定何时制作新切片。代码可能如下所示:

Flux<Flux<Integer>> windowedFlux = Flux.range(101, 20)              // (1)
    .windowUntil(this::isPrime, true);                              // (2)

windowedFlux.subscribe(window -> window                             // (3)
        .collectList()                                              // (4)
        .subscribe(e -> log.info("window: {}", e)));                // (5)

我们看一下前面的代码:

  1. 首先,我们生成 20 个以 101 开头的整数。
  2. 在这里,每当一个数字是素数时,我们都会用元素分割一个新窗口。 windowUntil 运算符的第二个参数定义我们是在满足谓词之前还是之后切割新切片。在前面的代码中,我们分割了新素数开始其窗口的方式。结果流具有 Flux<Flux<Integer>> 类型。
  1. 现在,我们可以订阅 windowedFlux 流。然而, windowedFlux 流的每个元素本身就是一个 Reactive Stream。因此,对于每个 window,我们进行另一个反应式转换。
  2. 在我们的例子中,对于每个窗口,我们使用 collectList 运算符收集元素,以便每个窗口现在都简化为 Mono ; > 类型。
  3. 对于每个内部 Mono 元素,我们进行单独的订阅并记录收到的事件。

上述代码生成以下输出:

window: []
window: [101, 102]
window: [103, 104, 105, 106]
window: [107, 108]
window: [109, 110, 111, 112]
window: [113, 114, 115, 116, 117, 118, 119, 120]

请注意,第一个窗口是空的。发生这种情况是因为,一旦我们开始原始流,我们就会生成一个初始窗口。然后,第一个元素到达(数字 101),它是一个素数,它触发一个新窗口,因此,已经打开的窗口关闭(使用 onComplete 信号) 没有任何元素。

当然,我们可以使用 buffer 操作符来解决这个问题。两个运算符的行为非常相似。但是,buffer 仅在缓冲区关闭时发出一个集合,而 window 运算符在事件到达时立即传播,使其可以更快地做出反应并实施更复杂的工作流程。

此外,我们可以使用 groupBy 运算符按某些标准对 Reactive Stream 中的元素进行分组。让我们将整数序列除以奇数和偶数,并仅跟踪每组中的最后两个元素。代码可能如下所示:

Flux.range(1, 7)                                                   // (1)
    .groupBy(e -> e % 2 == 0 ? "Even" : "Odd")                     // (2)
    .subscribe(groupFlux -> groupFlux                              // (3)
        .scan(                                                     // (4)
            new LinkedList<>(),                                    // (4.1)
            (list, elem) -> {
                list.add(elem);                                    // (4.2)
                if (list.size() > 2) {
                    list.remove(0);                                // (4.3)
                }
                return list;
            })
        .filter(arr -> !arr.isEmpty())                             // (5)
        .subscribe(data ->                                         // (6)
            log.info("{}: {}", groupFlux.key(), data)));

我们看一下前面的代码:

  1. 在这里,我们生成一个小的数字序列。
  2. 使用 groupBy 运算符,我们根据除法模块将序列分为奇数和偶数。运算符返回 Flux<GroupedFlux<String, Integer>> 类型的流。
  3. 在这里,我们订阅 toFlux 并且对于每个分组的通量,我们应用 scan 运算符。
  4. scan 运算符是一个 seed 与空列表 (4.1). 分组通量中的每个元素都被添加到列表中 (4.2)< em>, 如果列表大于两个元素,则删除最旧的元素(4.3)
  5. scan 运算符首先传播种子,然后重新计算值。在这种情况下,filter 运算符允许我们从扫描的种子中删除空数据容器。
  6. 最后,我们分别订阅每个分组的通量并显示扫描运算符 发​​送 的内容。

正如我们所料,前面的代码显示以下输出:

Odd: [1]
Even: [2]
Odd: [1, 3]
Even: [2, 4]
Odd: [3, 5]
Even: [4, 6]
Odd: [5, 7]

此外,Project Reactor 库支持一些高级技术,例如在不同的时间窗口中对发射的元素进行分组。有关该功能,请参阅 groupJoin 运算符的文档。

flatMap、concatMap 和 flatMapSequential 运算符

当然, Project Reactor 不能省略实现flatMap 运算符,因为它是函数式编程transformation ="indexterm"> 本身。

flatMap 操作符 逻辑上由  两个操作组成——mapflatten(就 Reactor 而言,flatten 类似于 merge 运算符)。 flatMap 运算符的 map 部分将每个传入元素转换为 Reactive Stream (T -> Flux<R>), flatten 部分将所有生成的反应序列合并为一个新的反应序列,通过它传递 R 类型的元素。 

下面的弹珠图可以帮助我们把握思路:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.7。运营商:平面地图

在上图中,对于每个 circle(n),我们生成 square(n) ,然后是 triangle(n)。所有这些子序列都合并到一个下游。 

Project Reactor 提供了 flatMap 运算符的一些不同变体。除了覆盖之外,该库还提供了 flatMapSequential 运算符和 concatMap 运算符。这三个运算符在几个维度上有所不同,即:

  • Whether the operator is eagerly subscribing to its inner streams (the flatMap and flatMapSequential operators subscribe eagerly, the concatMap waits for each inner completion before generating the next sub-stream and subscribing to it)
  • Whether the operator preserves the order of generated elements (the concatMap naturally preserves the same order as the source elements, the flatMapSequential operator preserves the order by queuing elements received out of order, while the flatMap operator does not necessarily preserve original ordering)
  • Whether the operator allows the interleaving of elements from different sub-streams (the flatMap operator allows interleaving, while concatMap and flatMapSequential do not)

让我们实现一个简单的算法来请求 每个用户最喜欢的书。提供用户最喜欢的书籍的服务可能如下所示:

public Flux<String> requestBooks(String user) {
    return Flux.range(1, random.nextInt(3) + 1)                      // (1)
               .map(i -> "book-" + i)                                // (2)
               .delayElements(Duration.ofMillis(3));                 // (3)

}

模拟服务执行以下操作:

  1. 服务生成随机数量的整数值
  2. 然后它将每个数字映射到书名
  3. 该服务将每本书延迟一段时间,这应该模拟与数据库的通信延迟

现在我们可以为几个用户组合执行 requestBooks 方法:

Flux.just("user-1", "user-2", "user-3")
    .flatMap(u -> requestBooks(u)
        .map(b -> u + "/" + b))
    .subscribe(r -> log.info("onNext: {}", r));

上述代码生成以下输出,证明了元素的交错:

[thread: parallel-3] onNext: user-3/book-1
[thread: parallel-1] onNext: user-1/book-1
[thread: parallel-1] onNext: user-2/book-1
[thread: parallel-4] onNext: user-3/book-2
[thread: parallel-5] onNext: user-2/book-2
[thread: parallel-6] onNext: user-1/book-2
[thread: parallel-7] onNext: user-3/book-3
[thread: parallel-8] onNext: user-2/book-3

此外,我们可以看到 flatMap 运算符的传出元素到达 不同线程中的订阅者处理程序跨度>。但是,Reactive Streams 规范保证 happens-before 语义。因此,即使元素可能在不同的线程中到达,它们也不会同时到达。  线程调度部分详细介绍了 Project Reactor 的这一方面。

 

此外,该库还可以使用 flatMapDelayErrorflatMapSequentialDelayError onError 信号code> 和 concatMapDelayError 运算符。除此之外,concatMapIterable 运算符允许在转换函数为每个元素生成迭代器 而不是响应式流时进行类似的操作。在这种情况下, 交织不会发生。

flatMap 运算符(及其变体) 在函数式编程和反应式编程中都非常重要,因为它允许使用一行代码实现复杂的工作流。  ;

采样元素

对于高吞吐量场景,通过应用采样技术仅处理 部分事件可能是有意义的。 Reactor 允许我们使用 samplesampleTimeout 操作符来做到这一点。因此,序列可能会周期性地发出与时间窗口内最近看到的值相对应的项目。让我们假设以下代码:

Flux.range(1, 100)
    .delayElements(Duration.ofMillis(1))
    .sample(Duration.ofMillis(20))
    .subscribe(e -> log.info("onNext: {}", e));

上述代码生成以下输出:

onNext: 13
onNext: 28
onNext: 43
onNext: 58
onNext: 73
onNext: 89
onNext: 100

前面的日志显示,即使我们每毫秒按顺序生成项目,订阅者也只会收到所需限制内的一小部分事件。通过这种方法,我们可以在不需要所有传入事件来成功操作的情况下使用被动速率限制。 

 

将反应序列转化为阻塞结构

Project Reactor 库提供了一个 API,用于 反应序列转换为阻塞结构。即使在反应式应用程序中应该省略任何阻塞操作,有时上层 API 也需要它。因此,我们有以下选项来阻塞流并同步生成结果:

  • The toIterable method transforms reactive Flux into a blocking Iterable.
  • The toStream method transforms reactive Flux into a blocking Stream API. As of Reactor 3.2, it uses the toIterable method under the hood.
  • The blockFirst method blocks the current thread until the upstream signals its first value or completes.
  • The blockLast method blocks the current thread until the upstream signals its last value or completes. In the case of the onError signal, it throws the exception in the blocked thread.

重要的是要记住 blockFirstblockLast 运算符具有方法覆盖,其中包含线程将被阻塞的持续时间。那应该防止无限阻塞的线程。此外,toIterabletoStream 方法可以使用 Queue  ; 存储可能比客户端代码迭代阻塞 IterableStream 更快到达的事件。

在序列处理时偷看元素

有时需要为处理管道中间的每个元素或特定信号执行 action。为了满足这些要求,Project Reactor 提供了以下方法:

  • doOnNext(Consumer<T>)allows us to execute some action on each element on FluxorMono
  • doOnComplete()anddoOnError(Throwable)are invoked on corresponding events
  • doOnSubscribe(Consumer<Subscription>),doOnRequest(LongConsumer), anddoOnCancel(Runnable) allow us to react to subscription life-cycle events
  • doOnTerminate(Runnable)is called when a stream is terminated, no matter what caused the termination

另外,FluxMono提供了 doOnEach(Consumer 方法,处理所有表示反应流域的信号—— onError, onSubscribe, onNextonError 和  onComplete

让我们考虑以下代码:

Flux.just(1, 2, 3)
    .concatWith(Flux.error(new RuntimeException("Conn error")))
.doOnEach(s -> log.info("signal: {}", s))
.subscribe();

前面的代码使用 concatWith 运算符,它是 concat 运算符的方便包装器。此外,上述代码生成以下输出:

signal: doOnEach_onNext(1)
signal: doOnEach_onNext(2)
signal: doOnEach_onNext(3)
signal: onError(java.lang.RuntimeException: Conn error)

在这个例子中,我们不仅 收到了所有onNext信号,还收到了onError信号。

物质化和非物质化信号

有时,处理流很有用,而不是根据数据,而是根据术语<一个 id="id325656526" class="indexterm"> 信号。 FluxMono 提供了 materialize 和 dematerialize 方法。一个例子如下:

Flux.range(1,3).doOnNext(e -> log.info("data  : {}", e)).materialize().doOnNext(e -> log.info("signal: {}", e)).dematerialize().collectList().subscribe(r-> log.info("result: {}", r));

前面的代码产生以下输出:

data  : 1
signal: onNext(1)
data  : 2
signal: onNext(2)
data  : 3
signal: onNext(3)
signal: onComplete()
result: [1, 2, 3]

这里,在处理信号流时, doOnNext 方法不仅接收带有数据的 onNext 事件,而且一个 onComplete 事件包装在 Signal 类中。这种方法允许处理 onNext、 onError和 onCompete 一种类型层次结构中的事件。

如果我们只想要记录信号而不修改它们,Rector 提供了 log 方法,它使用 可用的记录器 记录所有处理过的信号。

寻找合适的运营商

Project Reactor 为 Reactive 流处理提供了一个非常通用的 DSL。但是,需要一些练习才能习惯该库,以便为任务选择合适的操作员并不困难。 Reactor 流畅的 API 和编写良好的文档对这个过程有很大帮​​助。此外,我们建议您阅读官方文档中的 Which operator? 部分,如果不清楚具体使用哪个运算符问题。这篇文章可以在这里找到:http://projectreactor。 io/docs/core/release/reference/#which-operator

以编程方式创建流

我们已经介绍了如何创建数组、期货和阻塞请求的反应式流。但是,有时,我们需要一种更复杂的方法来在流中生成信号或将对象的生命周期绑定到 Reactive 流。本节介绍 Reactor 提供的以编程方式创建流的内容。

工厂方法——推送和创建

push 工厂方法允许 programmatical 创建 Flux 实例通过适配单线程生产者。这种方法对于适应 异步、单线程、多值 API 非常有用,而无需担心背压 和 消除。如果订户无法处理负载,则排队信号会涵盖这两个方面。我们来看下面的代码:

Flux.push(emitter -> IntStream                                     // (1)
        .range(2000, 3000)                                         // (1.1)
        .forEach(emitter::next))                                   // (1.2)
    .delayElements(Duration.ofMillis(1))                           // (2)
    .subscribe(e -> log.info("onNext: {}", e));                    // (3)

 

我们看一下前面的代码:

  1. 在这里,我们使用 push 工厂方法来使一些现有的 API 适应响应式范例。为了简单起见,这里我们使用 Java Stream API 生成 1000 个整数元素 (1.1) 并将它们发送到 emitter FluxSink 类型的 code> 对象 (1.2)。在 push 方法中,我们不关心背压和取消,因为这些功能由 push 方法本身覆盖。 
  2. 让我们延迟流中的每个元素以模拟背压情况。
  3. 在这里,我们订阅 onNext 事件。

 push factory 方法可以方便地使用默认背压和取消策略来调整异步 API。

此外,还有 create factory 方法,其行为类似于 push 工厂方法。但是,这允许从不同线程发送事件,因为它额外序列化 FluxSink 实例。这两种方法都允许覆盖溢出策略,还可以通过注册额外的处理程序来启用资源清理,如下面的代码所示:

Flux.create(emitter -> {
    emitter.onDispose(() -> log.info("Disposed"));
    // push events to emitter
})
    .subscribe(e -> log.info("onNext: {}", e));

工厂方法——生成

generate 工厂方法旨在允许 基于生成器的内部转发状态的复杂序列。它需要一个初始值和一个函数,该函数根据前一个内部状态计算下一个内部状态,并将 onNext 信号发送给下游订阅者。例如,让我们创建一个简单的 Reactive Stream 来生成 Fibonacci 序列 (1, 1, 2, 3, 5, 8, 13, ...)。此任务的代码可能如下所示:

Flux.generate(                                                     // (1)
    () -> Tuples.of(0L, 1L),                                       // (1.1)      
    (state, sink) -> {                                             //
        log.info("generated value: {}", state.getT2());            //
        sink.next(state.getT2());                                  // (1.2)
        long newValue = state.getT1() + state.getT2();             //
        return Tuples.of(state.getT2(), newValue);                 // (1.3) 
    })
    .delayElements(Duration.ofMillis(1))                           // (2)
    .take(7)                                                       // (3)
    .subscribe(e -> log.info("onNext: {}", e));                    // (4)

我们看一下前面的代码:

  1. 使用生成工厂方法,我们可以创建自定义的反应序列。我们使用 Tuples.of(0L, 1L) 作为序列(1.1)的初始状态。在生成步骤中,我们通过引用状态对 (1.2)  中的第二个值来发送 onNext 信号并重新计算基于斐波那契数列 (1.3) 中的下一个值的新状态对。
  2. 使用 delayElements 运算符,我们在 onNext 信号之间引入了一些延迟。
  3.  这里,为了简单起见,我们只取前七个元素。
  4. 当然,要触发序列生成,我们订阅事件。

前面的代码产生以下输出:

generated value: 1
onNext: 1
generated value: 1
onNext: 1
generated value: 2
onNext: 2
generated value: 3
onNext: 3
generated value: 5
onNext: 5
generated value: 8
onNext: 8
generated value: 13
onNext: 13

正如我们在日志中看到的,每个新值都会在生成下一个值之前同步传播到订阅者。这种方法可用于生成不同的、复杂的反应序列,这些反应序列需要发射之间的中间状态。

 

 

将一次性资源包装到 Reactive Streams

 using 工厂方法 允许创建流依赖 在 一次性 资源上。它在响应式编程中实现了 try-with-resources 方法。让我们假设需要包装一个阻塞 API,该 API 用以下特意简化的 Connection 类表示:

public class Connection implements AutoCloseable {                 // (1)
   private final Random rnd = new Random();

   public Iterable<String> getData() {                             // (2)
      if (rnd.nextInt(10) < 3) {                                   // (2.1)
         throw new RuntimeException("Communication error");
      }
      return Arrays.asList("Some", "data");                        // (2.2)
   }

   public void close() {                                           // (3)
      log.info("IO Connection closed");
   }

   public static Connection newConnection() {                      // (4)       
      log.info("IO Connection created");
      return new Connection();
   }
}

上述代码描述了以下内容:

  1.  Connection 类管理一些内部资源,并通过实现 AutoClosable通知这个  ;界面。
  2.  getData 方法模拟一个IO操作,有时可能会导致异常 (2.1) 或者返回一个  ;Iterable 收集有用的数据 (2.2)。  
  3.  close 方法可能会释放内部资源并且应该始终被调用,即使在 getData期间发生错误之后 执行。
  4. 静态 newConnection 工厂方法 总是返回 Connection 类的新实例。

通常,连接和连接工厂具有更复杂的行为,但为了简单起见,我们将使用这种简单的设计。

 

使用命令式方法,我们可以使用以下代码从连接中接收数据:

try (Connection conn = Connection.newConnection()) {                 // (1)
   conn.getData().forEach(                                           // (2)
       data -> log.info("Received data: {}", data)
   );
} catch (Exception e) {                                              // (3)
   log.info("Error: {}", e.getMessage());
}

前面的代码遵循以下步骤:

  1. 使用 Java 的 try-with-resources 语句创建一个新连接,并在离开当前代码块时自动关闭它。
  2. 获取和处理业务数据。
  3. 如果发生异常,请记录相应的消息。

前面代码的响应式等效代码如下所示:

Flux<String> ioRequestResults = Flux.using(                        // (1)
    Connection::newConnection,                                     // (1.1)
    connection -> Flux.fromIterable(connection.getData()),         // (1.2)
    Connection::close                                              // (1.3)
);

ioRequestResults.subscribe(                                        // (2)
   data -> log.info("Received data: {}", data),                    //
   e -> log.info("Error: {}", e.getMessage()),                     //
   () -> log.info("Stream finished"));                             //

上述代码由以下步骤组成:

  1.  using factory 方法允许将 Connection instance 生命周期与 life-关联起来其包装流的循环。  using 方法需要知道如何创建一次性资源;在这种情况下,它是创建新连接的代码 (1.1)。然后,该方法必须知道如何将刚刚创建的资源转换为 Reactive Stream。在这种情况下,我们调用 fromIterable 方法 (1.2)。最后但同样重要的是,我们如何关闭资源?在我们的例子中,当处理结束时, close 连接实例的方法被调用。
  2. 当然,要开始实际处理,我们需要为 onNextonError 和  < code class="literal">onComplete 信号。

上述代码的成功路径生成以下输出:

IO Connection created
Received data: Some
Received data: data
IO Connection closed
Stream finished

带有模拟错误的执行会生成以下输出:

IO Connection created
IO Connection closed
Error: Communication error

在这两种情况下,using 运算符首先创建一个新连接,然后执行工作流(成功与否),然后关闭先前创建的连接。在这种情况下,连接的生命周期绑定到流的生命周期。运营商还可以选择是否应该在通知订阅者流终止之前或之后进行清理操作。

使用 usingWhen 工厂包装反应式事务

using 运算符类似,usingWhen 运算符允许我们 manage 反应式的资源。但是,using 运算符会同步检索托管资源(通过调用 Callable 实例)。 同时, usingWhen 运算符以反应方式检索托管资源(通过订阅 Publisher 的实例)。此外, usingWhen 运算符接受不同的处理程序来成功和不成功地终止主处理流。这些处理程序由发布者实现。这种区别允许仅使用一个操作员来实现完全非阻塞的反应式事务。 

假设我们有一个完全反应式的事务。出于演示目的,代码被过度简化了。反应式事务实现可能如下所示:

public classTransaction{privatestaticfinal Random random =newRandom();privatefinalint id;publicTransaction(int id){this.id = id;
        log.info("[T: {}] created", id);}publicstatic Mono<Transaction>beginTransaction(){// (1)return Mono.defer(()->
            Mono.just(newTransaction(random.nextInt(1000))));}public Flux<String>insertRows(Publisher<String> rows){// (2)return Flux.from(rows).delayElements(Duration.ofMillis(100)).flatMap(r ->{if(random.nextInt(10)<2){return Mono.error(newRuntimeException("Error: "+ r));}else{return Mono.just(r);}});}public Mono<Void>commit(){// (3)return Mono.defer(()->{
            log.info("[T: {}] commit", id);if(random.nextBoolean()){return Mono.empty();}else{return Mono.error(newRuntimeException("Conflict"));}});}public Mono<Void>rollback(){// (4)return Mono.defer(()->{
            log.info("[T: {}] rollback", id);if(random.nextBoolean()){return Mono.empty();}else{return Mono.error(newRuntimeException("Conn error"));}});}}

 

 

 

 

我们看一下前面的代码:

  1. 这是一个允许创建新事务的静态工厂。
  2. 每个事务都有一种在事务中保存新行的方法。有时,由于某些内部问题(随机行为),该过程会失败。insertRows消耗并返回 Reactive Streams。
  3. 这是一个异步提交。有时,事务可能无法提交。
  4. 这是一个异步回滚。有时,事务可能无法回滚。

现在,使用 usingWhen 操作符,我们可以实现使用以下代码更新的事务:

Flux.usingWhen(
    Transaction.beginTransaction(),// (1)
    transaction -> transaction.insertRows(Flux.just("A","B","C")),// (2)
    Transaction::commit,// (3)
    Transaction::rollback                                            // (4)).subscribe(
    d -> log.info("onNext: {}", d),
    e -> log.info("onError: {}", e.getMessage()),()-> log.info("onComplete"));

前面的代码使用 usingWhen 操作符进行以下操作:

  1. 这里,beginTransaction静态方法通过返回Mono 类型异步返回一个新事务
  2. 对于给定的事务实例,它会尝试插入新行
  3. 如果步骤 (2) 成功完成则提交事务
  4. 如果步骤 (2) 失败则回滚事务

执行我们练习中的代码后,我们应该看到以下输出以表示成功执行:

[T:265] created
onNext: A
onNext: B
onNext: C
[T:265] commit
onComplete

 

 

一个中止事务的执行示例可能如下所示:

[T:582] created
onNext: A
[T:582] rollback
onError: Error: B

使用 usingWhen 操作符,可以更轻松地以完全反应式的方式管理资源生命周期。此外,反应式事务可以很容易地用它来实现。因此,与 using 运算符相比,usingWhen 运算符是一个巨大的改进。

处理错误

当我们设计一个与外部服务进行大量通信的反应式应用程序时,我们必须处理各种异常情况。幸运的是,onError 信号是 Reactive Stream 规范的一个组成部分,所以异常应该总是有办法传播给可以处理它的参与者。但是,如果最终订阅者没有为 onError 信号定义处理程序, onError 会抛出 UnsupportedOperationException

此外,响应式流的语义定义 onError 是一个终端操作,之后响应式序列停止执行。届时,我们可能会通过应用以下策略之一做出不同的反应:

  • Of course, we should define handlers for the onError signal in the subscribe operator.
  • We can catch and replace an error with a default static value or a value calculated from the exception by applying the onErrorReturn operator.
  • We can catch an exception and execute an alternative workflow by applying the onErrorResume operator.
  • We can catch and transform an exception into another exception that better represents the situation by applying the onErrorMap operator.
  • We can define a reactive workflow that, in the event of errors, retries the execution. The retry operator resubscribes to the source reactive sequence if it signals an error. It may behave so indefinitely or for a limited amount of time. The retryBackoff operator gives out-of-the-box support for the exponential backoff algorithm, which retries the operation with increasing delays.

 

此外,空流并不是我们一直想要的。在这种情况下,我们可以使用  defaultIfEmpty 运算符返回一个默认值,或者使用  switchIfEmpty 运算符返回一个完全不同的反应流.

还有一个更方便的操作符,称为 timeout,它允许限制操作等待时间并抛出一个 TimeoutException 异常,这,反过来,我们可以使用其他一些错误处理策略进行处理。

让我们演示一下我们如何 应用一些所描述的策略。我们可以假设以下不可靠的推荐服务:

public Flux<String> recommendedBooks(String userId) {
    return Flux.defer(() -> {                                        // (1)
        if (random.nextInt(10) < 7) {
            return Flux.<String>error(new RuntimeException("Err"))   // (2)
                .delaySequence(Duration.ofMillis(100));
        } else {
            return Flux.just("Blue Mars", "The Expanse")             // (3)
                .delayElements(Duration.ofMillis(50));
        }
    }).doOnSubscribe(s -> log.info("Request for {}", userId));       // (4)
}

我们看一下前面的代码:

  1. 我们延迟计算直到订阅者到达。
  2. 我们的不可靠服务很可能会返回错误。但是,我们通过应用 delaySequence 运算符来及时移动所有信号。
  3. 当客户幸运时,他们会延迟收到他们的建议。
  4. 此外,我们将每个请求记录到服务中。

现在,让我们实现一个客户端来处理我们的不可靠服务 好吧:

Flux.just("user-1")                                                // (1)
    .flatMap(user ->                                               // (2)
        recommendedBooks(user)                                     // (2.1)
        .retryBackoff(5, Duration.ofMillis(100))                   // (2.2)
        .timeout(Duration.ofSeconds(3))                            // (2.3)
        .onErrorResume(e -> Flux.just("The Martian")))             // (2.4)
    .subscribe(                                                    // (3)
        b -> log.info("onNext: {}", b),
        e -> log.warn("onError: {}", e.getMessage()),
        () -> log.info("onComplete")
    );

 

前面的代码执行以下操作:

  1. 在这里,我们生成了一个请求电影推荐的用户流。
  2. 对于每个用户,我们将不可靠的 recommendedBooks 服务称为(2.1)。如果调用失败,我们使用指数退避重试(不超过 5 次重试,从 100 毫秒的持续时间开始)(2.2)。但是,如果我们的重试策略在三秒后没有带来任何结果,则会导致错误信号 (2.3)。最后,如果出现任何错误,我们会使用 onErrorResume 运算符 (2.4) 返回预定义的通用推荐集。
  3. 当然,我们需要创建一个订阅者。

运行时,我们的应用程序可能会生成以下输出:

[time: 18:49:29.543] Request for user-1
[time: 18:49:29.693] Request for user-1
[time: 18:49:29.881] Request for user-1
[time: 18:49:30.173] Request for user-1
[time: 18:49:30.972] Request for user-1
[time: 18:49:32.529] onNext: The Martian
[time: 18:49:32.529] onComplete

从日志中,我们可以看到我们的核心尝试五次获得 user-1 的推荐。此外,重试延迟从约 150 毫秒增加到约 1.5 秒。最后,我们的代码 停止尝试从 recommendedBooks方法检索结果 并返回 (The Martian) 后备值,并完成了流。

总而言之,Project Reactor 提供了广泛的工具来帮助处理异常情况,从而提高应用程序的弹性。

 

 

背压处理

即使 Reactive Stream 规范 要求在生产者和消费者之间的通信中内置背压,仍然可能溢出消费者。一些消费者可能 无辜地请求一个未绑定的需求,然后无法处理生成的负载。一些消费者可能对传入消息的速率有严格的限制。例如,数据库客户端每秒插入的记录数不得超过 1,000 条。在这种情况下,事件批处理技术可能会有所帮助。我们在 批处理流元素 部分中介绍了这种方法。或者,我们可以通过以下方式配置流来处理背压情况:

  • The onBackPressureBuffer operator requests an unbounded demand and pushes returned elements to a downstream. However, if a downstream consumer cannot keep up, elements are buffered in a queue. The onBackPressureBuffer operator has many overrides and exposes many configuration options, which facilitate the tuning of its behavior.
  • The onBackPressureDrop operator also requests an unbounded demand (Integer.MAX_VALUE) and pushes data downstream. If not enough demand is requested from downstream, elements are dropped. It is possible to handle dropped elements with a custom handler.
  • The onBackPressureLast operator works similarly to onBackPressureDrop. However, it remembers the most recently received element and pushes it downstream as soon as the demand appears. It may help to always receive recent data, even in overflow situations.
  • The onBackPressureError operator requests an unbounded demand while trying to push data downstream. If a downstream consumer cannot keep up, the operator raises an error.

另一种管理背压的方法可能是通过限速技术。  limitRate(n) 算子将下游需求拆分成不大于 n 的小批量。通过这种方式,我们可以保护我们的精细生产者免受下游消费者的不合理数据请求。  limitRequest(n) 运算符允许限制来自下游消费者的需求(总请求值)。例如,  limitRequest(100) 确保对生产者的请求不会超过 100 个元素。发送 100 个事件后,操作员成功关闭流。

冷热流

在谈论关于反应式发布者时,我们可能会区分 两种类型的发布商——hotcold< /跨度>。

冷发布者的行为方式是,无论何时出现订阅者,都会为该订阅者生成所有序列数据。此外,对于冷发布者,没有订阅者就不会生成任何数据。例如,以下代码代表冷发布者的行为:

Flux<String> coldPublisher = Flux.defer(() -> {
log.info("Generating new items");
    return Flux.just(UUID.randomUUID().toString());
});

log.info("No data was generated so far");
coldPublisher.subscribe(e -> log.info("onNext: {}", e));
coldPublisher.subscribe(e -> log.info("onNext: {}", e));
log.info("Data was generated twice for two subscribers");

上述代码生成以下输出:

No data was generated so far
Generating new items
onNext: 63c8d67e-86e2-48fc-80a8-a9c039b3909c
Generating new items
onNext: 52232746-9b19-4b5e-b6b9-b0a2fa76079a
Data was generated twice for two subscribers

正如我们所见,每当订阅者出现时,都会生成一个新序列——这些语义可能代表 HTTP 请求。在没有人对结果感兴趣并且每个新订阅者触发 HTTP 请求之前,不会进行调用。

另一方面,热发布者中的数据生成不依赖于订阅者的存在。因此,热门发布者可能会在第一个订阅者之前开始生产元素。此外,当订阅者出现时,热发布者可能不会发送先前生成的值,而只会发送新值。这种语义代表数据广播场景。例如,一旦价格发生变化,热门发布者可能会向其订阅者广播有关当前油价的更新。但是,当订阅者到达时,它只会收到未来的更新,而不是之前价格的历史记录。 Reactor 库中的大多数热门发布者都扩展了 Processor 接口。  处理器 部分介绍了 Reactor 的处理器。但是,justfactory 方法会生成一个热发布者,因为它的值仅在发布者构建时计算一次,并且在新订阅者到达时不会重新计算。

 

我们可以将 just 通过包裹在 defer中转换成一个冷发布者。这样,即使 just 在其初始化时生成值,这样的初始化也只会在出现新订阅时发生。后一种行为由defer工厂方法决定。

流的多播元素

当然,我们可以通过应用 一种反应式转换将冷发布者转换为热发布者。例如, 我们可能希望在几个订阅者都准备好生成数据后立即在几个订阅者之间共享冷处理器的结果。此外,我们不想为每个订阅者重新生成数据。 Project Reactor 具有 ConnectableFlux 正是为了这样的目的。使用 ConnectableFlux,可以生成数据以满足最迫切的需求,并缓存数据 以便所有其他订阅者可以按照他们的速度处理数据。当然,队列的大小和超时时间可以通过 publish and replay 班级。此外, ConnectableFlux 可以通过使用以下方法自动跟踪下游订阅者的数量以在达到所需阈值时触发执行—连接autoConnect(n)、 refCount(n)和 refCount(int, Duration)

让我们用下面的例子来描述 ConnectableFlux 的行为:

Flux<Integer> source = Flux.range(0, 3)
    .doOnSubscribe(s ->
        log.info("new subscription for the cold publisher"));

ConnectableFlux<Integer> conn = source.publish();

conn.subscribe(e -> log.info("[Subscriber 1] onNext: {}", e));
conn.subscribe(e -> log.info("[Subscriber 2] onNext: {}", e));

log.info("all subscribers are ready, connecting");
conn.connect(); 

运行时,前面的代码会产生以下输出:

all subscribers are ready, connecting
new subscription for the cold publisher
[Subscriber 1] onNext: 0
[Subscriber 2] onNext: 0
[Subscriber 1] onNext: 1
[Subscriber 2] onNext: 1
[Subscriber 1] onNext: 2
[Subscriber 2] onNext: 2

 

正如我们所看到的,我们的冷发布者收到了订阅,因此只生成了一次项目。但是,两个订阅者都收到了完整的事件集。

缓存流的元素

使用 ConnectableFlux,很容易实现不同的数据缓存策略。然而,Reactor 已经拥有以 cache 操作符的形式进行事件缓存的 API。在内部, cache 算子使用 ConnectableFlux,所以它的主要附加值是一个流畅而直接的API。我们可以调整缓存可以容纳的数据量以及每个缓存项的过期时间。让我们通过以下示例演示它是如何工作的:

Flux<Integer> source = Flux.range(0, 2)                              // (1)
    .doOnSubscribe(s ->
        log.info("new subscription for the cold publisher"));

Flux<Integer> cachedSource = source.cache(Duration.ofSeconds(1));    // (2)

cachedSource.subscribe(e -> log.info("[S 1] onNext: {}", e));        // (3)
cachedSource.subscribe(e -> log.info("[S 2] onNext: {}", e));        // (4)

Thread.sleep(1200);                                                  // (5)

cachedSource.subscribe(e -> log.info("[S 3] onNext: {}", e));        // (6)

前面的代码执行以下操作:

  1. 首先,我们创建一个冷发布者来生成一些项目。
  2. 我们使用缓存操作符将冷发布者缓存 1 秒。
  3. 我们连接第一个订阅者。
  4. 在第一个订阅者之后,我们连接第二个订阅者。
  5. 我们等待一段时间让缓存的数据过期。
  6. 最后我们连接第三个订阅者。

让我们看看程序的输出:

new subscription for the cold publisher
[S 1] onNext: 0
[S 1] onNext: 1
[S 2] onNext: 0
[S 2] onNext: 1
new subscription for the cold publisher
[S 3] onNext: 0
[S 3] onNext: 1

 

根据日志,我们可以得出结论,前两个订阅者共享第一个订阅的相同缓存数据。然后,在延迟之后,第三个订阅者无法检索缓存的数据,因此为冷发布者触发了新订阅。最后,第三个订阅者也收到了所需的数据,即使该数据没有从缓存中到达。

共享流的元素

使用 ConnectableFlux,我们为一对 订阅者多播事件。但是,我们正在等待订阅者出现,然后才开始处理。 share 运算符允许将冷发布者转换为热发布者。运营商的行为方式为每个新订阅者传播订阅者尚未错过的事件。让我们考虑以下用例:

Flux<Integer> source = Flux.range(0, 5)
    .delayElements(Duration.ofMillis(100))
    .doOnSubscribe(s ->
        log.info("new subscription for the cold publisher"));

Flux<Integer> cachedSource = source.share();

cachedSource.subscribe(e -> log.info("[S 1] onNext: {}", e));
Thread.sleep(400);
cachedSource.subscribe(e -> log.info("[S 2] onNext: {}", e));

在前面的代码中,我们共享了一个冷流,它每 100 毫秒生成一次事件。然后,有一些延迟,几个订阅者订阅了共享发布者。让我们看看应用程序的输出:

new subscription for the cold publisher
[S 1] onNext: 0
[S 1] onNext: 1
[S 1] onNext: 2
[S 1] onNext: 3
[S 2] onNext: 3
[S 1] onNext: 4
[S 2] onNext: 4

从日志中可以看出,第一个订阅者从第一个订阅者开始接收事件,而第二个订阅者错过了在其出现之前产生的事件(S 2只接收到事件< code class="literal">3 和 4)。

 

处理时间

反应式编程是异步的,因此它本质上假定存在时间箭头。

使用 Project Reactor,我们可以使用 interval 运算符根据持续时间生成事件,使用 delayElements 延迟元素 运算符,并使用 delaySequence 运算符延迟所有信号。在本章中,我们已经使用了几个这样的运算符。

我们已经讨论了如何根据配置的超时(buffer(Duration)window(Window) 进行数据缓冲和窗口化运营商)。 Reactor 的 API 允许您对一些与时间相关的事件做出反应,例如前面描述的 timestamp and timeout运营商。与 timestamp 类似,elapsed 运算符测量从前一个事件开始的时间间隔。让我们考虑以下代码:

Flux.range(0, 5)
    .delayElements(Duration.ofMillis(100))
    .elapsed()
    .subscribe(e -> log.info("Elapsed {} ms: {}", e.getT1(), e.getT2()));

在这里,我们每 100 毫秒生成事件。让我们看一下日志输出:

Elapsed 151 ms: 0
Elapsed 105 ms: 1
Elapsed 105 ms: 2
Elapsed 103 ms: 3
Elapsed 102 ms: 4

从前面的输出中可以明显看出,事件不会在 100 毫秒间隔内精确到达。发生这种情况是因为 Reactor 使用 Java 的 ScheduledExecutorService 来处理计划事件,它本身并不能保证确切的延迟。因此,我们应该注意不要对 Reactor 库要求太精确的时间(实时)间隔。

 

组合和转换反应式流

当我们构建complicated响应式工作流时,通常需要使用相同的sequence 几个不同地方的运算符。使用 transform 运算符,我们可以将这些常见的部分提取到单独的对象中,并在需要时重用它们。以前,我们已经在流中转换了事件。使用 transform 操作符,我们可以扩充流结构本身。让我们假设以下示例:

Function<Flux<String>, Flux<String>> logUserInfo =                 // (1)
    stream -> stream                                               //
        .index()                                                   // (1.1)
        .doOnNext(tp ->                                            // (1.2)
            log.info("[{}] User: {}", tp.getT1(), tp.getT2()))     //
        .map(Tuple2::getT2);                                       // (1.3)

Flux.range(1000, 3)                                                // (2)
    .map(i -> "user-" + i)                                         //
    .transform(logUserInfo)                                        // (3)
    .subscribe(e -> log.info("onNext: {}", e));

我们看一下前面的代码:

  1. 我们用 Function<Flux<String>、Flux<String>> 签名定义了 logUserInfo函数。它将 String 值的 Reactive Stream 转换为另一个 Reactive Stream,该流也生成 String 值。在此示例中,对于每个 onNext 信号,我们的函数记录有关用户 (1.2) 的详细信息,另外使用index 运算符 (1.1)。传出的流不包含任何关于枚举的信息,因为我们用 map(Tuple2::getT2)调用(1.3)< /代码>。
  2. 在这里,我们生成一些用户 ID。
  3. 我们通过应用 transform 运算符嵌入了 logUserInfo 函数定义的转换。

让我们执行前面的代码。日志输出如下:

[0] User: user-1000
onNext: user-1000
[1] User: user-1001
onNext: user-1001
[2] User: user-1002
onNext: user-1002

在日志中,我们看到每个元素都由 logUserInfo function 和最终订阅记录。但是, logUserInfo 函数还跟踪事件的索引。

transform 运算符仅在流生命周期的组装阶段更新流行为一次。同时,Reactor 有 compose 操作符,每次订阅者到达时都会进行相同的流转换。让我们用下面的代码来说明它的行为:

Function<Flux<String>, Flux<String>> logUserInfo = (stream) -> {     // (1)
if (random.nextBoolean()) {
return stream
            .doOnNext(e -> log.info("[path A] User: {}", e));
} else {
return stream
            .doOnNext(e -> log.info("[path B] User: {}", e));
}
};

Flux<String> publisher = Flux.just("1", "2")                         // (2)
    .compose(logUserInfo);                                           // (3)

publisher.subscribe();                                               // (4)
publisher.subscribe();

在前面的代码中,我们执行以下操作:

  1. 与前面的示例类似,我们定义了一个转换函数。在这种情况下,函数每次随机选择流变换的路径。两条建议的路径仅在日志消息前缀上有所不同。
  2. 在这里,我们创建了一个生成一些数据的发布者。
  3. 通过 compose 运算符,我们将 logUserInfo 函数嵌入到执行工作流中。
  4. 此外,我们订阅了几次,希望观察不同订阅的不同行为。

 

让我们执行前面的代码,它应该产生以下输出:

[path B] User: 1
[path B] User: 2
[path A] User: 1
[path A] User: 2

日志信息证明第一次订阅触发了path B,而第二次 触发了path A。当然,compose 操作符允许实现比随机选择日志消息前缀更复杂的业务逻辑。 transformcompose 运算符都是强大的工具,可以在反应式应用程序中重用代码。

处理器

Reactive Streams 规范定义了 Processor 接口。一个Processor同时是一个Publisher和一个Subscriber。因此,我们可以订阅 Processor 实例,也可以手动 发送信号(onNext, onError, onComplete) 。 Reactor 的作者建议省略处理器,因为它们很难使用并且容易出错。在大多数情况下,提供的处理器可能会被运算符组合取代。或者,生成器工厂方法(pushcreategenerate)可以更适合适配外部 API。

Reactor 提出了以下几种处理器:

  • Direct processors can only push data through manual user actions by operating with the processor's sink. DirectProcessor and UnicastProcessor are representatives of this group of processors. DirectProcessor does not handle backpressure but may be used to publish events to multiple subscribers.  UnicastProcessor handles backpressure with an internal queue, however, may serve one Subscriberat most.
  • Synchronous processors (EmitterProcessor and ReplayProcessor) may push data both manually and by subscribing to an upstream Publisher. EmitterProcessor may serve multiple subscribers and honor their demands, but may consume data only from one Publisher and in a synchronous manner. ReplayProcessor behaves similarly to EmitterProcessor, however, allows a couple of strategies for caching incoming data.
  • Asynchronous processors (WorkQueueProcessor and TopicProcessor) can push downstream data obtained from multiple upstream publishers. To deal with multiple upstream publishers, these processors use the RingBuffer data structure. These processors have a dedicated builder API because the number of configuration options makes it hard to initialize them. TopicProcessor is Reactive Streams compliant and associates a Thread for each downstream Subscriber to handle interactions there. Consequently, there is a limit to how many downstream subscribers it can serve. WorkQueueProcessor has characteristics similar to TopicProcessor. However, it relaxes some of the Reactive Streams' requirements, which allows a reduction in the size of resources it uses at runtime.

测试和调试 Project Reactor

测试框架伴随着Reactor 图书馆。  io.projectreactor:reactor-test 库提供了所有必要的工具来测试使用 Project Reactor 实现的反应式工作流。 第 9 章, 测试反应式应用程序< /em>,详细介绍了适用于反应式编程的测试技术。

尽管响应式代码并不那么容易调试,但 Project Reactor 提供了必要的技术来简化调试过程。与任何基于回调的框架一样,Project Reactor 中的堆栈跟踪信息量不是很大。他们没有在我们的代码中给出发生异常情况的确切位置。 Reactor 库带有面向调试的汇编时工具功能。  Project Reactor 高级 部分介绍了流生命周期的组装时间阶段的详细信息。 可以使用以下代码激活此功能:

Hooks.onOperatorDebug();

启用后,此功能开始收集将要组装的所有流的堆栈跟踪,稍后此信息可以使用组装信息扩展堆栈跟踪信息,因此有助于更快地发现任何问题。 但是,创建堆栈跟踪的成本很高。因此,它只能以受控方式激活,作为最后的手段。 有关此功能的更多信息,请参阅 Reactor 的文档。

此外,Project Reactor 的 FluxMono types 提供了一个方便的方法,称为 log 。它记录通过操作员的所有信号。许多可用方法的定制提供了足够的自由来跟踪所需的数据,即使在调试情况下也是如此。

反应堆插件

Project Reactor 是一个多功能且功能丰富的库。但是,它不能包含所有有用的反应式实用程序。因此,有一些项目可以在几个领域扩展 Reactor 的功能。官方 Reactor 插件项目(https://github.com/reactor/reactor-addons) 包含几个用于 Reactor 项目的模块。在撰写本文时,Reactor Addons 由以下模块组成 - reactor-adapterreactor-logback反应堆额外

reactor-adapter模块为 RxJava 2 反应式类型和调度器提供了桥梁。此外,该模块允许与 Akka 集成。

reactor-logback模块提供高速异步日志记录。它基于Logback的AsyncAppender和LMAX Disruptor的RingBuffer通过Reactor的Processor

reactor-extra模块包含满足高级需求的附加实用程序。例如,该模块包含TupleUtils类,它简化了围绕Tuple类的代码。我们在第 7 章中解释了如何使用这个类, 反应式数据库访问。此外,该模块还有 MathFlux 类,它可以计算、求和和平均来自数值源的最小值或最大值。 ForkJoinPoolScheduler类将Java的ForkJoinPool适配到Reactor的Scheduler。我们可以使用以下导入将模块添加到我们的 Gradle 项目中:

compile 'io.projectreactor.addons:reactor-extra:3.2.RELEASE'

此外,Project Reactor 生态系统为流行的异步框架和消息代理提供响应式驱动程序。

Reactor RabbitMQ 模块(https://github.com/reactor/reactor-rabbitmq ) 使用熟悉的 Reactor API 为 RabbitMQ 提供反应式 Java 客户端。该模块启用具有背压支持的异步非阻塞消息传递。此外,该模块允许我们的应用程序通过使用 FluxMono类型将RabbitMQ用作消息总线。 The Reactor Kafka 模块(https://github.com/reactor/reactor-kafka) 为 Kafka 消息代理提供类似的功能。

另一个流行的 Reactor 扩展称为 Reactor Netty (https://github.com/reactor/反应堆网络)。它使用 Reactor 的响应类型来适配 Netty 的 TCP/HTTP/UDP 客户端和服务器。 Spring WebFlux 模块在内部将 Reactor Netty 用于非阻塞 Web 应用程序。 第 6 章, WebFlux 异步非阻塞通信,更详细地介绍了这个主题。

 

先进的项目反应堆


在上一节中,我们探讨了响应式类型和 reactive 运算符,它们允许实现许多响应式工作流。现在,我们必须更深入地了解 Reactive Streams 的生命周期、多线程以及 Project Reactor 中的内部优化是如何工作的。

反应式流生命周期

为了了解多线程是如何工作的,以及在 Reactor 中实现了多少内部优化,首先我们要了解 reactive 类型在 Reactor 中。

组装时间

流生命周期的第一部分是 组装时间。正如我们在前面的部分中可能已经注意到的那样,Reactor provide为我们提供了一个流畅的 API,它允许构建复杂的流程的元素处理。乍一看,Reactor 提供的 API 看起来像一个构建器,它在流中组合选定的运算符。我们可能还记得,Builder 模式是可变的,它假设一个终端操作,例如 build 执行另一个对象的构建。与常见的构建器模式相比,Reactor API 提供了不变性。因此,每个应用的运算符都会生成一个新对象。在反应式库中,构建执行流程的过程称为 组装。为了更好地理解组装方法,下面的伪代码演示了如果我们没有 Reactor builder API 时流的组装方式:

Flux<Integer> sourceFlux = new FluxArray(1, 20, 300, 4000);
Flux<String> mapFlux = new FluxMap(sourceFlux, String::valueOf);
Flux<String> filterFlux = new FluxFilter(mapFlux, s -> s.length() > 1) 
...

前面的代码演示了如果我们没有 fluent builder API 时响应式代码的外观。很明显,在引擎盖下,通量是 组成 一个彼此。在组装过程之后, 我们得到一个 Publisher 链, 其中每个新的 Publisher 包装前一个。下面的伪代码演示了这一点:

FluxFilter(
  FluxMap(
    FluxArray(1, 2, 3, 40, 500, 6000)
  )
)

前面的代码显示了结果 Flux 在应用一系列运算符后的样子,例如 just -> 地图 -> 过滤器

在流的生命周期中,该阶段起着重要作用,因为在流组装过程中,我们可以通过检查流的类型来替换操作符。例如 concatWith -> 的序列concatWith -> concatWith 运算符可以很容易地压缩为一个串联。以下代码显示了它是如何在 Reactor 中完成的:

public final Flux<T> concatWith(Publisher<? extends T> other) {
if (this instanceof FluxConcatArray) {
@SuppressWarnings({ "unchecked" })
FluxConcatArray<T> fluxConcatArray = (FluxConcatArray<T>) this;

return fluxConcatArray.concatAdditionalSourceLast(other);
}
return concat(this, other);
}

从前面的代码可以看出,如果当前的Flux是实例FluxConcatArray,那么,就不用创建FluxConcatArray(FluxConcatArray(FluxA, FluxB), FluxC) 我们创建一个 FluxConcatArray(FluxA, FluxB, FluxC) 并以这种方式提高流的整体性能。

此外,在 assemble-time 我们可能会为正在组装的流提供一些 Hooks 并启用一些额外的日志记录、跟踪、指标收集或其他可能有用的重要添加在调试或流监控期间。

总结一下 Reactive Streams 生命周期的 assemble-time 阶段的作用,在那个阶段,我们可以操纵流的构造并应用不同的技术来优化、监控或更好的流调试,这是构建 Reactive 系统的必然部分。

订阅时间

流中执行生命周期的第二个重要阶段是 subscription- 时间。当我们 订阅 到给定的 Publisher时,订阅就会发生。例如,以下代码演示了如何订阅 到上述执行流程:

... 
filteredFlux.subscribe(...);

 

 

正如我们可能记得的那样,为了构建执行流程,我们将 Publishers  传递到彼此内部。所以,我们有一个 Publisher链。有一次,我们subscribe 到顶层包装器,我们开始该链的订阅过程。以下伪代码显示了 Subscriber 在订阅期间如何通过 Subscriber 链传播:

filterFlux.subscribe(Subscriber) {
  mapFlux.subscribe(new FilterSubscriber(Subscriber)) {
    arrayFlux.subscribe(new MapSubscriber(FilterSubscriber(Subscriber))) {
// start pushing real elements here
    }
  }
}

前面的代码显示了 订阅时间 在组装的 Flux中发生了什么。正如我们所见, 过滤后的Flux.subscribe 方法的执行随后为每个 subscribe 方法执行内部 Publisher。最后,当执行在带有注释的行结束时,我们将有以下序列的订阅者相互包裹:

ArraySubscriber(
  MapSubscriber(
    FilterSubscriber(
      Subscriber
    )
  )
)

与组装的 Flux 相比,这里的 ArraySubscriber 包装器位于 订阅者s pyramid 在 Flux pyramid的情况下,我们有 FluxArray  在中间(包装器的倒金字塔)。

订阅时间阶段的重要性在于,在该阶段我们可以进行与汇编时间阶段相同的优化。另一个重要的事情是,一些在 Reactor 中启用多线程的操作符允许更改发生订阅的工作线程。我们将在本章稍后介绍订阅时间优化和多线程,现在切换到流执行生命周期的最后阶段的解释。

 

运行

执行的最后一步是一个 runtime 阶段。在那个阶段,我们在 Publisher 和 Subscriber之间进行了实际的信号交换。我们可能记得在 Reactive Streams 规范中,PublisherSubscriber 交换的前两个信号是 onSubscribe signal 和 request signal。  onSubscribe 方法由顶级源调用,在我们的例子中是 ArrayPublisher。这会将其 Subscription 传递给给定的 Subscriber。描述将 Subscription 传递给 ever Subscribers  的过程的伪代码如下所示:

MapSubscriber(FilterSubscriber(Subscriber)).onSubscribe(
  new ArraySubscription()
) {
  FilterSubscriber(Subscriber).onSubscribe(
    new MapSubscription(ArraySubscription(...))
  ) {
    Subscriber.onSubscribe(
      FilterSubscription(MapSubscription(ArraySubscription(...)))
    ) {
// request data here
    }
  }
}

一旦 Subscription 已通过所有 Subscriber 链和每个 Subscriber在链中将给定的 Subscription 包装成特定的表示。所以最后我们得到了 Subscription 包装器的金字塔,如下面的代码所示:

FilterSubscription(
  MapSubscription(
    ArraySubscription()
  )
)

最后,最后一个 Subscriber 接收 Subscription 链,并且为了开始接收元素应该调用 Subscription#request 启动元素发送的方法。下面的伪代码演示了请求元素的过程是怎样的:

FilterSubscription(MapSubscription(ArraySubscription(...)))
  .request(10) { 
    MapSubscription(ArraySubscription(...)) 
      .request(10) { 
        ArraySubscription(...)
          .request(10) { 
            // start sending data
          }
      } 
  }

一旦所有 Subscribers 通过请求的需求并且 ArraySubscription 收到它,ArrayFlux 可以开始向 MapSubscriber(FilterSubscriber(Subscriber))链发送元素。以下是描述通过所有 Subscriber 发送元素的过程的伪代码:

...
ArraySubscription.request(10) { 
  MapSubscriber(FilterSubscriber(Subscriber)).onNext(1) {
// apply mapper here
    FilterSubscriber(Subscriber).onNext("1") { 
      // filter
      // element does not match
      // request and additional element then
      MapSubscription(ArraySubscription(...)).request(1) {...}
    } 
  } 
  MapSubscriber(FilterSubscriber(Subscriber)).onNext(20) { 
    // apply mapper here
    FilterSubscriber(Subscriber).onNext("20") { 
      // filter
      // element matches
      // send it downstream Subscriber
      Subscriber.onNext("20") {...} 
    } 
  } 
}

从前面的代码中我们可以看到,在运行时,来自源的元素通过 Subscriber 链,在每个阶段执行不同的功能。

理解这个阶段的重要性在于,在运行时我们可能会应用优化来减少信号交换的数量。例如,正如我们将在下一节中看到的,我们可以减少 Subscription#request 调用的数量,从而提高流的性能。

笔记

我们可能记得 第 3 章,  Reactive Streams - 新 Streams 的标准Subscription#request 方法的调用会导致写入到持有需求的 volatile 字段。从计算的角度来看,这样的写入是一项昂贵的操作,因此最好尽可能避免它。

为了总结我们对 Stream 的生命周期和每个阶段的执行情况的理解,我们可以考虑下图:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.8。反应流生命周期

总结一下, 我们已经涵盖了 Flux 和 Mono 反应类型。在接下来的部分中,我们将使用流生命周期阶段来阐明 Reactor 如何为每个 Reactive Stream 提供非常有效的实现。

Reactor中的线程调度模型

在本节中,我们将了解 Reactor 为多线程执行提供了哪些功能,以及可用的多线程运算符之间的根本区别是什么。一般来说,有四个运算符允许将 execution 切换到不同的 worker。我们来一一看这些

publishOn 运算符

简而言之, publishOn 操作符允许 runtime 执行的移动部分到一个指定 worker

笔记

我们避免在这里使用单词 线程 因为 调度器的底层机制 可能会将工作排入同一队列  ;线程, 但执行工作可能由不同的 worker 从 调度器 实例的角度。 

为了指定应该在运行时处理元素的worker,Reactor为此引入了一个特定的抽象,称为 Scheduler。 Scheduler 是一个接口,表示 Project Reactor 中的一个工人或工人池。我们将在本章后面介绍 Scheduler,但现在,我们只提一下这个接口用于为 current< /span> 流。为了更好地理解我们如何使用 publishOn 运算符,让我们考虑以下代码示例:

Scheduler scheduler = ...;        // (1)
                                  //
Flux.range(0, 100)                // (2) ¯|
    .map(String::valueOf)         // (3)  |> Thread Main
    .filter(s -> s.length() > 1)  // (4) _|

    .publishOn(scheduler)         // (5)

    .map(this::calculateHash)     // (6) ¯|
    .map(this::doBusinessLogic)   // (7)  |> Scheduler Thread
    .subscribe()                  // (8) _|

从前面的代码我们可以看出,第2步到第4步对元素的操作发生在 Thread Main publishOn operator 在不同的 Scheduler worker 上。这意味着哈希的计算发生在 线程A上,所以 calculateHash 和 doBusinessLogic 在与 Thread Main worker不同的worker上执行。如果我们从执行模型的角度来看 publishOn 操作符,我们可以看到以下流程:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.9。 Reactor 的 publishOn 操作符内部的表示

我们可能会注意到, publishOn 操作符的重点是运行时执行。在底层,publishOn 操作符保留了一个队列,它向其中提供新元素,以便专门的工作人员可以使用消息并一一处理。在这个例子中,我们展示了工作在单独的 线程上运行,所以我们的执行被异步边界分割。所以,现在,我们有两个独立处理的流程部分。我们需要强调的一件重要的事情是,Reactive Stream 中的所有元素都是一个一个处理的(不是同时处理的),因此我们可以始终为所有事件定义严格的顺序。此属性也称为 serializability。这意味着,一旦元素到达 publishOn,它就会被入队,一旦轮到它,它就会出队并被处理。请注意,只有一名工作人员专门处理队列,因此元素的顺序始终是可预测的。

与 publishOn 运算符并行化

乍一看, publishOn operator 并没有启用 Reactive 我们可能期望的流元素。尽管如此,Project Reactor 支持的反应式编程范式允许使用 publishOn 运算符对处理流进行细粒度扩展和并行化。例如,让我们首先考虑下图中描述的完全同步处理:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.10 Reactive Stream 的完全同步处理

 

从上图中可以看出,我们有一个包含三个元素的处理流程。由于流中元素同步处理的性质,我们必须通过所有转换阶段一个一个地移动元素。但是,为了开始处理下一个元素,我们必须完全处理前一个元素。相反,如果我们在此流程中放置 publishOn ,我们可以潜在地加快处理速度。下图显示了相同的图表,但包含了一个 publishOn 运算符:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.11 publishOn 操作符对流处理的影响

从上图可以看出,保持元素的处理时间不变,只在处理阶段之间提供一个异步边界(由publishOn操作符表示),我们可以实现并行处理。现在,处理流程的左侧不需要等待右侧的处理完成。相反,它们可以独立工作,以便正确实现并行处理。

 

subscribeOn 运算符

Reactor 中多线程的另一个重要因素是通过一个名为 subscribeOn 的运算符。与 publishOn 相比, subscribeOn 允许您更改正在发生订阅链的哪个部分的工作人员。当我们从函数的执行中创建流的源时,此运算符会很有用。通常,此类执行发生在订阅时,因此会调用一个为我们提供执行 .subscribe 方法的数据源的函数。例如,让我们看一下下面的代码示例,它展示了我们如何使用 Mono.fromCallable 提供一些信息:

ObjectMapper objectMapper = ...
String json = "{ \"color\" : \"Black\", \"type\" : \"BMW\" }";
Mono.fromCallable(() ->
       objectMapper.readValue(json, Car.class)
    )
    ...

在这里, Mono.fromCallable 允许创建一个 Mono from Callable< ;T> 并将其评估结果提供给每个 订阅者。  Callable instance 在我们调用 .subscribe 方法时执行,所以Mono.fromCallable 在后台执行以下操作:

publicvoidsubscribe(Subscriber actual){
    ...
    Subscription subscription = ...
    try {
        T t = callable.call();
        if (t == null) {
            subscription.onComplete();
        }
        else {
            subscription.onNext(t);
            subscription.onComplete();
        }
    }
    catch (Throwable e) {
        actual.onError(
             Operators.onOperatorError(e, actual.currentContext()));
    }
}

从前面的代码中我们可以看出,callable 的执行发生在 subscribe 方法中。这意味着我们可以使用 publishOn 更改将在其上执行 Callable 的worker .幸运的是, subscribeOn 允许我们指定订阅将发生的工作人员。以下示例显示了我们如何做到这一点:

Scheduler scheduler = ...;
Mono.fromCallable(...)
    .subscribeOn(scheduler)
    .subscribe();

前面的例子展示了我们如何在一个单独的worker上执行给定的 Mono.fromCallable 。在后台, subscribeOn 执行订阅父 Publisher进入Runnable,即指定Scheduler的调度器。如果我们比较 subscribeOn 和 publishOn的执行模型, 我们会看到以下内容:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.12 .publishOn 操作符的内部结构 

 

从上图中我们可以看出, subscribeOn 可以部分指定运行时工作者以及订阅时工作者。发生这种情况是因为,随着 subscribe 方法执行的调度,它调度对 Subscription.request()< /code> 方法,所以它发生在  Scheduler instance指定的 worker上。根据 Reactive Streams 规范,Publisher 可能会开始在调用者 Thread 上发送数据,因此后续的 Subscriber.onNext() 将在与初始相同的线程上调用 Subscription.request()  电话。相比之下, publishOn 只能为downstream指定执行行为,不能影响上游执行。

并行算子

除了用于管理我们想要处理部分执行流的线程的重要运算符之外,Reactor 还提供了一种熟悉的工作并行技术。为此,Reactor 有一个名为 .parallel 的运算符,它允许将流拆分到并行子流上并平衡它们之间的元素。以下是使用此运算符的示例:

Flux.range(0, 10000)
    .parallel()
    .runOn(Schedulers.parallel())
    .map()
    .filter()
    .subscribe()

从前面的示例中我们可以看出,.parallel()Flux API 的一部分。我们在这里注意到的一件事是,通过应用 parallel 运算符,我们开始对不同类型的 Flux 进行操作,这就是所谓的  ParallelFlux。  ParallelFlux 是对一组 Flux 的抽象, 在这些 Flux 之间源中的元素 < code class="literal">Flux 是平衡的。然后,通过应用 runOn 操作符,我们可以将 publishOn 应用到内部 Fluxes 并分发与元素相关的工作在不同的工人之间进行处理。

 

 

调度器

Scheduler 是一个接口,它有两个中心方法: Scheduler.schedule 和 Scheduler.createWorker。第一种方法可以安排一个 Runnable 任务,而第二种方法为我们提供了 Worker< /code> 接口,可以用同样的方式调度 Runnable tasks。  Scheduler interface 和 Worker interface 的主要区别在于 Scheduler 接口表示一个工作池,而 Worker 是对 的专用抽象线程 或资源。默认情况下,Reactor 提供了三个中央 Scheduler 接口实现, 列举如下:

  • SingleScheduler allows the scheduling of all possible tasks for one dedicated worker. It is time-capable, so it can schedule periodical events with a delay. This scheduler may be referenced with the Scheduler.single() call.
  • ParallelScheduler works on a fixed size pool of workers (by default, the size is bound to the number of CPU cores). Fits well for CPU bound tasks. Also, by default, handles time-related scheduled events, for example, Flux.interval(Duration.ofSeconds(1)). This scheduler may be referenced with the Scheduler.parallel() call.
  • ElasticScheduler dynamically creates workers and caches thread pools. The maximum number of created thread pools is unbounded, so this scheduler fits well when we need a scheduler for IO-intensive operations. This scheduler may be referenced with the Scheduler.elastic() call.

此外,我们可以实现我们自己的 Scheduler 与我们想​​要的特性。 第 10 章最后,发布它! 提供了一个示例,说明如何为具有广泛监控功能的 Reactor 创建 Scheduler

笔记

要了解有关线程和调度程序的更多信息,请参阅 Project Reactor 文档的以下部分 [http://projectreactor.io/docs/core/release/reference/#schedulers]

 

 

校长背景

Reactor 附带的另一个关键特性是 Context。 Context 是一个沿流传递的接口。  Context 接口的中心思想是提供对一些上下文信息的访问,这些信息可能对稍后在运行阶段访问有用。如果我们有 ThreadLocal 允许做同样的事情,我们可能想知道为什么我们需要这样的功能。例如,许多框架使用 ThreadLocal 以便在用户请求执行时传递 SecurityContext,并使其成为可能在任何处理点访问授权用户。不幸的是,这种概念只有在我们有单线程处理时才有效,因此执行附加到相同的 线程。如果我们开始在异步处理中使用该概念, ThreadLocal 将会很快松动。例如,如果我们执行如下操作,那么我们将丢失可用的 ThreadLocal

classThreadLocalProblemShowcase{                             

   publicstaticvoidmain(String[] args){
      ThreadLocal<Map<Object, Object>> threadLocal =           // (1)new ThreadLocal<>();                                  //
      threadLocal.set(new HashMap<>());                        // (1.1)

      Flux                                                     // (2)
         .range(0, 10)                                         // (2.1)
         .doOnNext(k ->                                        //
            threadLocal                                        //
               .get()                                          //
               .put(k, new Random(k).nextGaussian())           // (2.2)
         )                                                     //
         .publishOn(Schedulers.parallel())                     // (2.3)
         .map(k -> threadLocal.get().get(k))                   // (2.4)
         .blockLast();                                         //
   }
}

 

以下是对上述代码的说明:

  1. 此时,我们有一个 ThreadLocal 实例的声明。此外,在第 (1.1) 点,我们已经设置了 ThreadLocal,因此我们可以稍后在代码中使用它。
  2. 这里我们有 Flux stream 声明,它生成从 0 到 9 (2.1) 的一系列元素。此外,对于流中的每个新元素,我们生成一个 randomGaussian double,其中元素是生成随机值的种子。生成数字后,我们将其放入存储在 ThreadLocal map中。然后,在第 (2.3) 点,我们将执行移至不同的 线程。最后,在第 (2.4) 点,我们将流中的数字映射到先前存储在 ThreadLocal map 随机高斯双精度数中。此时,我们会得到 NullPointerException,因为之前存储在 Thread Main中的map在不同的地方不可用  线程

从前面的例子中我们可能会注意到,在多线程环境中使用 ThreadLocal 是非常危险的,可能会导致不可预知的行为。尽管 Java API 允许将 ThreadLocal 数据从 Thread 到 Thread,它不保证它在各处的一致传输。

幸运的是,Reactor Context 通过以下方式解决了这个问题:

Flux.range(0, 10)                                               //
    .flatMap(k ->                                               //
       Mono.subscriberContext()                                 // (1)
           .doOnNext(context -> {                               // (1.1)
              Map<Object, Object> map = context.get("randoms"); // (1.2)
              map.put(k, new Random(k).nextGaussian());         //
           })                                                   //
           .thenReturn(k)                                       // (1.3)
    )                                                           //
    .publishOn(Schedulers.parallel())                           //
    .flatMap(k ->                                               //
       Mono.subscriberContext()                                 // (2)
           .map(context -> {                                    //
              Map<Object, Object> map = context.get("randoms"); // (2.1)return map.get(k);                                // (2.2)
           })                                                   //
    )                                                           //
    .subscriberContext(context ->                               // (3)
       context.put("randoms", new HashMap())                    //
    )                                                           //
    .blockLast();                                               //

 

以下是对上述代码的说明:

  1. 这里是我们如何访问 Reactor 的 Context 的示例。正如我们所见,Reactor 使用静态运算符 subscriberContext 提供了对当前流中 Context 的实例的访问.与前面的示例一样,一旦 上下文 实现(1.1),我们访问存储的 地图 (1.2)并将生成的值放在那里。最后,我们返回flatMap的初始参数。
  2. 在这里,我们在切换 Thread 后再次访问 Reactor 的 Context。尽管此示例与我们之前使用 ThreadLocal 的示例相同,但在点 (2.1) 我们将成功检索存储的地图并生成随机高斯 double(2.2) .
  3. 最后,在这里,为了制作 "randoms", key 返回一个 Map 我们在上游填充一个新的 Context 包含 Map 在所需键下的实例。

我们可以从前面的示例中看到,Context 可以通过无参数 Mono.subscriberContext 操作符和可以使用单参数 subscriberContext(Context) 运算符提供给流。

看前面的示例,我们可能会想,为什么我们需要使用 Map 来传输数据,因为 Context  interface 具有与 Map interface 类似的方法。就其本质而言, Context 设计为不可变对象,一旦我们向其中添加新元素,我们就会实现 Context的新实例 。这样的设计决定是有利于多线程访问模型的。这意味着,向流提供 Context 并动态提供一些数据的唯一方法,这些数据将在组装或订阅期间的整个运行时执行期间可用.如果在汇编期间提供了 Context,那么所有订阅者将共享相同的静态上下文,这在每个 订阅者(可能代表用户连接)应该有自己的Context。因此,每个Subscriber 可以提供其上下文的唯一生命周期周期是订阅时间。

 

 

我们可能还记得前面几节。在订阅期间 Subscriber通过 Publisher链从流的底部提升到顶部并被包裹在每个阶段到一个本地 Subscriber 表示引入额外的运行时逻辑。为了保持该过程不变并允许通过流传递额外的 Context 对象,Reactor 使用了 Subscriber 的特定扩展 接口调用 CoreSubscriber。 CoreSubscriber 允许传输 Context  作为它的字段。下面显示了 CoreSubscriber 界面的外观:

interfaceCoreSubscriber<T> extendsSubscriber<T> {
   default Context currentContext(){
      return Context.empty();
   }
}

从前面的代码我们可以看出, CoreSubscriber 引入了一个额外的方法,叫做 currentContext。这提供了对当前 Context 对象的访问权限。 Project Reactor 中的大多数操作符都提供了 CoreSubscriber 接口的实现,并引用了下游 Context .我们可能注意到,唯一允许修改当前的 Context 是 subscriberContext, CoreSubscriber 被合并的下游 Context 和作为参数传递。

此外,这种行为意味着可访问的 Context 对象在流中的不同点可能不同。例如,以下显示了上述行为:

voidrun(){
   printCurrentContext("top")
   .subscriberContext(Context.of("top", "context"))
   .flatMap(__ -> printCurrentContext("middle"))
   .subscriberContext(Context.of("middle", "context"))
   .flatMap(__ -> printCurrentContext("bottom"))
   .subscriberContext(Context.of("bottom", "context"))
   .flatMap(__ -> printCurrentContext("initial"))
   .block();
}
voidprint(String id, Context context){
   ...
}
Mono<Context> printCurrentContext(String id){
   return Mono
      .subscriberContext()
      .doOnNext(context -> print(id, context));
}

前面的代码展示了我们如何在流构建过程中使用Context。如果我们运行上述代码,控制台中将出现以下结果:

top {
  Context3{bottom=context, middle=context, top=context}
}

middle {
  Context2{bottom=context, middle=context}
}

bottom {
  Context1{bottom=context}
}

initial {
  Context0{}
}

从前面的代码我们可以看出,流的 Context  顶部 包含了整个 Context  在这个流中可用,中间只能访问那些 Context,它是在流的下游定义的,而上下文消费者在最底部(带有id  ;initial) 完全有空的上下文。

总的来说, Context  是一个杀手级功能,它将 Project Reactor 推向了构建反应式系统的下一级工具。此外,这种特性在我们需要访问一些上下文数据的许多情况下很有用,例如在处理用户请求的过程中。正如我们将在 第 6 章: WebFlux 异步非阻塞通信,这个特性在 Spring Framework 中被广泛使用,尤其是在响应式 Spring Security 中。

尽管我们广泛地介绍了 Context 功能,但这种 Reactor 的技术仍有巨大的可能性和用例。要了解有关 Reactor 的 Context 的更多信息,请参阅 Project Reactor 文档的以下部分:http://projectreactor.io/docs/core/release/reference/#context

项目反应堆的内部结构

正如我们在上一节中看到的,Reactor 有很多有用的运算符。此外,我们可能注意到整个API 具有与RxJava 中类似的运算符。但是,老一代库和包括 Project Reactor 3 在内的新库之间的主要区别是什么?这里最关键的突破是什么?最显着的改进之一是 Reactive Stream 生命周期算子融合。在上一节中,我们已经介绍了 Reactive Streams 的生命周期;现在,我们来看看 Reactor 的算子融合。

宏观融合

Macro-fusion 主要发生在组装时,其目的是替换 一个运算符与另一个运算符。例如,我们已经看到 Mono 高度优化,只处理一个或零个元素。同时, Flux 中的部分操作符也应该处理一个或零个元素(例如,操作符 just(T) empty() 和 error(Throwable))。在大多数情况下,这些简单的运算符与其他转换流程一起使用。因此,减少此类开销至关重要。为此,Reactor 在组装时提供优化,如果它检测到 上游 Publisher 实现了诸如 CallableScalarCallable,上游的 Publisher 将被替换为优化的运算符。将应用此类优化的示例是以下代码:

Flux.just(1)
    .publishOn(...)
    .map(...)

前面的代码显示了一个非常简单的示例,其中元素的执行应在元素创建后立即移至不同的工作人员。如果没有应用优化,这样的执行会分配一个队列来保存 来自不同工作人员的元素,加上从这样的队列中入队和出队的元素会导致一些不稳定的读取和写入,因此执行这种普通的 Flux 转换过多。幸运的是,我们可以优化该流程。因为无论哪个worker执行发生并且提供一个元素可以表示为 ScalarCallable#call,所以我们可以替换带有 subscribeOn  的 publishOn 运算符不需要创建额外的队列。此外,下游的执行不会因为应用优化而改变,因此我们将通过运行优化流获得相同的结果。

前面的例子是 Project Reactor 中隐藏的 Macro-fusion 优化之一。在 Assembly-time 部分,我们提到了此类优化的另一个示例。一般来说,在 Project Reactor 中应用 Macro-fusions 的目的是优化组装流程,而不是仅仅使用强大的工具来敲钉子,我们可以使用更原始且成本更低的解决方案。

微融合

Micro-fusion是一种比较复杂的优化,与运行时相关优化 和共享资源的重用。微融合的一个很好的例子是条件算子。为了理解这个问题,让我们看一下下图:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.13。卡车示例的条件问题

让我们想象一下下面的情况。商店订购了n 件商品。一段时间后,工厂将货物用卡车运送到商店。但是,为了最终到达 B 店,卡车必须通过检验部门并确认所有物品都符合预期的质量。不幸的是,有些物品没有仔细包装,只有部分订单到达了商店。之后,工厂又准备了另一辆卡车,再次送到商店。这种情况反复发生,直到所有订购的物品都到达商店。幸运的是,工厂意识到他们花了很多时间和金钱通过单独的检验部门交付物品,并决定从检验部门聘请自己的当地检验员(图 4.14):

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.14。通过工厂方面的专门检查员解决了有条件的开销。

 

 

现在,所有物品都可以在工厂进行验证,然后发送到商店,而无需访问检验部门。

这个故事与编程有何关联?让我们看一下下面的例子:

Flux.from(factory).filter(inspectionDepartment).subscribe(store);

在这里,我们有类似的情况。下游订阅者向源请求了一定数量的元素。在通过运算符链发出元素时,元素正在通过条件运算符移动,这可能会拒绝某些元素。为了满足下游需求,每个被拒绝项目的过滤器运算符必须在上游执行一个额外的request(1)调用。根据当前响应式库(如 RxJava 或 Reactor 3)的设计,request 操作有其自身额外的 CPU 开销。

笔记

根据 David Karnok 的研究,每个"...对 request() 的调用通常都以原子 CAS 循环结束,每个丢弃的元素花费 21-45 个循环。”

这意味着条件运算符,例如 filter 运算符,可能会对整体性能产生重大影响!因此,有一种称为ConditionalSubscriber的微融合。这种类型的优化使我们能够在源端验证条件并发送所需数量的元素,而无需额外的request调用。

第二种微融合是最复杂的一种。这种融合与算子之间的异步边界有关,在第 3 章, Reactive Streams - 新 Streams 的标准。为了理解这个问题,让我们想象一个带有一些异步边界的操作符链,如下例所示:

Flux.just(1,2,3).publishOn(Schedulers.parallel())// (1).concatMap(i -> Flux.range(0, i)
                       .publishOn(Schedulers.parallel()))            // (2).subscribe();

前面的示例显示了 Reactor 的运营商链。这个链包括两个异步边界,这意味着队列应该出现在这里。例如,concatMap运算符的本质是它可能会产生n 元素在来自上游的每个传入元素上。因此,无法预测内部通量会产生多少元素。为了处理背压并避免压倒消费者,有必要将结果放入队列中。  publishOn operator 还需要一个内部队列来将 Reactive Stream 中的元素从一个工作线程传输到另一个工作线程。除了队列开销之外,还有更多危险的request()调用通过异步边界。这些可能会导致更大的内存开销。为了理解这个问题,让我们看一下下图:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.15。没有优化的异步边界开销

前面的示例扩展了前面代码片段的内部行为。在这里,我们在 concatMap 的内部有很大的开销,我们需要发送一个 请求 每个内部流,直到下游需求得到满足。每个拥有队列的算子都有自己的 CAS 循环,如果请求模型不合适,可能会导致显着的性能开销。例如, request(1) 或与数据总量相比足够小的任何其他数量的元素可能被认为是不合适的请求模型。

 

笔记

CAS (compare-and-swap) 是单个操作,根据操作的成功返回值 1 或 0。由于我们希望操作成功,因此我们重复 CAS 操作,直到成功。这些重复的 CAS 操作称为 CAS 循环

为了防止内存和性能开销,我们应该按照 Reactive Streams 规范的建议切换通信协议。假设边界或边界内的元素链具有共享队列,并切换整个运算符链以使用上游运算符作为队列,而无需额外的request 调用可以显着提高整体性能。因此,如果该值不可用于指示流的结束,则下游可能会从上游排出值,直到它返回 null。为了通知下游 元素可用,上游调用下游的onNextwithnull值作为该协议的特定排除.此外,错误情况或流的完成将照常通过onErroronComplete得到通知。因此,可以通过以下方式优化前面的示例:

读书笔记《hands-on-reactive-programming-in-spring-5》Project Reactor - 响应式应用程序的基础

图 4.16。队列订阅融合与协议切换

在此示例中,publishOnconcatMap 运算符可能会得到显着优化。在第一种情况下,没有中间操作符,必须在主线程中执行。因此,我们可以直接使用 just 操作符作为队列,并在单独的线程中从其中pull。在 concatMap 的情况下,所有内部流也可以被视为队列,因此每个流都可以在没有任何额外的 request 的情况下被排空 电话。

 

 

笔记

我们应该注意到,没有什么可以阻止 publishOnconcatMap 使用优化的协议进行通信,但在撰写本文时,此类优化没有实现,所以我们决定按原样公开通信机制。

总而言之,正如我们从本节中看到的那样,Reactor 库的内部结构比乍看之下还要复杂。凭借强大的优化,Reactor 远远领先于 RxJava 1.x,从而提供了更好的性能。

概括


在本章中,我们涵盖了许多主题。我们简要概述了 Reactor 的历史,以确定 另一个反应式库, Project Reactor 背后的动机。我们还研究了这个库最重要的里程碑——构建这样一个多功能且强大的工具所需的里程碑。此外,我们还概述了 RxJava 1.x 实现的主要问题,以及早期 Reactor 版本的问题。 通过查看 Reactive Streams 规范之后 Project Reactor 中发生的变化 我们强调了原因反应式编程——如此高效和直接——需要如此具有挑战性的实现。

我们还描述了 MonoFlux 反应类型,以及创建、转换和使用的不同方式反应性流。我们通过Subscription对象查看了一个正在运行的流并使用拉-推模型控制背压。我们还描述了操作符融合如何提高反应流的性能。总之,Project Reactor 库为反应式编程以及异步和 IO 密集型应用程序提供了强大的工具集。

在接下来的章节中,我们将介绍 Spring 框架是如何被改进的,以便在一般情况下利用反应式编程的力量,特别是 Project Reactor。我们将专注于使用 Spring 5 WebFlux 和 Reactive Spring Data 构建高效的应用程序。