vlambda博客
学习文章列表

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

第 3 章反应式流 - 新流的标准

在本章中,我们将讨论上一章中提到的一些问题,以及多个响应式库在一个项目中相遇时出现的问题。我们还将深入研究反应系统中的背压控制。在这里,我们将监督 RxJava 提出的解决方案及其局限性。我们将探索 Reactive Streams 规范如何解决这些问题,学习该规范的要点。我们还将介绍新规范带来的反应性景观 变化 。最后,为了巩固我们的知识,我们将构建一个简单的应用程序并在其中组合几个反应式库。

本章涵盖以下主题: 

  • Common API problems
  • Backpressure control problems
  • Reactive Stream examples
  • Technology compatibility problems
  • Reactive Streams inside JDK 9
  • Advanced concepts of Reactive Streams
  • Reinforcement of reactive landscape
  • Reactive Streams in action

 

每个人的反应


在前面的章节中,我们已经学到了很多关于Spring响应式编程的令人兴奋的东西,以及 RxJava的作用  ;在它的故事中播放。我们还研究了使用反应式编程来实现反应式系统的必要性。我们还看到了对响应式环境和 RxJava 可用替代方案的简要概述,这使得快速开始响应式编程成为可能。

API的不一致问题

一方面,extensive 的竞争库列表,例如 RxJava 和 Java Core 库的特性,例如 < code class="literal">CompletableStage,让我们可以选择编写代码的方式。 例如,我们可能依赖 RxJava 的 API 来编写 一个 流程正在处理的项目。因此,要构建一个简单的异步请求-响应交互,依赖 CompletableStage就绰绰有余了。或者,我们可以使用特定于框架的类 例如 org.springframework.util.concurrent.ListenableFuture 来构建组件之间的异步交互并简化该框架的工作。

另一方面, 丰富的选择 可能很容易使系统过于复杂。例如,两个依赖于异步非阻塞通信概念但具有不同 API 的库的存在导致提供额外的实用程序类,以便将一个回调转换为另一个回调,反之亦然:

interface AsyncDatabaseClient {                                    // (1)
   <T> CompletionStage<T> store(CompletionStage<T> stage);         //
}                                                                  //

final class AsyncAdapters {                                        
   public static <T> CompletionStage<T> toCompletion(              // (2)
                     ListenableFuture<T> future) {                 //
                                                                   //
      CompletableFuture<T> completableFuture =                     // (2.1)    
         new CompletableFuture<>();                                //
                                                                   //
future.addCallback(                                          // (2.2)
         completableFuture::complete,                              //
         completableFuture::completeExceptionally                  //
      );                                                           //
                                                                   //
      return completableFuture;                                    //
   }                                                               //

   public static <T> ListenableFuture<T> toListenable(             // (3)
                     CompletionStage<T> stage) {                   //
      SettableListenableFuture<T> future =                         // (3.1)
         new SettableListenableFuture<>();                         //
                                                                   //
stage.whenComplete((v, t) -> {                               // (3.2)
if (t == null) {                                          //
future.set(v);                                         //
}                                                         //
else {                                                    //
future.setException(t);                                //
}                                                         //
      });                                                          //
                                                                   //
      return future;                                               //
   }                                                               //
}

@RestController                                                    // (4)
public class MyController {                                        //
   ...                                                             //
@RequestMapping                                                 //
public ListenableFuture<?> requestData() {                      // (4.1)
      AsyncRestTemplate httpClient = ...;                          //
AsyncDatabaseClient databaseClient = ...;                    //
                                                                   //
CompletionStage<String> completionStage = toCompletion(      // (4.2)
         httpClient.execute(...)                                   //
      );                                                           //
                                                                   //
      return toListenable(                                         // (4.3)
         databaseClient.store(completionStage)                     //
      );                                                           //
}                                                               //
}                                                                  //

上述代码中的编号点解释如下:

  1. 这是async 数据库客户端的接口声明,它是异步数据库访问可能的客户端接口的代表性示例。
  1. 这是ListenableFutureCompletionStage适配器方法的实现。在 (2.1) 点,为了提供对 CompletionStage 的手动控制,我们创建了它的直接实现,称为 CompletableFuture 通过不带参数的构造函数。为了提供与 ListenableFuture 的集成,我们必须添加回调 (2.2)这里我们直接复用CompletableFuture的API。
  2. 这是 CompletionStage到 ListenableFuture 适配器方法实现。 点 (3.1)我们声明ListenableFuture的具体实现叫做SettableListenableFuture。这允许我们在 (3.2) 点手动提供CompletionStage 执行的结果。
  3. 这是 RestController 的类声明。这里在(4.1)这一点,我们声明了request handler方法,它异步执行并返回ListenableFuture来处理ListenableFuture的结果 以非阻塞方式执行。反过来,为了存储 AsyncRestTemplate 的执行结果,我们必须将其适配到 CompletionStage(4.2)。最后,为了满足 支持的API,我们不得不采用  为ListenableFuture 再次(4.3)存储的结果ListenableFuture代码>。

从前面的示例中可以看出,没有与 Spring Framework 4.x 直接集成ListenableFuture and CompletionStage .此外,该示例并不是反应式编程的常见用法的例外情况。许多库和框架为组件之间的异步通信提供了自己的接口和类,其中包括普通的请求-响应通信以及流处理框架。在许多情况下,为了解决这个问题并使几个独立的库兼容,我们必须提供自己的适应并在一些地方重用它。此外,我们自己的改编可能包含错误并需要额外的维护。 

笔记

在 Spring Framework 5.x 中,ListenableFuture 的 API 得到了扩展,并提供了一个名为 completable 的附加方法来解决那种不兼容。请查看以下链接以了解更多信息:https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/util/concurrent/ListenableFuture.html#completable --

 

这里的核心问题在于,没有一种单一的方法可以让图书馆供应商构建他们的 对齐的 API。例如,我们可能在第 2 章, Reactive Programming in Spring - 基本概念,RxJava 被许多框架所重视,例如 Vert.x、Ratpack、Retrofit 等。

 

反过来,它们都为 RxJava 用户提供了支持并引入了额外的模块,从而可以轻松地集成现有项目。 乍一看,这很奇妙,因为引入 RxJava 1.x 的项目列表广泛 并包括用于 Web、桌面或移动开发的框架。然而,在对开发人员需求的支持背后,许多隐藏的陷阱影响着库供应商。当多个 RxJava 1.x 兼容库在一个地方相遇时,通常会发生的第一个问题是粗略的版本不兼容。由于 RxJava 1.x 随着时间的推移发展得非常快,许多库供应商没有机会将他们的依赖关系更新到新版本。有时,更新会带来许多 内部更改,最终导致某些版本不兼容。因此,拥有依赖于不同版本的 RxJava 1 的不同库和框架可能会导致一些不必要的问题。第二个问题与第一个类似。 RxJava 的定制是非标准化的。在这里, 自定义 指的是 提供Observable的额外实现的能力或者特定的转换阶段,这在 RxJava 扩展开发过程中很常见。由于非标准化 API 和快速发展的内部结构,支持自定义实现是另一个挑战。

笔记

可以在以下链接中找到版本重大更改的一个很好的示例: https://github.com/ReactiveX/RxJava/issues/802

拉与推

最后,要理解上一节描述的问题,我们还得回溯历史,分析一下最初的交互模型在源和它的订阅者之间。

在整个反应式格局演变的早期,所有库的设计都考虑到数据从源推送到订阅者。做出这个决定是因为在某些情况下纯拉模型效率不够高。这方面的一个例子是当网络上的通信出现在具有网络边界的系统中时。假设我们过滤一个巨大的 数据列表,但只从中提取前十个元素。通过采用 PULL模型来解决这样的问题,我们得到以下代码:

final AsyncDatabaseClient dbClient = ...                           // (1)

public CompletionStage<Queue<Item>> list(int count) {              // (2)
   BlockingQueue<Item> storage = new ArrayBlockingQueue<>(count);  //
CompletableFuture<Queue<Item>> result                           //  
      = new CompletableFuture<>();                                 //
                                                                   //
pull("1", storage, result, count);                              // (2.1)
                                                                   //
   return result;                                                  //
}                                                                  //

void pull(                                                         // (3)
   String elementId,                                               //
Queue<Item> queue,                                              //
CompletableFuture resultFuture,                                 //
   int count                                                       //
) {                                                                //
dbClient.getNextAfterId(elementId)                              //  
           .thenAccept(item -> {                                   //
if (isValid(item)) {                                 // (3.1)
queue.offer(item);                                //
                                                                   //
                 if (queue.size() == count) {                      // (3.2)
resultFuture.complete(queue);                  //
                    return;                                        //
}                                                 //
              }                                                    //
                                                                   //
pull(item.getId(),                                   // (3.3)
queue,                                          //
resultFuture,                                   //
                   count);                                         //
});                                                     //
}                                                                  //

注释代码再次解释如下:

  1. 这是 AsyncDatabaseClient field 声明。在这里,使用该客户端,我们将异步、非阻塞通信与外部数据库连接起来。
  2. 这是 list 方法声明。这里我们通过返回 CompletionStage 作为调用 list方法的结果来声明一个异步合约。反过来,为了聚合拉取结果并将其异步发送给调用者,我们声明 QueueCompletableFuture 来存储接收到的值和然后手动发送收集到的Queue 稍后。在这里,在 (2.1) 我们开始第一次调用 pull 方法。
  1. 这是 pull 方法声明。在该方法中,我们调用AsyncDatabaseClient#getNextAfterId 来执行查询并异步接收结果。然后当接收到结果时,我们在点 (3.1). 的情况下进行过滤的有效项目,我们将其聚合到队列中。此外,在 (3.2) 点, 我们检查是否收集了足够的元素,将它们发送给调用者,然后退出拉取。如果上述任何一个 if 分支被绕过,我们递归调用pull 方法再次(3.3)

从前面的代码可以看出,我们在服务和数据库之间使用了异步、非阻塞的交互。乍一看,这里并没有错。但是,如果我们看下图,我们会看到差距:

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.1。拉取处理流程示例

从上图中可以看出,逐个请求下一个元素会导致将请求从 Service 传递到数据库。从服务的角度来看,大部分的整体处理时间都浪费在空闲状态。即使那里没有使用资源,由于额外的网络活动,总处理时间也会增加一倍甚至三倍。而且,数据库不知道未来请求的数量,这意味着数据库无法提前生成数据,因此处于空闲状态。这意味着数据库正在等待新的请求,并且在将响应 传递给服务时效率低下,而服务正在处理传入的 响应 然后 然后 请求新的数据部分。 

为了优化整体执行并保持拉取模型为一等公民,我们可以将拉取与批处理结合起来,如下中心示例的修改所示: 

void pull(                                                         // (1)
   String elementId,                                               //
Queue<Item> queue,                                              //
CompletableFuture resultFuture,                                 //
   int count                                                       //
) {                                                                //

dbClient.getNextBatchAfterId(elementId, count)                  // (2)
           .thenAccept(items -> {                                  //
              for(Item item : items) {                             // (2.1)
if (isValid(item)) {                              //
queue.offer(item);                             //
                                                                   //
                    if (queue.size() == count) {                   //
resultFuture.complete(queue);               //
                       return;                                     //
}                                              //
                 }                                                 //
              }                                                    //

pull(items.get(items.size() - 1)                     // (3)
                        .getId(),                                  //
queue,                                          //
resultFuture,                                   //
                   count);                                         //
});                                                     //
} 

同样,以下键解释了代码:

  1. 这与前面示例中的 pull 方法声明相同。
  2. 这是 getNextBatchAfterId 执行。可能会注意到, AsyncDatabaseClient 方法 允许请求特定数量的元素,这些元素以 List<Item> 的形式返回。 。反过来,当数据可用时,它们的处理方式几乎相同,只是创建了一个额外的 for-loop 用于分别处理批处理的每个元素 (2.1)
  3. 这是递归的 pull 方法执行,其设计目的是在之前的 pull 中缺少项目的情况下检索额外的一批项目。

一方面,通过请求一批元素,我们可以显着提高 list 方法执行的性能 并减少整体处理时间。另一方面,交互模型还存在一些差距,可以通过分析下图来发现: 

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.2。基于批处理的拉取处理流程示例

 

 

正如我们可能注意到的,我们的处理时间仍然有些低效。例如,当数据库查询数据时,客户端仍然处于空闲状态。反过来,发送一批元素比只发送一个元素需要更多的时间。最后,对整批元素的额外请求可能实际上是多余的。例如,如果 只有一个元素 剩下来完成处理并且下一批的第一个元素满足验证,那么其余的项目将被跳过并且完全冗余。

为了提供最终的优化,我们可能会请求一次数据,当它们可用时,源会异步推送它们。以下对代码的修改显示了如何实现这一点:

public Observable<Item> list(int count) {                          // (1)
   return dbClient.getStreamOfItems()                              // (2)
                  .filter(item -> isValid(item))                   // (2.1)
                  .take(count)                                     // (2.2)
}                                                                  //

注释如下:

  1. 这是 list 方法声明。这里,Observable<Item> 返回类型标识元素正在被 推送。
  2. 这是查询流阶段。通过调用 AsyncDatabaseClient#getStreamOfItems 方法,我们订阅了一次数据库。在这里,在 (2.1) 点我们过滤元素,并通过使用 运算符 .take() 根据调用者的要求,获取特定数量的数据。

在这里,我们使用 RxJava 1.x 类作为一等公民来接收 pushed 元素。反过来,一旦满足所有要求,就会发送取消信号,并关闭与数据库的连接。当前的交互流程如下图所示: 

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.3。 Push 处理流程示例

在上图中,再次优化了整体处理时间。在交互过程中,当服务等待第一个响应时,我们只有一个大空闲。在第一个元素到达后​​,数据库开始在后续元素到来时发送它们。反过来,即使处理可能比查询 the next 元素快一点,服务的整体空闲时间也很短。但是,一旦收集了 required 数量的元素,数据库仍可能生成 过多的元素,这些元素会被服务忽略。

流量控制问题

一方面, 前面的解释可能已经告诉我们 接受的核心原因 PUSH模型是通过将请求量降低到 最小化来优化 整体处理时间。这就是为什么 RxJava 1.x 和类似的库是为推送数据而设计的,这就是为什么流式传输成为分布式系统中组件之间通信的一种有价值的技术。

 

另一方面,仅结合 PUSH 模型,该技术有其局限性。我们可能记得第 1 章,  为什么选择 Reactive Spring?, 消息驱动通信的本质假设作为对请求的响应,服务可能会收到一个异步的、潜在的无限的消息流。这是棘手的部分,因为如果生产者不尊重消费者的吞吐量可能性,它可能会以以下两节所述的方式影响整个系统的稳定性。

慢生产者和快消费者

让我们从最简单的开始。假设我们 生产者速度慢,消费者速度非常快。这种情况可能是由于生产者方面对未知消费者的一些精简假设而出现的。

一方面,这样的配置是一种特殊的商业假设。另一方面,实际运行时间可能不同,消费者的可能性可能会动态变化。例如,我们可能总是通过扩展生产者的数量来增加它们的数量,从而增加消费者的负载。

要解决这样的问题,我们最需要的是实际需求。不幸的是,纯 push 模型无法给我们这样的指标,因此动态增加系统的吞吐量是不可能的。

快生产者和慢消费者

第二个问题要复杂得多。假设我们一个快速生产者和一个慢消费者。这里的问题是生产者发送的数据可能比消费者可以处理的多得多,这可能导致组件在压力下发生灾难性故障。

对于这种情况,一种直观的解决方案是将未处理的元素收集到队列中,这些元素可能位于生产者和消费者之间,甚至可能驻留在消费者端。即使消费者很忙,这种技术也可以通过处理前一个元素或部分数据来处理新数据。 

使用队列处理推送数据的关键因素之一是选择具有适当特征的队列。一般来说,有三种常见的队列类型,在以下小节中进行讨论。

 

无界队列

第一个也是最明显的解决方案是提供一个queue,它的特点是无限大小,或者只是一个无界队列。在这种情况下,所有生成的元素首先存储在队列中,然后由实际订阅者排出。下面的大理石图描述了所提到的交互(图 3.4):

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.4. 无界队列示例

一方面,使用无界队列处理消息的主要好处是 消息的可传递性,这意味着消费者将在某个时间点处理所有存储的元素。 

另一方面,通过成功实现消息的传递性,应用程序的弹性会降低,因为没有无限资源。例如,一旦达到内存限制,整个系统可能很容易崩溃。

 

有界丢弃队列

或者,为了避免内存溢出,我们可以使用一个队列,如果它已满,它可能会忽略传入的消息。下面的大理石图描绘了一个 queue,它的大小为 2 个元素,其特点是溢出时丢弃元素(图表 3.5 ):

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.5。具有两个项目的 容量 的丢弃队列示例

一般来说,这种技术尊重资源的限制,并且可以根据资源的容量来配置队列的容量。反过来,当消息的重要性较低时,采用这种队列是一种常见的做法。业务案例的一个示例可能是数据集更改事件流。反过来,每个事件都会触发一些使用整个数据集聚合的统计重新计算,并且与传入的事件数量相比会花费大量时间。在这种情况下,唯一重要的是数据集发生了变化。知道哪些数据受到影响并不重要。

笔记

前面提到的示例考虑了删除最新元素的最简单策略。一般来说,有一些策略可以选择要删除的元素。例如,按优先级丢弃、丢弃最旧的等等。

有界阻塞队列

另一方面,在每条消息都很重要的情况下,这种技术可能不可接受。例如,在支付系统中, 每个用户的 提交的 支付 必须被处理,并且不允许丢弃一些。因此,我们不是丢弃消息并保留有界 queue 作为处理推送数据的方法,而是一旦达到限制,可能会阻止生产者。以能够阻塞生产者为特征的队列通常称为阻塞队列。使用具有三个元素容量的阻塞队列进行交互的示例如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.6.三项容量阻塞队列示例

不幸的是,这种技术否定了系统的所有异步行为。一般来说,一旦生产者达到队列的限制,它将开始被阻塞并处于该状态,直到消费者耗尽一个元素 并且队列中的空闲空间变得可用。然后我们可以得出结论,最慢消费者的吞吐量限制了系统的整体吞吐量。随后,除了否定异步行为之外,该技术还否定了 有效的资源利用。 因此,如果我们想要实现所有三个:弹性、弹性和响应性,任何情况都是不可接受的。 

此外,队列的存在可能使系统的整体设计复杂化,并增加了在上述解决方案之间寻找权衡的额外责任,这是另一个挑战。

 

一般来说,纯 push 模型的不受控制的语义可能会导致许多不希望的情况。这就是为什么 Reactive Manifesto 提到 机制的重要性,它允许系统 优雅地 响应负载,或者换句话说 需要 背压控制机制。

不幸的是,响应式库类似于 RxJava 1.x,并且不提供这样的标准化功能。 没有明确的 API 可以允许开箱即用地控制背压。

笔记

需要指出的是,在纯 push 模式下,可以使用批处理来稳定生产率。 RxJava 1.x 提供了诸如 .window or .buffer 这样的操作符,使得在运行期间收集元素成为可能。指定时间段到相应的子流或集合。这种技术显示性能突增的一个示例是对数据库的批量插入或批量更新。不幸的是,并非所有服务都支持批处理操作。因此,这种技术的应用确实受到限制。

解决方案

2013 年底,一群天才工程师来自 Lightbend、Netflix 和 Pivotal 聚集在一起解决上述问题并提供 具有标准的 JVM 社区。经过一年的努力,全世界看到了 Reactive Streams 规范的初稿。这个提议背后并没有什么特别之处——概念上的想法是我们在前一章看到的熟悉的 reactive programming patterns  的标准化。在下一节中,我们将详细介绍这一点。

Reactive Streams 规范的基础知识


Reactive Streams 规范定义了四个 主要接口: Publisher, < code class="literal">订阅者、订阅处理器。由于该倡议独立于任何组织发展,它作为一个单独的 JAR 文件提供,其中所有接口都存在于 org.reactivestreams 包中。

 

 

通常,指定的接口与我们之前的类似(例如,在 RxJava 1.x 中)。在某种程度上,这些反映了 RxJava 中著名的类。前两个接口类似于 Observable-Observer,类似于经典的发布者-订阅者模型。因此,前两个被命名为 PublisherSubscriber。要检查这两个接口是否类似于 ObservableObserver,让我们考虑它们的声明:

packageorg.reactivestreams;

public interface Publisher<T> {   
    void subscribe(Subscriber<? super T> s);
}

前面的代码描述了 Publisher 接口的内部结构。可能会注意到,只有一种方法可以注册 Subscriber。与 Observable(旨在提供有用的 DSL)相比,Publisher 代表一个简单的标准化入口点  PublisherSubscriber 连接。与 Publisher 相比, Subscriber 端更像是一个冗长的 API,与我们在  中的几乎相同;来自 RxJava 的Observer 接口:

packageorg.reactivestreams;

public interfaceSubscriber<T> { 
    voidonSubscribe(Subscription s); 
voidonNext(T t); 
voidonError(Throwable t); 
    voidonComplete(); 
}

我们可能已经注意到,除了与 RxJava Observer 中的方法相同的三个方法外,规范还为我们提供了一个新的附加方法,称为 onSubscribe

onSubscribe 方法是一种概念上新的 API 方法,它为我们提供了一种通知订阅者订阅成功。反过来,该方法的传入参数向我们介绍了一个名为 Subscription 的新合约。 为了理解这个想法,让我们仔细看看界面:

packageorg.reactivestreams;

public interface Subscription {   
    void request(long n);   
    void cancel();
}

正如我们可能已经注意到的,Subscription提供了控制元素生成的基础。类似于 RxJava 1.x 的Subscription#unsubscribe(),这里我们有 cancel()方法,允许我们取消订阅从流中,甚至完全取消发布。但是,取消功能带来的最显着改进在于新的request 方法。 Reactive Stream 规范引入了request 方法来扩展能力PublisherSubscriber之间的交互。现在,为了通知 Publisher 应该推送多少数据,Subscriber应该通知 的大小。  通过 request 方法的需求,  并且可以确保传入元素的数量不超过限制。让我们看一下下面的弹珠图来了解底层机制:

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.7。背压机制

从上图中可以看出,Publisher 现在 保证元素的新部分 只有在订户要求他们。 Publisher 的整体实现取决于Publisher,从纯粹的阻塞等待到复杂的仅根据订阅者的请求生成数据的机制。但是,我们 现在 因为我们有上述保证,所以不必支付额外排队的费用。

此外,与纯 push模型相反,规范为我们提供了混合push- pull 模型,允许适当控制背压。

为了理解混合模型的威力,让我们回顾一下我们之前从数据库流式传输的示例,看看这种技术是否像以前一样高效:

public Publisher<Item> list(int count) {                           // (1)

   Publisher<Item> source = dbClient.getStreamOfItems();           // (2)
   TakeFilterOperator<Item> takeFilter = new TakeFilterOperator<>( // (2.1)
      source,                                                      //
      count,                                                       //
      item -> isValid(item)                                        //
   );                                                              //

   return takeFilter;                                              // (3)
}                                                                  //

关键如下:

  1. 这是列表方法声明。这里我们遵循 Reactive Streams 规范并返回 Publisher<> 接口作为通信的一等公民。
  2. 这是 AsyncDatabaseClient#getStreamOfItems 方法的执行。这里我们使用一个更新的方法,它返回 Publisher<>。在 (2.1) 点, 我们实例化了 TakeFilter 运算符,它们接受应采用的元素数量。此外,我们传递 一个自定义 Predicate 实现,这使得验证流中的传入项目成为可能。
  3. 此时,我们 返回之前创建的 TakeFilterOperator 实例。请记住,即使运算符具有不同的类型,它仍然扩展了 Publisher 接口。

反过来,必须清楚地了解我们自定义的 TakeFilterOperator 的内部结构。以下代码扩展了该运算符的内部结构:

public class TakeFilterOperator<T> implements Publisher<T> {       // (1)
   ...                                                             //

   public void subscribe(Subscriber s) {                           // (2)
source.subscribe(new TakeFilterInner<>(s, take,predicate)); //
   }                                                               //

   static final class TakeFilterInner<T> implements Subscriber<T>, // (3)
                                                    Subscription { //
      final Subscriber<T> actual;                                  //
      final int take;                                              //
      final Predicate<T> predicate;                                //
      final Queue<T> queue;                                        //
      Subscription current;                                        //
      int remaining;                                               //
      int filtered;                                                //
      volatile long requested;                                     //
      ...                                                          //

      TakeFilterInner(                                             // (4)
         Subscriber<T> actual,                                     //
         int take,                                                 //
         Predicate<T> predicate                                    //
      ) { ... }                                                    //

      public void onSubscribe(Subscription current) {              // (5)
         ...                                                       //
         current.request(take);                                    // (5.1)
         ...                                                       //
      }                                                            //

      public void onNext(T element) {                              // (6)
         ...                                                       //
         long r = requested;                                       //
         Subscriber<T> a = actual;                                 //
         Subscription s = current;                                 //

         if (remaining > 0) {                                      // (7)
            boolean isValid = predicate.test(element);             //
            boolean isEmpty = queue.isEmpty();                     //
            if (isValid && r > 0 && isEmpty) {                     // 
               a.onNext(element);                                  // (7.1)
               remaining--;                                        //
               ...                                                 //
            }                                                      //
            else if (isValid && (r == 0 || !isEmpty)) {            //
               queue.offer(element);                               // (7.2)
               remaining--;                                        //
               ...                                                 //
            }                                                      //
            else if (!isValid) {                                   //
               filtered++;                                         // (7.3)
            }                                                      //
         }                                                         //
         else {                                                    // (7.4)
            s.cancel();                                            //
            onComplete();                                          //
         }                                                         //

         if (filtered > 0 && remaining / filtered < 2) {           // (8)
            s.request(take);                                       //
            filtered = 0;                                          //
         }                                                         //
      }
      ...                                                          // (9)
   }                                                               
}                                                                  

前面代码的关键点在下面的列表中解释:

  1. 这是 TakeFilterOperator 类声明。此类扩展 Publisher<>。另外,后面  ... 隐藏了类的构造函数及相关字段。
  2. 这是 Subscriber#subscribe 方法的实现。通过考虑实现,我们可以得出结论,要为流提供额外的逻辑,我们必须将实际的 Subscriber 包装到扩展相同接口的适配器类中。
  3. 这是 TakeFilterOperator.TakeFilterInner 类声明。这个类实现了 Subscriber 接口并扮演着最重要的角色,因为它作为实际的 Subscriber 传递给主源。一旦在onNext中接收到元素,它就会被过滤并传输到下游的Subscriber。反过来,与 Subscriber 接口一起, TakeFilterInner 类实现 Subscription 接口,可以转移到下游Subscriber 从而控制所有下游需求。注意这里, Queue是 ArrayBlockingQueue的实例 大小等于 采取。创建扩展 SubscriberSubscription 接口的内部类的技术是实现中间转换阶段的经典方法。
  4. 这是构造函数声明。正如这里可能注意到的,除了 take 和 predicate 参数,我们还有 actual 通过调用 subscribe() 方法订阅了 TakeFilterOperator 的订阅者实例。
  5. 这是 Subscriber#onSubscribe 方法的实现。此处最有趣的元素位于 (5.1) 处。这里我们执行了第一个 Subscription#request 到远程数据库,这通常发生在第一个 onSubscribe 方法期间调用。
  6. 这是 Subscriber#onNext 调用,它包含元素处理声明所需的有用参数列表。
  1. 这是元素声明的处理流程。在这里,我们在该处理中有四个关键点。一旦remaining 应该取的元素个数大于零,实际的Subscriber 已经请求了数据,元素有效,队列中没有元素,那么我们可以直接将该元素发送到下游(7.1). 如果需求尚未显示,或者队列中有东西,我们必须将该元素排队(以保持顺序)并稍后交付它 (7.2) 。在 元素无效的情况下,我们必须增加 过滤 元素的数量 (7.3) . 最后,如果 remaining 元素个数为零,那么我们有到取消(7.4) 订阅并完成直播.
  2. 这是请求声明的附加数据的机制。在这里,如果 filtered elements  的数量达到限制,我们会从数据库中请求额外的部分数据,而不会阻塞整个过程。
  3. 这是 Subscriber 和 Subscriptions 方法的其余部分。

一般来说,当与数据库的连接是有线的并且TakeFilterOperator实例已经收到 Subscription, 第一个请求将指定数量的元素发送到数据库。紧接着,数据库开始生成指定数量的元素,并在它们到来时推送它们。反过来,TakeFilterOperator 的逻辑指定了应该请求额外数据部分的情况。一旦发生这种情况,下一部分数据的新非阻塞请求将从服务发送到数据库。这里需要注意的是 Reactive Streams规范直接指定Subscription#request 应该是非阻塞性执行,这意味着不推荐阻塞操作或任何在该方法中停止调用者执行线程的操作。

笔记

要获取有关上述行为的更多信息,请参阅以下链接: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md#3.4

最后,下图描绘了服务与数据库之间的整体交互: 

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.8。混合推挽处理流程

从上图中可能会注意到,由于 Reactive Streams 规范的原因,数据库中的第一个元素可能会稍晚到达PublisherSubscriber 之间的交互合同。请求数据的新部分不需要 中断 或阻塞正在进行的元素处理。因此,整个处理时间几乎不受影响。 

另一方面,在某些情况下,纯 push 模型更可取。幸运的是,Reactive Streams 足够灵活。除了动态 push-pull 模型外,规范还提供了单独的 push pull 模型也是如此。根据文档,要实现纯 push 模型,我们可以考虑请求等于 to 263的需求- 1 (java.lang.Long.MAX_VALUE)。

 

笔记

这个数字可能被认为是无限的,因为对于当前或预期的硬件, 在合理的时间内 实现 263-1的需求是不可行的 (每纳秒 1 个元素需要 292 年)。因此, 允许发布者在此之后停止跟踪需求: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md#3.17

相反,要切换到纯 pull 模型,我们可能会在每次 Subscriber 时请求一个新元素#onNext已被调用。

反应式流规范在行动

一般来说,正如我们在上一节中可能注意到的那样,尽管 Reactive Streams 规范中的接口很简单,  整体概念足够复杂。因此,我们将在一个日常示例中学习这三个接口的中心思想和概念行为。

让我们以新闻订阅为例,以及如何通过新的 Reactive Streams 接口使其变得更智能。考虑以下用于为新闻服务创建 Publisher 的代码:

NewsServicePublisher newsService = new NewsServicePublisher();

现在让我们创建一个 Subscriber 并将其订阅到 NewsService

NewsServiceSubscriber subscriber = new NewsServiceSubscriber(5);
newsService.subscribe(subscriber);
...
subscriber.eventuallyReadDigest();

通过在 newsService 实例上调用subscribe(),我们表现出获取最新消息的愿望。通常,在发送任何新闻摘要之前,高质量的服务会发送一封祝贺信,其中包含有关订阅和订阅取消的信息。此操作与我们的Subscriber#onSubscribe()方法完全相同,该方法通知 Subscriber 订阅成功并赋予他们取消订阅的能力。由于我们的服务遵循 Reactive Streams 规范的规则,它允许客户端一次选择尽可能多的新闻文章。只有在客户端通过调用 Subscription#request 指定摘要的第一部分的数量后,新闻服务才会开始通过Subscriber#onNext方法,然后订阅者可以阅读新闻。

 

 

在这里,e最终意味着在现实生活中我们可能会推迟阅读时事通讯 到晚上或周末,这意味着我们手动检查收件箱中的新闻。从订阅者的角度来看,该逻辑是在  NewsServiceSubscriber#eventuallyReadDigests() 的支持下实现的。一般来说,这种行为意味着用户的收件箱收集新闻摘要,而通常的服务订阅模型很容易溢出订阅者的收件箱。反过来,当新闻服务不经意地向订阅者发送消息而订阅者没有阅读它们时,通常会发生邮件服务提供商将新闻服务电子邮件地址列入黑名单的情况。此外,在这种情况下, Subscriber 可能会错过重要的摘要。即使这没有发生,订阅者也不会因为邮箱里满是来自新闻服务的未读摘要而感到高兴。因此,为了保持订阅者的幸福感,新闻服务需要提供传递新闻的策略。假设时事通讯的阅读状态可以确认服务。在这里,一旦我们确保所有消息都被读取,我们可能会提供一些特定的逻辑,以便仅在前一个消息已被读取时才发送新的新闻摘要。这种机制可以很容易地在规范中实现。下一段代码公开了整个提到的机制的示例:

class NewsServiceSubscriber implements Subscriber<NewsLetter> {    // (1)
   final Queue<NewsLetter> mailbox = new ConcurrentLinkedQueue<>();//
   final int take;                                                 //
   final AtomicInteger remaining = new AtomicInteger();            //
   Subscription subscription;                                      //

   public NewsServiceSubscriber(int take) { ... }                  // (2)

   public void onSubscribe(Subscription s) {                       // (3)
      ...                                                          //
      subscription = s;                                            //
      subscription.request(take);                                  // (3.1)
      ...                                                          //
   }                                                               //

   public void onNext(NewsLetter newsLetter) {                     // (4)
      mailbox.offer(newsLetter);                                   //
   }                                                               //

   public void onError(Throwable t) { ... }                        // (5)
   public void onComplete() { ... }                                //

   public Optional<NewsLetter> eventuallyReadDigest() {            // (6)
      NewsLetter letter = mailbox.poll();                          // (6.1)
      if (letter != null) {                                        //
         if (remaining.decrementAndGet() == 0) {                   // (6.2)
            subscription.request(take);                            //
            remaining.set(take);                                   //
         }                                                         //
         return Optional.of(letter);                               // (6.3)
      }                                                            //
      return Optional.empty();                                     // (6.4)
   }                                                               //
}                                                                  //

关键如下:

  1. 这是实现 Subscriber NewsServiceSubscriber 类声明。在这里,除了普通的类定义,我们还有有用字段的列表(例如 mailbox 由一个 Queue 表示, 或代表当前订阅的 subscription field);换句话说,客户端和新闻服务之间的协议。
  2. 这是 NewsServiceSubscriber 构造函数声明。在这里,构造函数接受一个名为 take  的参数,它表示用户可以 潜在地 一次或在不久的时间阅读的新闻摘要的大小。
  3. 这是 Subscriber#onSubscribe 方法实现。在这里,在 (3.1) 点,连同存储接收到的 Subscription,我们发送较早的 偏好用户的新的读取吞吐量 到服务器。
  4. 这是 Subscriber#onSubscribe 方法实现。新摘要处理的整个逻辑很简单,只是将消息放入 Queue  邮箱的过程。
  5. 这是 Subscriber#onErrorSubscriber#onComplete 方法声明。这些方法在订阅终止时调用。
  6. 这是公共 eventuallyReadDigest 方法声明。首先,要表明邮箱可能是空的,我们依赖Optional。反过来,作为第一步,在 (6.1) 点,我们尝试从邮箱中获取最新的未读新闻摘要。如果邮箱中没有可用的未读时事通讯,我们返回 Optional.empty()(6.4)。在有可用摘要的情况下,我们会减少计数器 (6.2),它表示 以前 被新闻服务请求 的未读消息的数量。如果我们仍在等待一些消息,我们返回已完成的 Optional。否则,我们另外请求摘要的新部分并重置剩余新消息的计数器(6.3)

 

由于规范,第一次调用调用 onSubscribe(),将 Subscription存储在本地,然后通知 Publisher 关于他们准备通过 request() 方法接收新闻通讯的信息。反过来,当第一个摘要到来时,它被存储在 队列中 以供将来阅读,这通常发生在真实邮箱中。毕竟,当订阅者已经阅读了收件箱中的所有摘要时,Publisher 将被通知该事实并准备新的新闻部分。反过来,如果新闻服务更改订阅策略(在某些情况下意味着完成当前用户订阅),那么订阅者将通过onComplete 方法。然后将要求客户端接受新策略并自动重新订阅服务。当onError 可以被处理的一个例子是 (当然)一个包含用户偏好信息的意外删除的数据库。在这种情况下,它可能会被视为失败,然后订阅者会收到一封道歉信,并被要求重新订阅具有新偏好的服务。最后, eventuallyReadDigest 的实现无非是真实用户的操作,例如打开邮箱、查看新邮件、阅读信件并将其标记为已读,或者只是当没有新的交互时关闭邮箱。

正如我们所见,Reactive Streams 乍一看自然适用于并解决无关业务案例的问题。仅仅通过提供这样的机制,我们就可以让我们的订阅者满意,并且不会进入邮箱提供商的黑名单。

处理器概念的引入

我们已经了解了构成 Reactive Streams 规范的主要三个接口。我们还看到了提议的机制如何改进通过电子邮件发送新闻摘要的新闻服务。但是,在本节的开头,就提到了规范中有四个核心接口。最后一个是 PublisherSubscriber 的组合,称为 Processor 。让我们看一下以下实现的代码:

packageorg.reactivestreams;

public interface Processor<T, R> extends Subscriber<T>,
                                         Publisher<R> {
}

对比  PublisherSubscriber,它们是start< /em>end 点按定义,Processor 是为了添加一些PublisherSubscriber 之间的处理阶段。由于 Processor可能代表一些转换逻辑,这使得流式管道行为和业务逻辑流更容易理解。  Processor 使用的光辉示例可能是可以在自定义操作符中描述的任何业务逻辑,也可能是提供流数据的额外缓存,等等.为了更好地理解 Processor的概念应用,让我们考虑一下NewsServicePublisher可以如何改进   ;处理器接口。

可能隐藏在NewsServicePublisher 背后的最简单的逻辑是数据库访问以及时事通讯准备和随后对所有订阅者的多播:

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.9。新闻服务的邮件流示例

在此示例中,NewsServicePublisher 被拆分为四个附加组件:

  1. 这是 DBPublisher 阶段。在这里,Publisher 负责提供对数据库的访问并返回最新的帖子。 
  2. 这是 NewsPreparationOperator 阶段。这个阶段是一个中间转换,负责聚合所有消息,然后,当从主源发出完成信号时,将所有消息组合到摘要中。请注意,由于聚合性质,此运算符始终产生一个元素。聚合假设存在存储,它可能在队列中或任何其他用于存储接收到的元素的集合中。
  3. 这是 ScheduledPublisher 阶段。这个阶段负责调度周期性任务。在前面提到的情况下,计划任务是 查询数据库(DBPublisher),处理结果 并将接收到的数据合并到下游。请注意, ScheduledPublisher 实际上是一个无限流,合并后的 Publisher 的完成被忽略。在缺少来自下游的请求的情况下,这个Publisher 通过< code class="literal">Subscriber#onError 方法。
  4. 这是 SmartMulticastProcessor 阶段。这个 Processor 在流程中起着至关重要的作用。首先,它缓存最新的摘要。反过来,该阶段支持多播,这意味着无需为每个 Subscriber 单独创建相同的流。此外,如前所述,SmartMulticastProcessor 包括一个智能邮件跟踪机制,并且 仅 为那些阅读过之前摘要的人发送新闻通讯。
  5. 这些是实际的订阅者,实际上是 NewsServiceSubscriber

一般来说,上图显示了简单的 NewsServicePublisher 背后可能隐藏的内容。反过来,该示例公开了 Processor 的实际应用。可能会注意到,我们有三个转换阶段,但只有其中一个是 the  Processor

首先,在我们只需要从 A 到 B 的简单转换的情况下,我们不需要公开Publisher订阅者。  Subscriber接口的存在意味着一旦Processor 订阅了upstream,元素就可以开始来了到 Subscriber#onNext 方法,并且 可能 由于缺少下游Subscriber而可能丢失。反过来,使用这种技术,我们必须记住一个事实,即 处理器 应该在它订阅主出版商

 

然而,这使业务流程过于复杂,并且不允许我们轻松创建适合任何情况的可重用运算符。此外,Processor 实现的构造引入了对独立(来自主Publisher)管理的额外努力;Subscriber 和适当的背压实现(例如,如果需要,使用队列)。随后,由于将处理器作为普通运算符的实现过于复杂,这可能会导致性能下降或仅仅降低整个流的吞吐量。

由于我们知道我们只想将 A 转换为 B,因此我们只想在实际的 Subscriber 调用 Publisher# 时启动流程订阅,我们不想过度复杂化内部实现。因此,多个 Publisher 实例的组合——接受上游作为构造函数的参数并简单地提供适配器逻辑——非常符合要求。

反过来,当我们需要多播元素时,无论是否有订阅者,Processor 都会发光。它还允许某种突变,因为它实现了 Subscriber 接口,该接口有效地允许诸如缓存之类的突变。 

重视我们已经看到 TakeFilterOperator 运算符 and NewsServiceSubscriber 的实现这一事实,我们可以肯定 Publisher、 Subscriber、 和 Processor 的大多数实例的内部结构代码> 与上述示例类似。因此,我们不深入每个类的内部细节,只考虑所有组件的最终组成:

Publisher<Subscriber<NewsLetter>> newsLetterSubscribersStream =... // (1)
ScheduledPublisher<NewsLetter> scheduler =                         //
   new ScheduledPublisher<>(                                       //
      () -> new NewsPreparationOperator(new DBPublisher(...), ...),// (1.1)
      1, TimeUnit.DAYS                                             //
   );                                                              //
SmartMulticastProcessor processor = new SmartMulticastProcessor(); //

scheduler.subscribe(processor);                                    // (2)

newsLetterSubscribersStream.subscribe(new Subscriber<>() {         // (3)
   ...                                                             //
   public void onNext(Subscriber<NewsLetter>> s) {                 //
      processor.subscribe(s);                                      // (3.1)
   }                                                               //
   ...                                                             //
});                                                                //

关键如下:

  1. 这是发布者、运营商和 处理器 声明。这里, newsLetterSubscribersStream 表示订阅邮件列表的用户的无限流。反过来,在 (1.1) 我们声明 Supplier<?扩展 Publisher<NewsLetter>>,它提供 DBPublisher  NewsPreparationOperator。 
  2. 这是 SmartMulticastProcessorScheduledPublisher<NewsLetter> 订阅。该操作立即启动调度程序,而后者又订阅内部 Publisher
  3. 这是 newsLetterSubscribersStream 订阅。这里我们声明匿名类来实现   Subscriber。反过来,在 (3.1) 我们将每个新传入的 Subscriber 订阅到处理器,该处理器进行多播 所有订阅者的摘要。

在此示例中,我们将所有处理器组合在一个链中,按顺序将它们彼此包装或制作 组件 订阅 彼此。

笔记

一般来说,Publisher/Processor 的实现是一个挑战。因此,我们跳过该章中提到的操作符或源代码的实现的详细解释。不过,要了解更多关于编写我们自己的 Publisher 所需的陷阱、模式和步骤,请参阅以下链接:https://medium.com/@olehdokuka/mastering-own-reactive-流实现部分 1-publisher-e8eaf928a78c

总而言之,我们已经介绍了 Reactive Streams 标准的基础知识。我们已经看到了在 RxJava 等库中表达的反应式编程思想向标准接口集的转变。与此同时,我们看到 提到的接口 很容易 允许我们在系统内的组件之间定义一个异步和非阻塞的交互模型。最后,当采用 Reactive Streams 规范时,我们不仅能够在高级架构级别上构建响应式系统,而且还能够在较小组件级别上构建。

Reactive Streams 技术兼容性套件

乍一看,Reactive Streams 似乎并不棘手,但实际上它确实包含很多隐藏的陷阱。除了 Java 接口之外,该规范还包括许多记录在案的实现规则——也许这是最具挑战性的一点。这些规则严格限制了每个接口,并且保留规范中提到的所有行为至关重要。这允许进一步集成来自不同供应商的实现,这不会导致任何问题。这是形成这些规则的基本点。不幸的是,构建一个涵盖所有极端情况的适当测试套件可能比正确实现接口需要更多的时间。另一方面,开发人员需要一个通用工具来验证所有行为并确保反应式库是标准化的并且相互兼容。幸运的是,Konrad Malawski  已经为此实施了一个工具包,其名称为 Reactive Streams Technology Compatibility Kit - 或简称为 TCK。

笔记

要了解有关 TCK 的更多信息,请参阅以下链接:https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck

TCK 保护所有 Reactive Streams 语句并根据指定规则测试相应的实现。本质上,TCK 是一堆 TestNG 测试用例,应该扩展并准备由相应的PublisherSubscriber进行验证. TCK 包含一个完整的测试类列表,旨在涵盖 Reactive Streams 规范中定义的所有规则。实际上,所有测试都被命名为对应于指定的规则。例如,可能在 org.reactivestreams.tck.PublisherVerification 中找到的示例测试用例之一如下:

...
void
required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements()
throws Throwable {
   ...
ManualSubscriber<T> sub = env.newManualSubscriber(pub);         // (1)
   try {
      sub.expectNone(..., pub));                                   // (2)
      sub.request(1);                                              //
      sub.nextElement(..., pub));                                  // 
      sub.expectNone(..., pub));                                   //
      sub.request(1);                                              //
      sub.request(2);                                              //
      sub.nextElements(3, ..., pub));                              //
      sub.expectNone(..., pub));                                   //
   } finally {
      sub.cancel();                                                // (3)
   }
   ...
}

关键如下:

  1. 这是对经过测试的发布者的手动订阅。 Reactive Streams 的 TCK 提供了自己的测试类,允许验证特定行为。
  2. 这是期望的声明。从前面的代码中可能会注意到,这里我们根据规则 1.01 对给定 Publisher 的行为进行了特定验证。在这种情况下,我们会验证 Publisher 无法发出比  Subscriber 请求的元素更多的信号。
  3. 这是 Subscription 的取消阶段。一旦测试通过或失败,为了关闭打开的资源并完成交互,我们使用 ManualSubscriber 取消订阅 Publisher代码> API。

上述测试的重要性隐藏在验证Publisher的任何实现都应该提供的交互的基本保证之后。此外,  PublisherVerification 中的所有测试用例 确保给定的Publisher 在某种程度上符合 Reactive Streams 规范。在这里,在某种程度上 意味着不可能在完整尺寸中验证所有规则。此类规则的示例是规则 3.04,它指出请求不应执行无法进行有意义测试的繁重计算。

发布者验证

随着对Reactive Streams TCK的importance的理解,还需要掌握工具包使用的基本知识。为了获得有关此工具包如何工作的基本知识,我们将验证我们的新闻服务的组件之一。由于Publisher是我们系统的重要组成部分,我们将从分析开始。我们记得,TCK 提供 org.reactivestreams.tck.PublisherVerification 检查Publisher的基本行为。一般来说,PublisherVerification 是一个抽象类,它要求我们只扩展两个方法。让我们看一下下面的例子,以了解如何编写之前开发的 NewsServicePublisher 的验证:

public class NewsServicePublisherTest                              // (1)
   extends PublisherVerification<NewsLetter> ... {                 //

   public StreamPublisherTest() {                                  // (2)
      super(new TestEnvironment(...));                             //
   }                                                               //

   @Override                                                       // (3)
   public Publisher<NewsLetter> createPublisher(long elements) {   //
      ...
      prepareItemsInDatabase(elements);                            // (3.1)
      Publisher<NewsLetter> newsServicePublisher =                 //
         new NewsServicePublisher(...);                            //
      ...                                                          //
      return newsServicePublisher;                                 //
   }                                                               //

   @Override                                                       // (4)
   public Publisher<NewsLetter> createFailedPublisher() {          //
      stopDatabase()                                               // (4.1)
      return new NewsServicePublisher(...);                        //
   }                                                               //

   ...                                                             //
}

关键如下:

  1. 这是 NewsServicePublisherTest 类声明,它扩展了PublisherVerification 类。
  2. 这是无参数构造函数声明。需要注意的是 PublisherVerification 没有默认构造函数,并要求 实现它的人提供 TestEnvironment< /code> 负责测试的具体配置,如超时配置、调试日志等。
  3. 这是 createPublisher 方法实现。此方法负责生成 Publisher,它会生成给定数量的元素。反过来,在我们的例子中,为了满足测试的要求,我们必须用一定数量的新闻条目填充数据库 (3.1).
  4. 这是 createFailedPublisher 方法实现。在这里,与 createPublisher 方法相反,我们必须提供一个失败的NewsServicePublisher实例。我们有一个失败的 Publisher 的选项之一是当 数据源不可用时,这在我们的例子中会导致 失败NewsServicePublisher(4.1)。 

 

上述测试扩展了运行 NewsServicePublisher验证所需的基本配置。 假设 Publisher< /code> 足够灵活,能够提供给定数量的元素。 换句话说,测试可以告诉 Publisher 它应该产生多少元素  ;以及它是否应该失败或正常工作。 另一方面,有很多具体情况 Publisher仅限于一个元素。例如,我们可能还记得,NewsPreparationOperator 只用一个元素响应 与来自上游的传入元素的数量无关。

通过简单地 按照上面提到的测试配置,我们无法检查那个Publisher的准确性 因为许多测试用例假设 流中存在多个元素.幸运的是,Reactive Streams TCK 尊重这种极端情况,并允许设置一个名为  maxElementsFromPublisher() 的附加方法,该方法返回一个值,该值指示产生的元素的最大数量:

@Override
public long maxElementsFromPublisher() {
    return 1;
}

一方面,通过覆盖该方法,需要多个元素的测试将被跳过。另一方面,响应式流规则的覆盖范围减少了,可能需要实现自定义测试用例。

订阅者验证

上述配置是开始测试生产者行为所需的 minimum。然而,除了  Publisher的实例,我们还有  Subscriber 的实例应该被测试以及。幸运的是, Reactive Stream 规范中的那组规则没有 Publisher 的规则复杂,但仍然需要满足所有要求。

有两种不同的测试套件来测试 NewsServiceSubscriber。第一个称为 org.reactivestreams.tck.SubscriberBlackboxVerification,它允许验证Subscriber,而无需了解或修改其内部结构。当 Subscriber来自外部代码库时,黑盒验证是一个有用的测试工具包,并且没有合法的方式来扩展该行为。另一方面,黑盒验证仅涵盖少数规则,并不能确保实现的完全正确性。要查看如何检查 NewsServiceSubscriber,让我们先实现 Blackbox 验证测试:

public class NewsServiceSubscriberTest                             // (1)    
   extends SubscriberBlackboxVerification<NewsLetter> {            // 

   public NewsServiceSubscriberTest() {                            // (2)
      super(new TestEnvironment());                                //
   }                                                               //

   @Override                                                       // (3)
   public Subscriber<NewsLetter> createSubscriber() {              //
      return new NewsServiceSubscriber(...);                       //
   }                                                               //

   @Override                                                       // (4)
   public NewsLetter createElement(int element) {                  //
      return new StubNewsLetter(element);                          //
   }                                                               //

   @Override                                                       // (5)
   public void triggerRequest(Subscriber<? super NewsLetter> s) {  //
      ((NewsServiceSubscriber) s).eventuallyReadDigest();          // (5.1)
   }                                                               //
}

关键如下:

  1. 这是 NewsServiceSubscriberTest 类声明,它扩展了 SubscriberBlackboxVerification 测试套件。
  2. 这是默认的构造函数声明。在这里,与 PublisherVerification 相同,我们被要求提供特定的 TestEnvironment
  3. 这是 createSubscriber 方法的实现。在这里,该方法返回 NewsServiceSubscriber 实例,应根据规范对其进行测试。
  1. 这是 createElement 方法的实现。在这里,我们需要提供一个方法的实现,它扮演一个新元素工厂的角色,并根据需要生成一个新的 NewsLetter 实例。
  2. 这是 triggerRequest 方法实现。由于黑盒测试假设无法访问内部,这意味着我们无法直接访问隐藏在 Subscriber< 中的 Subscription  /代码>。随后,这意味着我们必须以某种方式通过手动使用给定的 API (5.1).

前面的示例显示了用于 Subscriber 验证的可用 API。除了两个必需的方法, createSubscribercreateElement,还有一个额外的方法来处理  < code class="literal">Subscription#request 外部方法。在我们的例子中,这是一个有用的附加功能,可以让我们模拟真实的用户活动。

第二个测试套件称为 org.reactivestreams.tck.SubscriberWhiteboxVerification。这是与 前一个类似的验证,但要通过 验证, 订阅者应该提供与 WhiteboxSubscriberProbe :

public class NewsServiceSubscriberWhiteboxTest                     // (1)
   extends SubscriberWhiteboxVerification<NewsLetter> {            //
   ...                                                             //

   @Override                                                       // (2)
   public Subscriber<NewsLetter> createSubscriber(                 //
      WhiteboxSubscriberProbe<NewsLetter> probe                    //
   ) {                                                             //
      return new NewsServiceSubscriber(...) {                      //
         public void onSubscribe(Subscription s) {                 //
            super.onSubscribe(s);                                  // (2.1)
            probe.registerOnSubscribe(new SubscriberPuppet() {     // (2.2)
               public void triggerRequest(long elements) {         //
                  s.request(elements);                             //
               }                                                   //
               public void signalCancel() {                        //
                  s.cancel();                                      //
               }                                                   //
            });                                                    //
         }                                                         //
         public void onNext(NewsLetter newsLetter) {               //
            super.onNext(newsLetter);                              //
            probe.registerOnNext(newsLetter);                      // (2.3)
         }                                                         //
         public void onError(Throwable t) {                        //
            super.onError(t);                                      //
            probe.registerOnError(t);                              // (2.4)
         }                                                         //
         public void onComplete() {                                //
            super.onComplete();                                    //
            probe.registerOnComplete();                            // (2.5)
         }                                                         //
      };                                                           //
   }                                                               //
   ...                                                             //
}                                                                  //

关键如下:

  1. 这是 NewsServiceSubscriberWhiteboxTest 类声明,它扩展了 SubscriberWhiteboxVerification 测试套件。
  2. 这是 createSubscriber 方法的实现。此方法的工作原理与 Blackbox 验证相同,并返回 Subscriber 实例,但这里有一个名为 WhiteboxSubscriberProbe 的附加参数。  ;在这种情况下,WhiteboxSubscriberProbe代表了一种机制,可以实现对输入信号的需求和捕获的嵌入式控制。与 Blackbox 验证相比,通过在NewsServiceSubscriber(2.2),(2.3),(2.4),(2.5),测试套件不仅能够发送需求,而且能够验证需求是否得到满足并且所有元素也已收到。反过来,需求调节机制也比以前更加透明。在这里,在 (2.2) 点,我们实现了 SubscriberPuppet,它适应对接收到的 订阅。 

我们可以看到,与黑盒验证相反,白盒需要扩展Subscriber,在内部提供额外的钩子。虽然白盒测试涵盖了更广泛的规则,以确保被测试的 Subscriber 的正确行为, 当我们想让一个类最终成为防止它被扩展。

验证之旅的最后一部分是 处理器的测试。为此,TCK 为我们提供了 org.reactivestreams.tck.IdentityProcessorVerification。这个测试套件可以验证一个Processor,它接收和产生相同类型的元素。在我们的示例中,只有 martMulticastProcessor 以这种方式运行。由于测试套件应该验证 PublisherSubscriber的行为,IdentityProcessorVerification 继承了与PublisherSubscriber 测试类似的配置。因此,我们不会深入了解整个测试的实现细节,而是考虑 SmartMulticastProcessor 验证所需的其他方法:

public class SmartMulticastProcessorTest                           // (1)
   extends IdentityProcessorVerification<NewsLetter> {             //

   public SmartMulticastProcessorTest() {                          // (2)
      super(..., 1);                                               //
   }                                                               //

   @Override                                                       // (3)
   public Processor<Integer, Integer> createIdentityProcessor(     //
      int bufferSize                                               //
   ) {                                                             //
      return new SmartMulticastProcessor<>();                      //
   }                                                               //

   @Override                                                       // (4)
   public NewsLetter createElement(int element) {                  //
      return new StubNewsLetter(element);                          //
   }                                                               //
}

关键如下:

  1. 这是 SmartMulticastProcessorTest 类定义,它扩展了 IdentityProcessorVerification
  2. 这是默认的构造函数定义。正如我们可能从代码中注意到的那样,(以及 TestEnvironment 配置,在该示例中被跳过)我们传递了一个附加参数,该参数指示   处理器必须缓冲而不丢弃。由于我们知道我们的 Processor 只支持一个元素的 缓冲,我们必须在开始任何验证之前手动提供该数字。
  1. 这是 createIdentityProcessor 方法实现,它返回一个被测试的Processor的实例。在这里, bufferSize 表示 处理器必须缓冲而不丢弃的元素数量。我们现在可以跳过 parameter,因为我们知道内部缓冲区大小等于构造函数中预先配置的大小.
  2. 这是 createElement 方法的实现。类似于 Subscriber的验证,我们必须提供工厂方法来创建新元素。 

 

前面的示例显示了 SmartMulticastProcessor 验证的基本配置。由于 IdentityProcessorVerification 扩展了 SubscriberWhiteboxVerification 和 PublisherVerification,所以一般配置是从它们中的每一个合并而来的。

概括地说,我们概述了有助于验证实现的反应式运算符的指定行为的基本测试集。在这里,TCK 可以被认为是初始集成测试。尽管如此,我们应该记住,除了 TCK 验证之外,每个 operator 都应该仔细测试其所需的行为自己的。

笔记

要了解更多关于验证的信息,请访问原始 TCK 页面https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck。要查看更多 TCK 使用示例,请访问以下 Ratpack 存储库 https://github.com/ratpack/ratpack/tree/master/ratpack-exec/src/test/groovy/ratpack/stream/tck。在以下链接中还有用于验证 RxJava 2 的更广泛的 TCK 使用示例列表:https://github.com/ReactiveX/RxJava/tree/2.x/src/test/java/io/reactivex/tck .

JDK 9

同样,JDK 实施团队也看到了 的价值。规范首次发布后不久,Doug Lee 提出了在 JDK 9 中添加上述接口的提议。该提议得到了当前 Stream API 仅提供拉模型和 推送模型 是这里的缺失点:

"...t没有最好的流利度async/parallel API。 CompletableFuture/CompletionStage 最好地支持期货的延续风格编程, java.util.Stream < span class="emphasis">最好支持(多阶段,可能并行)对集合元素的“拉”式操作。到目前为止,缺少的一个类别是对项目的“推式”操作,因为它们变成可从活跃的来源获得。”

笔记

请注意,在底层,Java Stream API 使用 Spliterator, 这只不过是 Iterator的修改版本,能够并行执行。我们可能还记得, Iterator 不是为推动而设计的,而是为拉动 Iterator#next 方法。类似地, Spliterator 有 tryAdvance 方法,它是  Iterator 的 hasNext 和 next 方法。因此,我们可以得出结论,通常 Stream API 是基于拉取的。

该提案的主要目标是为 JDK 中的反应流指定接口。根据提案, Reactive Streams 规范中定义的所有接口都在 java.util.concurrent.Flow类中作为静态子类提供。一方面,这种改进意义重大,因为 Reactive Streams 成为了 JDK 标准。另一方面,许多供应商已经依赖 org.reactivestreams.* 包中提供的规范。 因为大多数供应商(例如 RxJava)都支持JDK 的几个版本,不可能只实现这些接口和以前的接口。因此,这一改进体现了与 JDK 9+ 兼容并以某种方式将一种规范转换为另一种规范的额外要求。

 

幸运的是,Reactive Streams 规范为此提供了一个额外的模块,它允许将 Reactive Streams 类型转换为 JDK Flow types:

...                                                                // (1)
import org.reactivestreams.Publisher;                              //
import java.util.concurrent.Flow;                                  //
...                                                                //

Flow.Publisher jdkPublisher = ...;                                 // (2)
Publisher external = FlowAdapters.toPublisher(jdkPublisher)        // (2.1)
Flow.Publisher jdkPublisher2 = FlowAdapters.toFlowPublisher(       //
   external                                                        // (2.2)
);                                                                 //

关键如下:

  1. 这些是 import 定义。从导入语句中可能会注意到,我们从原始 Reactive Streams 库中导入了 Publisher Flow,它是 Reactive Streams 的所有接口的访问点,但移植到 JDK 9。
  2. 这是 Flow.Publisher 实例定义。在这里,我们从 JDK 9 定义 Publisher 的实例。反过来,在 (2.1) 点,我们使用 FlowAdapters.toPublisher 原始 Reactive Streams 库中将 Flow.Publisher 转换为 org.reactivestreams.Publisher。此外,出于演示目的,在 (2.2) 行,我们使用 FlowAdapters.toFlowPublisher 方法来转换 org.reactivestreams.Publisher 返回 Flow.Publisher

前面的示例展示了我们如何轻松地将 Flow.Publisher 转换为 org.reactivestreams.Publisher。应该注意 该示例与实际业务用例无关,因为在本书出版时,没有在 JDK 9 Flow API 之上从头开始编写的知名响应式库。因此,没有必要从 Reactive Streams 规范迁移为支持 JDK 6 及更高版本的外部库。然而,在未来,一切都 很可能 改变,反应式库的新迭代肯定会在反应式流规范之上编写并移植到JDK 9。

笔记

请注意,适配器功能是作为单独的库提供的。要查看所有可用的库,请查看以下链接:http:// /www.reactive-streams.org/#jvm-interfaces-completed

 

高级 - 反应式流中的异步和并行


previous 部分中,我们讨论了概念性行为 的反应流。但是,没有提到反应管道的异步和非阻塞行为。因此,让我们深入研究 Reactive Streams 标准并分析这些行为。

一方面,Reactive Streams API 在规则 2.2 和 3.4 中规定,所有由Publisher 产生并由Subscriber 应该是非阻塞和非阻塞的。因此,我们可以确定我们可以有效地利用处理器的一个节点或一个内核,具体取决于执行环境。 

另一方面,所有处理器或内核的有效利用需要并行化。对 Reactive Streams 规范中的并行化概念的通常理解可以解释为 Subscriber#onNext 方法的并行调用。不幸的是,规范在规则 1.3 中声明 on*** 方法的调用 必须在线程中发出信号-安全 方式并且——如果由多个线程执行——使用 外部同步。这假定对所有 on*** 方法进行序列化或简单的顺序调用。反过来,这意味着我们不能创建像 ParallelPublisher 这样的东西并对流中的元素进行并行处理。

因此,问题是:我们如何有效地利用资源?为了找到答案,我们不得不分析一下常见的流处理管道:

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.10。 Source 和 Destination 之间具有一些业务逻辑的处理流程示例

可能会注意到,通常的处理管道——连同数据源和最终目的地——包括一些处理或转换阶段。反过来,每个处理阶段可能会花费大量的处理时间并拖延其他执行。

 

在这种情况下,解决方案之一是在阶段之间传递异步消息。对于内存流处理,这意味着执行的一部分绑定到一个 Thread,另一部分绑定到另一个 Thread .例如,最终的元素消耗可能是一个 CPU 密集型任务,将在单独的 Thread 上进行合理处理:

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.11。 Source with Processing 和 Destination 之间的异步边界示例

一般来说,通过在两个独立的 Thread 之间拆分处理,我们在阶段之间设置了异步边界。反过来,通过这样做,我们可以并行化元素的整体处理,因为两个 Thread 可以彼此独立地工作。为了实现并行化,我们必须应用一个数据结构,例如 Queue,来适当地解耦处理。因此,线程 A 内的处理独立地提供项目 to,以及线程内的Subscriber B 独立消费来自的项目,同样的Queue

在线程之间拆分处理会导致数据结构中的额外开销。当然, 由于 Reactive Streams 规范,这样的数据结构总是被绑定的。反过来,数据结构中的项目数通常等于 Subscriber 从它的 请求的批次的大小发布者,以及这个 取决于系统的一般容量。

除此之外,针对 API 实现者和开发人员的主要问题是流处理部分应该附加到哪个异步边界?这里至少会出现三个简单的选择。第一种情况是处理流附加到源资源(图 3.11),并且所有操作都发生在与源相同的边界内。在这种情况下,所有数据都被一个一个地同步处理,因此一个项目在被发送到另一个线程上的处理之前要经过所有处理阶段的转换。与 first 情况的异步边界的第二种和相反配置是当处理附加到 Destination,或 Consumer 的 Thread,并且可以在 元素的生产 是 CPU-密集的任务。 

 

第三种情况发生在生产和消费是 CPU 密集型任务时。因此,运行 中间转换的最有效方式是在单独的线程 对象上运行它:

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.12。管道各组件之间的异步边界示例

正如我们在上图中看到的,每个处理阶段都可能绑定到一个单独的线程。一般来说,有很多方法可以配置数据流的处理。每个案例都与其最适合的条件相关。例如,当源资源的负载低于目标资源时,第一个示例有效。因此,将转换操作放在源边界内是有利可图的,反之亦然,当目标消耗的资源少于源时,处理目标边界中的所有数据是合乎逻辑的。此外,有时转换可能是资源消耗最高的操作。在这种情况下,最好将转换与 Source 和 Destination 分开。

尽管如此,重要的是要记住,在不同线程之间拆分处理不是免费的,应该在合理的资源消耗以实现边界(线程和额外的数据结构)和高效之间取得平衡。元素处理。反过来,实现这种平衡是另一个挑战,如果没有库的 有用的 API,很难克服其实现和管理。

幸运的是,RxJava 和 Project Reactor 等反应式库提供了这样的 API。我们现在不会详细介绍提议的功能,但会在第 4 章 Project Reactor - 响应式应用程序的基础。

反应式景观的转变


JDK 9 包含 Reactive Streams 规范这一事实加强了它的重要性,并开始改变行业。开源软件行业的领导者(例如 Netflix、Red Hat、Lightbend、MongoDB、Amazon 等)已经开始在他们的产品中采用这种出色的解决方案。

RxJava 变身

通过这种方式,RxJava 提供了一个额外的模块,允许我们轻松地将一个 reactive 类型转换为另一个。下面看看如何将Observable<T>转换为Publisher<T>并采用rx .Subscriber org.reactivestreams.Subscriber

假设我们有一个应用程序使用 RxJava 1.x 和 Observable 作为组件之间的中心通信类型,如下例所示:

interface LogService {
   Observable<String> stream();
}

然而,随着 Reactive Streams 规范的发布,我们决定遵循标准并从以下特定依赖项中抽象出我们的接口:

interface LogService {
   Publisher<String> stream();
}

可能会注意到,我们很容易将 Observable 替换为 Publisher。但是,实现的重构可能比仅仅替换返回类型要花费更多的时间。幸运的是,我们可以随时 轻松地 将现有的 Observable 适配到 Publisher,如下例所示:

class RxLogService implements LogService {                         // (1)
   final HttpClient<...> rxClient = HttpClient.newClient(...);     // (1.1)

   @Override                                                      
   public Publisher<String> stream() {                              
      Observable<String> rxStream = rxClient.createGet("/logs")    // (2)
                                            .flatMap(...)          // 
                                            .map(Utils::toString); //

      return RxReactiveStreams.toPublisher(rxStream);              // (3)
   }
}

关键如下:

  1. 这是 RxLogService 类声明。该类代表旧的基于 Rx 的实现。在 (1.1) 我们使用 RxNetty HttpClient,它允许与外部服务异步交互, 使用包装在基于 RxJava 的 API 中的 Netty 客户端来阻止时尚。
  2. 这是外部请求执行。在这里,使用创建的 HttpClient 实例, 我们从外部服务请求日志流,将传入的元素转换为 String< /代码> 实例。
  3. 这是rxStreamPublisher的改编 使用RxReactiveStreams图书馆。

正如可能注意到的那样,RxJava 的开发人员很关心我们,并提供了一个额外的 RxReactiveStreams 类,从而可以转换  Observable< /code> 进入 Reactive Streams 的 Publisher。此外,随着 Reactive Streams 规范的出现,RxJava 开发人员还提供了对背压的非标准化支持,这使得转换后的 Observable 能够兼容 Reactive Streams规范。 

随着ObservablePublisher的转换,我们也可以转换rx.Subscriber< /code> 到 org.reactivestreams.Subscriber。例如,日志流 以前 存储在文件中。为此,我们有自定义的Subscriber,它负责 I/O 交互。反过来,迁移到 Reactive Streams 规范的代码变形如下所示: 

class RxFileService implements FileService {                       // (1)

   @Override                                                       // (2)
   public void writeTo(                                            //
      String file,                                                 //
      Publisher<String> content                                    //
   ) {                                                             //

      AsyncFileSubscriber rxSubscriber =                           // (3)
         new AsyncFileSubscriber(file);                            //

      content                                                      // (4)
         .subscribe(RxReactiveStreams.toSubscriber(rxSubscriber)); //
   }
}

 

关键如下:

  1. 这是 RxFileService 类声明。
  2. 这是 writeTo 方法实现,它接受 Publisher 作为组件之间交互的中心类型。
  3. 这是基于 RxJava 的 AsyncFileSubscriber 实例声明。
  4. 这是内容 订阅。为了重用基于 RxJava 的 Subscriber,我们使用相同的 RxReactiveStreams 实用程序类对其进行调整。

从前面的示例中我们可以看到, RxReactiveStreams 提供了广泛的转换器列表,使得将 RxJava API 转换为 Reactive Streams API 成为可能。 

同样,任何 Publisher 都可以转换回 RxJava Observable

Publisher<String> publisher = ... 

RxReactiveStreams.toObservable(publisher)
                 .subscribe();

一般来说,RxJava 开始以某种方式遵循 Reactive Streams 规范。不幸的是,由于向后兼容,无法实施规范,并且没有计划扩展 反应式流规范 用于 RxJava 1.x 未来。此外,从 2018 年 3 月 31st 开始,不再有支持 RxJava 1.x 的计划。

幸运的是,RxJava 的第二次迭代带来了新的希望。第二版库之父 Dávid Karnok 显着改进了整个库的设计,并引入了一种符合 Reactive Streams 规范的附加类型。除了 Observable(由于向后兼容性而保持不变)之外,RxJava 2 还提供了称为 Flowable 的新反应类型。

Flowable reactive 类型提供与 Observable 相同的 API,但扩展了 org.reactivestreams。 Publisher 从一开始。如下一个示例所示,它与 fluent API 合并, Flowable可以转换为任何常见的 RxJava 类型并返回到与 Reactive Streams 兼容的类型:

Flowable.just(1, 2, 3)
        .map(String::valueOf)
        .toObservable()
        .toFlowable(BackpressureStrategy.ERROR)
        .subscribe();

正如我们可能注意到的,将 Flowable 转换为 Observable 是一个运算符的简单应用。但是,要将 Observable back 转换为 Flowable,需要提供一些可用的背压策略。在 RxJava 2 中,Observable被设计为push-only流。因此,保持转换后的Observable符合 Reactive Streams 规范至关重要。

笔记

BackpressureStrategy 指的是生产者不尊重消费者需求时所采取的策略。换句话说,BackpressureStrategy 定义了当我们有一个快速生产者和一个慢消费者时流的行为。我们可能还记得,在本章的开头,我们讨论了相同的案例并考虑了三个中心策略。这些策略包括元素的无限缓冲、溢出时丢弃元素或基于消费者需求不足而阻塞生产者。一般来说,BackapressureStrategy以某种方式反映了所有描述的策略,除了阻塞生产者的策略。它还提供了诸如 BackapressureStrategy.ERROR 之类的策略,当需求不足时,它会向消费者发送错误并自动断开连接。我们不会在本章中详细介绍每个策略,但会在第 4 章 Project Reactor - 响应式应用程序的基础。

Vert.x 调整

随着 RxJava 的转型,其余的 reactive 库和框架供应商也开始采用 Reactive Streams 规范。遵循规范,Vert.x 包含一个额外的模块,该模块提供对 Reactive Streams API 的支持。以下示例演示了此添加:

...                                                                // (1)
.requestHandler(request -> {                                       //

   ReactiveReadStream<Buffer> rrs =                                // (2)
      ReactiveReadStream.readStream();                             //
   HttpServerResponse response = request.response();               //

   Flowable<Buffer> logs = Flowable                                // (3)
      .fromPublisher(logsService.stream())                         //
      .map(Buffer::buffer)                                         //
      .doOnTerminate(response::end);                               //

   logs.subscribe(rrs);                                            // (4)

   response.setStatusCode(200);                                    // (5)
   response.setChunked(true);                                      //
   response.putHeader("Content-Type", "text/plain");               //
   response.putHeader("Connection", "keep-alive");                 //

   Pump.pump(rrs, response)                                        // (6)
       .start();                                                   //
})
...

关键如下:

  1. 这是请求处理程序声明。这是一个通用请求处理程序,允许处理发送到服务器的任何请求。 
  2. 这是 Subscriber 和 HTTP 响应声明。这里 ReactiveReadStream 实现了 org.reactivestreams.SubscriberReadStream ,它允许将任何 Publisher 转换为与 Vert.x API 兼容的数据源。 
  3. 这是处理流程声明。在该示例中,我们引用了新的基于 Reactive Streams 的 LogsService 接口,并使用  Flowable API。
  4. 这是订阅阶段。一旦声明了处理流程,我们就可以订阅ReactiveReadStreamFlowable
  5. 这是一个响应准备阶段。
  6. 这是发送给客户端的最终响应。在这里, Pump 类在复杂的背压控制机制中发挥着重要作用,以防止底层 WriteStream buffer变得太满了。

正如我们所见,Vert.x 没有提供流畅的 API 来编写元素处理流。但是,它提供了一个 API,允许将任何 Publisher 转换为 Vert.x API,从而保持来自 Reactive Streams 的复杂背压管理到位。

 

Ratpack 改进

除了 Vert.x,另一个著名的 Web 框架 Ratpack 也提供对 Reactive Streams 的支持。与 Vert.x 相比,Ratpack 直接支持 Reactive Streams .例如,在 Ratpack 案例中发送日志流如下所示:

RatpackServer.start(server ->                                      // (1)
   server.handlers(chain ->                                        //
      chain.all(ctx -> {                                           //

         Publisher<String> logs = logsService.stream();            // (2)

ServerSentEvents events = serverSentEvents(               // (3)
            logs,                                                  //
            event -> event.id(Objects::toString)                   // (3.1)
                          .event("log")                            //
                          .data(Function.identity())               //
         );                                                        //

         ctx.render(events);                                       // (4)
      })                         
   )
);

关键如下:

  1. 这是服务器启动的动作和请求处理程序声明。 
  2. 这是日志流声明。
  3. 这是 ServerSentEvents的准备工作。在这里,提到的类在映射阶段发挥作用,它将 Publisher 中的元素转换为服务器发送事件的代表。我们可能已经注意到,ServerSentEvents 要求映射器函数声明,它描述了如何将元素映射到特定的 Event字段。
  4. 这是流到 I/O 的渲染。 

从示例中我们可以看出,Ratpack 在核心中提供了对 Reactive Streams 的支持。现在,可以重用相同的 LogService#stream 方法,而无需提供额外的类型转换或额外模块的要求来添加对特定反应库的支持。 

此外,与仅提供对 Reactive Streams 规范的简单支持的 Vert.x 相比,Ratpack 提供了自己的规范接口实现。此功能在 ratpack.stream.Streams 类中可用,它类似于 RxJava API:

Publisher<String> logs = logsService.stream();
TransformablePublisher publisher = Streams
   .transformable(logs)
   .filter(this::filterUsersSensitiveLogs)
   .map(this::escape);

在这里, Ratpack 提供了一个静态工厂来将任何 Publisher to TransformablePublisher 转换为使用熟悉的运算符和转换阶段灵活地处理事件流。

MongoDB 反应式流驱动程序

在前面的部分中,我们从响应式库和框架的角度概述了对 Reactive 流的支持。但是,该规范的应用领域不仅限于框架或反应式库。生产者和消费者之间的相同交互规则可以应用于通过数据库驱动程序与数据库进行通信。

通过这种方式,MongoDB 提供了一个基于 Reactive Streams 的驱动程序以及基于回调的驱动程序和 RxJava 1.x 驱动程序。反过来,MongoDB 提供了额外的、 流畅的 API 实现,它提供了某种基于书面转换的查询。例如,我们可能在新闻服务示例中看到的 DBPublisher 的内部实现可能通过以下方式实现:

public class DBPublisher implements Publisher<News> {              // (1)
private final MongoCollection<News> collection;                 //
   private final Date publishedOnFrom;                             //

   public DBPublisher(                                             // (2)
      MongoClient client,                                          //
      Date publishedOnFrom                                         //
   ) { ... }                                                       //  

@Override                                                       // (3)
public void subscribe(Subscriber<? super News> s) {             //
      FindPublisher<News> findPublisher =                          // (3.1)
         collection.find(News.class);                              //
                                                                   //
      findPublisher                                                // (3.2)
         .filter(Filters.and(                                      //
            Filters.eq("category", query.getCategory()),           //
            Filters.gt("publishedOn", today())                     //
         )                                                         //
         .sort(Sorts.descending("publishedOn"))                    //
         .subscribe(s);                                            // (3.3)
}                                                               //
}

关键如下:

  1. 这是 DBPublisher 类和相关字段声明。这里,publishedOnFrom 字段 指的是应该搜索新闻帖子的日期。
  2. 这是构造函数声明。在这里, DBPublisher 的构造函数中接受的参数之一是配置的 MongoDB 客户端,即 com.mongodb.reactivestreams。客户端.MongoClient
  3. 这是 Publisher#subscriber 方法的实现。在这里,我们在 FindPublisher 简化了 DBPublisher 的实现文字">(3.1) 和订阅 到给定的 Subscriber 在点 (3.3) 我们可能已经注意到,FindPublisher 公开了一个流畅的 API,允许使用函数式编程风格构建可执行查询。

除了对 Reactive Streams 标准的支持之外,基于 Reactive Streams 的 MongoDB 驱动程序还提供了一种简化的数据查询方式。我们不会详细介绍该驱动程序的实现和行为。相反,我们将在 第 7 章 中介绍这一点,  反应式数据库访问。

反应式技术的组合在行动

要了解有关这些技术的可组合性的更多信息,让我们尝试在一个基于 Spring Framework 4 的应用程序中组合几个 reactive 库。反过来,我们的应用程序基于重新访问的新闻服务功能,并通过一个普通的 REST 端点访问它。该端点负责从数据库和外部服务中查找新闻:

读书笔记《hands-on-reactive-programming-in-spring-5》反应式流 - 新流的标准

图 3.13。一个应用程序中的跨库通信示例

前面的图表向我们的系统介绍了三个反应式库。在这里,我们使用 Ratpack 作为 Web 服务器。使用 TransfromablePublisher,这使我们能够 轻松地 组合和处理来自多个来源的结果。反过来,其中一个来源是 MongoDB,它 返回 FindPublisher作为查询的结果。最后,这里我们可以访问外部的新服务并使用RxNetty HTTP客户端抓取一部分数据,返回< code class="literal">Observable 并因此适应 org.reactivestreams.Publisher

总结一下,我们在系统中有四个组件,第一个是Spring Framework 4,第二个是Retrofit,它起到了web框架的作用。最后,第三和第四是 RxNetty 和 MongoDB,用于提供对新闻的访问。我们不会详细介绍负责与外部服务通信的组件的实现,但我们将介绍端点的实现。这突出了 Reactive Streams 规范作为独立框架和库的可组合性标准的价值: 

@SpringBootApplication                                             // (1)
@EnableRatpack                                                     // (1.1)
public class NewsServiceApp {                                      //

@Bean                                                           // (2)
MongoClient mongoClient(MongoProperties properties) { ... }     // (2.1)
@Bean                                                           //
   DatabaseNewsService databaseNews() { ... }                      // (2.2)
   @Bean                                                           //
   HttpNewsService externalNews() { ... }                          // (2.3)

@Bean                                                           // (3)
public Action<Chain> home() {                                   // 
return chain -> chain.get(ctx -> {                           // (3.1)

         FindPublisher<News> databasePublisher =                   // (4)
            databaseNews().lookupNews();                           //
         Observable<News> httpNewsObservable =                     //
            externalNews().retrieveNews();                         //
         TransformablePublisher<News> stream = Streams.merge(      // (4.1)
            databasePublisher,                                     //
            RxReactiveStreams.toPublisher(httpNewsObservable)      //
);                                                        //

         ctx.render(                                               // (5)
            stream.toList()                                        //
                  .map(Jackson::json)                              // (5.1)
         );                                                        //
})                                                           //
   }                                                               //

public static void main(String[] args) {                        // (6)
      SpringApplication.run(NewsServiceApp.class, args);           //
}                                                               //
}

关键如下:

  1. 这是 NewsServiceApp 类声明。此类使用 @SpringBootApplication 注释进行注释,该注释假定使用 Spring Boot 功能。反过来,在 (1.1) 点有一个额外的 @EnableRatpack 注释,它是 ratpack-spring-boot 模块并为 Ratpack 服务器启用自动配置。
  2. 这是常见的 bean 声明。在这里,在 (2.1) 我们配置 MongoClient 豆。在(2.2)(2.3)这两个点都有新闻检索和查找的服务配置。 
  3. 这是请求的处理程序声明。在这里,要创建一个 Ratpack 请求处理程序,我们必须用 Action<Chain> 声明一个 Bean类型,它允许在 (3.1) 点提供处理程序的配置。 
  4. 这是服务调用和结果聚合。在这里,我们执行服务的方法并使用 Ratpack Streams API (4.1) 合并返回的流。
  5. 这是合并流阶段的渲染。在这里,我们将所有元素异步缩减为一个列表,然后将该列表转换为特定的渲染视图,例如 JSON (5.1)
  6. 这是主要方法的实现。在这里,我们使用一种常用技术来使 Spring Boot 应用程序栩栩如生。

前面的示例展示了 Reactive Streams 标准的强大功能。在这里,使用几个不相关的库的 API,我们可以轻松地构建一个处理流程并将结果返回给最终用户,而无需任何额外的努力来使一个库适应另一个库。该规则的唯一排除是 HttpNewsService,它在 retrieveNews 方法执行的结果中 returns Observable。尽管如此,我们可能还记得, RxReactiveStreams 为我们提供了一系列有用的方法,让我们能够 轻松 转换 RxJava 1.xObservablePublisher

概括


正如我们从前面的示例中了解到的,Reactive Streams 极大地提高了反应式库的可组合性。我们还了解到 验证Publisher 兼容性的最有用的方法就是应用技术兼容性测试工具包,它与 Reactive Streams规范一起提供.

同时,规范为Reactive Streams带来了pull-push通信模型。此添加解决了背压控制问题,同时通过提供使用哪种模型的选择来增强通信灵活性。

在 Reactive Streams Specification 被包含在 JDK9 中之后,它的重要性直线上升。然而,正如我们所了解的,这种改进带来了在规范的两个变体之间进行类型转换的一些开销。

正如我们从前几节中了解到的,Reactive Streams 规范允许操作员之间的多种通信方式。这种灵活性允许以不同的方式放置异步边界。但是,由于业务需求必须证明此类决策的合理性,因此它对响应式库的供应商提出了重要责任。此外,提供的解决方案应该足够灵活,可以从 API 端对其进行配置。

通过改变反应式流的行为,规范也改变了反应式格局。 Netflix、Redhead、Lightbend、Pivo​​tal 等开源行业的领导者已经在他们的响应式库中实施了该规范。然而,对于 Spring Framework 用户而言,反应式世界中发生的最重要的变化是引入了名为 Project Reactor 的新反应式库.

Project Reactor 发挥着重要作用,因为它是新的响应式 Spring 生态系统的构建块。因此,在深入了解 新反应式 Spring 的 内部实现 之前,我们应该探索 Project Reactor 并熟悉其角色的重要性。在下一章中,我们将通过查看示例来了解 Project Reactor 的概念构成及其应用。