vlambda博客
学习文章列表

读书笔记《hands-on-reactive-programming-in-spring-5》Spring 中的响应式编程 - 基本概念

第 2 章 Spring 中的响应式编程 - 基本概念

上一章解释了为什么构建响应式系统很重要,以及响应式编程如何帮助做到这一点。在本节中,我们将了解一些已经存在于 Spring Framework 中一段时间​​的工具集。我们还将通过探索 RxJava 库来学习反应式编程的重要基本概念,它是 Java 世界中第一个也是最著名的反应式库。

在本章中,我们将介绍以下主题:

  • Observer pattern
  • Publish-Subscribe implementation provided by Spring
  • Server-sent events
  • RxJava history and base concepts
  • Marble diagrams
  • Business cases implemented by applying reactive programming
  • The current landscape of reactive libraries

Spring 早期的响应式解决方案


我们之前提到有很多模式 和能够成为反应式系统构建块的编程技术。例如,回调和 CompletableFuture 通常用于实现消息驱动的架构。我们还提到了响应式编程作为此类角色的重要候选人。在我们更详细地探讨这一点之前,我们需要环顾四周并找到我们已经使用多年的其他解决方案。

第 1 章中, 为什么选择 Reactive Spring?,我们看到 Spring 4.x 引入了 ListenableFuture 类,它扩展了 Java Future 并且可以利用诸如 HTTP 请求之类的操作的异步执行。不幸的是,只有少数 Spring 4.x 组件支持较新的 Java 8 CompletableFuture,它为异步执行组合引入了一些简洁的方法。

尽管如此,Spring Framework 提供了其他一些基础设施,对于构建我们的反应式应用程序非常有用。现在让我们来看看其中的一些功能。

观察者模式

为了推动事情的发展,我们需要提醒自己 关于一个特别古老且众所周知的设计模式——the < span class="strong">观察者模式。那是二十三个著名的GoF四人帮之一span>) 设计模式。乍一看,观察者模式似乎与 reactive 编程无关。然而,正如我们稍后将看到的,经过一些小的修改,它定义了反应式编程的基础。

笔记

要了解有关 GoF 设计模式的更多信息,请参阅 Design Patterns: Elements of Reusable Object-Oriented Software作者 Erich Gamma、Richard Helm、Ralph Johnson , 和 John Vlissides (https://en.wikipedia.org/wiki/Design_Patterns )。

观察者模式涉及一个 subject 拥有一个list 的依赖者,称为 Observers。主体通知其观察者任何状态变化,通常通过调用他们的方法之一。当基于事件处理实现 系统时,这种模式是必不可少的。观察者模式是 MVC模型-视图-控制器) 模式。因此,几乎所有 UI 库都在内部应用它。

为了简化这一点,让我们使用日常情况的类比。我们可以将这种模式应用从一个技术门户网站订阅时事通讯。我们必须在我们感兴趣的网站的某个地方注册我们的电子邮件地址,然后它会以时事通讯的形式向我们发送通知,如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》Spring 中的响应式编程 - 基本概念

图 2.1 日常生活中的观察者模式类比:来自技术门户的新闻订阅

观察者模式使得在运行时注册对象之间的一对多依赖成为可能。同时,它在不了解组件实现细节的情况下执行此操作(为了类型安全,观察者可能知道传入事件的类型)。这使我们能够解耦应用程序部分,即使这些部分主动交互。这种通信通常是单向的,有助于通过系统有效地分发事件,如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》Spring 中的响应式编程 - 基本概念

图2.2 观察者模式UML类图

 

如上图 所示,一个典型的观察者模式由两个接口组成, Subject 和Observer。在这里,Observer 在 Subject 中注册并监听来自它的通知。 A Subject 可以自己生成事件,也可以被其他组件调用。让我们在 Java 中定义一个 Subject 接口:

public interface Subject<T> {
   void registerObserver(Observer<T> observer);
   void unregisterObserver(Observer<T> observer);
   void notifyObservers(T event);
}

这个通用接口 用事件类型T参数化,这提高了我们程序的类型安全性。它还包含管理订阅的方法( registerObserverunregisterObserver 和 notifyObservers< /code> methods) 触发事件的广播。反过来,Observer界面可能如下所示:

public interface Observer<T> {
   void observe(T event);
}

 Observer是一个通用接口,parametrized与 < code class="literal">T type .反过来,它只有一个 observe 方法 处理事件。 ObserverSubject对彼此的了解比在这些界面中描述的要多。  Observer 实现可能负责 订阅过程,或者 Observer实例可能不知道   Subject 的存在。在后一种情况下, 第三个组件可能负责查找 Subject的所有实例和所有注册过程。例如,这种角色可能会在依赖注入容器中发挥作用。这会使用 @EventListener 注释和正确的签名扫描每个Observer 的类路径。之后,它将找到的组件注册到Subject

笔记

依赖注入容器的一个经典示例是 Spring Framework 本身。如果不熟悉,请阅读 Martin Fowler 的文章 https:// martinfowler.com/articles/injection.html

现在,让我们实现两个非常简单的 observers,它们简单地接收 String 消息并将它们打印到输出溪流:

public class ConcreteObserverA implements Observer<String> {
   @Override
   public void observe(String event) {
      System.out.println("Observer A: " + event);
   }
}
public class ConcreteObserverB implements Observer<String> {
   @Override
   public void observe(String event) {
      System.out.println("Observer B: " + event);
   }
}

我们还需要编写 Subject<String>的实现,产生 String事件,如下所示代码:

public class ConcreteSubject implements Subject<String> {
   private final Set<Observer<String>> observers =                 // (1)
           new CopyOnWriteArraySet<>();

   public void registerObserver(Observer<String> observer) {
      observers.add(observer);
   }

   public void unregisterObserver(Observer<String> observer) {
      observers.remove(observer);
   }

   public void notifyObservers(String event) {                     // (2)
      observers.forEach(observer -> observer.observe(event));      // (2.1)
   }
}

从前面的例子我们可以看出,Subject的实现持有观察者Set >(1) 有兴趣接收通知。反过来,在 registerObserver Set 进行修改(订阅或取消订阅) code> 和 unregisterObserver 方法。为了广播事件,Subject 有一个 notifyObservers 方法 (2) ;遍历观察者列表 并调用 observe()方法 使用实际的 事件 (2.1) 对于每个 观察者。为了在多线程场景中保持安全,我们使用 CopyOnWriteArraySet,一个线程安全的Set实现,它创建了一个新的副本每次update操作发生时的元素。 更新CopyOnWriteArraySet的内容相对昂贵,尤其是当容器包含很多元素。但是,订阅者列表通常不会经常更改,因此对于线程安全的Subject实现来说,这是一个相当合理的选择。

 

观察者模式使用示例

现在,让我们编写一个简单的 JUnit 测试,它使用我们的类并演示所有 它们 一起玩。此外,在以下示例中,我们使用的是 Mockito 库 (http://site.mockito.org< /a>) 为了在 的支持下验证期望间谍模式

@Test
public void observersHandleEventsFromSubject() {
   // given
   Subject<String> subject = new ConcreteSubject();
   Observer<String> observerA = Mockito.spy(new ConcreteObserverA());
   Observer<String> observerB = Mockito.spy(new ConcreteObserverB());

   // when
   subject.notifyObservers("No listeners");

   subject.registerObserver(observerA);
   subject.notifyObservers("Message for A");

   subject.registerObserver(observerB);
   subject.notifyObservers("Message for A & B");

   subject.unregisterObserver(observerA);
   subject.notifyObservers("Message for B");

   subject.unregisterObserver(observerB);
   subject.notifyObservers("No listeners");

   // then
   Mockito.verify(observerA, times(1)).observe("Message for A");
   Mockito.verify(observerA, times(1)).observe("Message for A & B");
   Mockito.verifyNoMoreInteractions(observerA);

   Mockito.verify(observerB, times(1)).observe("Message for A & B");
   Mockito.verify(observerB, times(1)).observe("Message for B");
   Mockito.verifyNoMoreInteractions(observerB);
}

通过运行前面的测试,将产生以下输出。它显示哪个 Observer 收到了哪些消息:

Observer A: Message for A
Observer A: Message for A & B
Observer B: Message for A & B
Observer B: Message for B

 

在我们不需要取消订阅的情况下,我们可以利用 Java 8 的特性并将 Observer 实现类替换为 lambdas 。让我们编写相应的测试:

@Test
public void subjectLeveragesLambdas() {
   Subject<String> subject = new ConcreteSubject();

   subject.registerObserver(e -> System.out.println("A: " + e));
   subject.registerObserver(e -> System.out.println("B: " + e));
   subject.notifyObservers("This message will receive A & B");
   ...
}

值得一提的是,当前的 Subject 实现是基于 CopyOnWriteArraySet,这并不是最有效的。但是,该实现至少是线程安全 ,这意味着我们可以使用我们的Subject< /code> 在多线程环境中。例如,当事件通过许多独立组件分发时,它可能很有用,这些组件通常由多个线程工作(现在尤其适用,当大多数应用程序不是单线程时)。在本书的整个过程中,我们将讨论线程安全和其他多线程问题。

请记住,当我们有很多观察者处理具有一些 明显延迟的事件时——由下游处理引入——我们可以使用额外的线程或线程池并行消息传播。这种方法可能会导致  notifyObservers 方法的下一个实现:

private final ExecutorService executorService = 
   Executors.newCachedThreadPool();

public void notifyObservers(String event) {
   observers.forEach(observer ->
           executorService.submit(
                   () -> observer.observe(event)
           )
   );
}

 

然而,有了这样的改进,我们正在走上本土解决方案的滑坡,这些解决方案通常不是最有效的,而且很可能隐藏错误.例如,我们可能忘记来限制线程池大小,这 最终导致 an OutOfMemoryError。在客户端要求调度任务的频率高于执行器完成当前任务的频率的情况下,简单配置的ExecutorService 可能会创建越来越多的线程。由于在 Java 中每个线程消耗大约 1 MB 一个典型的 JVM 应用程序有机会通过创建几千个线程来耗尽所有可用内存。

笔记

有关 JVM 线程容量实验的更详细描述,请参阅 Peter Lawrey 的文章http://vanillajava.blogspot.com/2011/07/java-what-is-limit-to-number-of-threads.html 。它已经很老了,但从那时起 JVM 内存模型并没有太大变化。要获取有关 Java 设置的默认堆栈大小的信息,请运行以下命令:

java -XX:+PrintFlagsFinal -version | grep ThreadStackSize

为防止过度使用资源,我们可能会限制 thread 池大小并违反 < strong>活跃度应用程序的属性。当所有可用线程尝试将 some 事件推送到 same 缓慢 Observer。在这里,我们只是触及了可能发生的潜在问题的表面。此外,如白皮书所述:

“改进的多线程单元测试”(http://users.ece.utexas.edu/~gligoric/papers/JagannathETAL11IMunit.pdf),“众所周知,多线程代码很难开发和测试”。

因此,当需要多线程 Observer 模式时,最好使用久经考验的库。

笔记

说到liveness,我们指的是并发计算   将其描述为一组需要并发系统才能取得进展的属性,即使它的执行组件可能必须进入临界区。这最初是由 Lasley Lamport 在

证明多进程程序的正确性

(http ://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.137.9454&rep=rep1&type=pdf)。

如果不提及 ObserverObservable 类如何形成 java.util 包。这些类是随 JDK 1.0 发布的,所以它们已经很老了。如果我们查看源代码,我们会发现一个非常简单的实现,它与我们在本章前面所做的非常相似。由于这些类是在 Java 泛型 之前引入的,因此它们使用 Object 类型的事件 并且因此不是类型安全的。此外,这种实现效率不是很高,尤其是在多线程环境中。考虑到我们提到的(所有这些问题和其他一些问题),这些类在 Java 9 中已被弃用,因此将它们用于新应用程序是没有意义的。

笔记

有关弃用 JDK ObserverObservable 原因的更多详细信息,请访问: https://dzone.com/articles/javas-observer-and -observable-are-deprecated-in-jd

当然,在开发应用程序时,我们可能会使用观察者模式的手工实现。这使我们能够解耦事件源和观察者。然而,解决许多对现代多线程应用程序至关重要的方面是很麻烦的。这包括错误处理、异步执行、线程安全、对最高性能的需求等等。我们已经看到,JDK 附带的事件实现在教育用途之外是不够的。因此,毫无疑问,使用由权威机构提供的更成熟的实现会更好。

使用@EventListener 发布-订阅模式

开发软件时需要一次又一次地重新实现相同的软件模式是很尴尬的。幸运的是,我们有 Spring 框架、大量可爱的库和其他优秀的框架(Spring 不是唯一的)。众所周知,Spring Framework 为软件开发提供了我们可能需要 的大部分构建块。当然,长期以来,框架都有自己的观察者模式实现,这被广泛用于跟踪应用程序的生命周期事件。从 Spring Framework 4.2 开始,此实现和伴随的 API 被扩展为不仅用于处理应用程序事件,还用于处理业务逻辑事件。反过来,出于事件分发的目的,Spring 现在为事件处理提供了一个 @EventListener 注释,并为 ApplicationEventPublisher 类>事件 发布。

这里需要说明的是 @EventListenerApplicationEventPublisher 实现了 Publish -订阅模式,可以看作是观察者模式的一种变体。

笔记

可以在以下位置找到对发布-订阅模式的良好描述:http: //www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html

与观察者模式相比, 在发布-订阅模式中,发布者和订阅者 不需要相互了解,如图所示下图: 

读书笔记《hands-on-reactive-programming-in-spring-5》Spring 中的响应式编程 - 基本概念

图 2.3 观察者模式(左侧)与发布-订阅模式(右侧)

发布-订阅模式在发布者和订阅者之间提供了额外的间接级别。订阅者知道广播通知的事件通道,但通常不关心发布者的身份。此外,每个事件通道可能同时有几个发布者。上图应该有助于发现观察者模式和发布-订阅模式之间的区别。  Event Channel(也称为消息代理或事件总线)可以额外过滤传入消息并分发 它们在订阅者之间。过滤和路由可能基于消息内容、消息主题,有时甚至两者兼而有之。因此,基于主题的系统中的订阅者将收到发布到感兴趣主题的所有消息。 

Spring 框架的 @EventListener 注释 使得 可以同时应用这两个主题基于和基于内容的路由。消息类型可以起到主题的作用; condition属性enables 基于内容的路由基于 Spring表达式语言的事件处理SpEL)。

 

笔记

作为基于 Spring 的 Publish-Subscribe 模式实现的替代方案,有一个名为 MBassador 的流行开源 Java 库。它的唯一目的是提供实现发布-订阅模式的轻量级、高性能事件总线。作者声称 MBassador 在提供高性能的同时保留了资源。这是因为它几乎没有依赖关系,也不会限制我们应用程序的设计。更多详情请参考 GitHub 上的项目页面(https://github.com/bennidi /大使)。此外, Guava 库提供 EventBus,它实现发布-订阅模式。以下文章描述了 API 并包含 Guava EventBus 的代码示例:(https:/ /github.com/google/guava/wiki/EventBusExplained)。

使用 @EventListener 构建应用程序

要在 Spring 框架中使用 Publish-Subscribe 模式,让我们做一个练习。反过来,假设我们必须实现一个简单的 Web 服务来显示房间的当前温度。为此,我们有一个温度传感器,它会不时发送以摄氏度为单位的当前温度的事件。我们可能希望同时拥有移动和 Web 应用程序,但为了简洁起见,我们只实现了一个简单的 Web 应用程序。此外,由于与微控制器通信的问题超出了本书的范围,我们正在使用随机数发生器模拟温度传感器。

为了使我们的应用程序遵循 响应式设计,我们不能使用旧的拉取模型进行数据检索。 幸运的是,现在我们有一些很好的-采用了从服务器到客户端的异步消息传播协议,即 WebSockets 和 服务器发送事件SSE)。在 current 示例中,我们将使用最后一个。 SSE 允许客户端从服务器接收自动更新,通常用于向浏览器发送消息更新或连续数据流。随着 HTML5 的出现,所有现代浏览器都有一个名为 EventSource 的 JavaScript API,请求特定 URL 以接收事件流的客户端使用它。  EventSource 默认情况下也会自动重新连接 在通信问题的情况下。 重要的是要强调 SSE 是满足组件之间通信需求的优秀候选者。反应系统。与 WebSocket 一样,本书中大量使用了 SSE。

 

笔记

要了解更多关于服务器发送事件,请阅读章节

高性能浏览器网络

 由 Ilya Grigorik 在 https://hpbn.co/server-发送事件-sse/。此外,Mark Brown 的以下文章对 WebSockets 和服务器发送事件进行了很好的比较:https://www.sitepoint.com/real-time-apps-websockets-server-sent-events/

引导 Spring 应用程序

为了实现我们的用例,我们使用著名的 Spring 模块 Spring Web 和 Spring Web MVC。我们的应用程序不会使用 Spring 5 的新功能,因此它将在 Spring Framework 4.x 上类似地运行。为了简化我们的开发过程,甚至更多,我们正在利用 Spring Boot,稍后将更详细地描述。为了引导我们的应用程序,我们可以从 Spring Initializer website at start.spring.io。现在,我们需要为  web 选择首选的 Spring Boot 版本和依赖项(Gradle 配置中的实际依赖项标识符将是 org.springframework.boot:spring-boot-starter-web ),如以下屏幕截图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》Spring 中的响应式编程 - 基本概念

图 2.4 基于 Web 的 Spring Initializer 简化了新 Spring Boot 应用程序的引导

或者,我们可以使用 cURL 和 Spring Boot Initializer 站点的 HTTP API 生成一个新的 Spring Boot 项目。以下命令将 有效地 创建并下载具有所有所需依赖项的相同空项目:

curl https://start.spring.io/starter.zip \
  -d dependencies=web,actuator \
  -d type=gradle-project \
  -d bootVersion=2.0.2.RELEASE \
  -d groupId=com.example.rpws.chapters \
  -d artifactId=SpringBootAwesome \
  -o SpringBootAwesome.zip

实现业务逻辑

我们现在可以在下图中概述我们的 system 的设计:

读书笔记《hands-on-reactive-programming-in-spring-5》Spring 中的响应式编程 - 基本概念

图 2.5 从温度传感器到用户的事件流

在此用例中,域模型将仅包含 Temperature 类,其中只有 double 值。为简单起见,它 也 用作事件对象,如下代码所示:

final class Temperature {
   private final double value;
   // constructor & getter...
}

为了模拟 传感器,让我们实现 TemperatureSensor类并用 @Component注解装饰它来注册 Spring豆,如下:

@Component
public class TemperatureSensor {
   private final ApplicationEventPublisher publisher;              // (1)
   private final Random rnd = new Random();                        // (2)
   private final ScheduledExecutorService executor =               // (3)
           Executors.newSingleThreadScheduledExecutor();

   public TemperatureSensor(ApplicationEventPublisher publisher) {
      this.publisher = publisher;
   }

   @PostConstruct
   public void startProcessing() {                                 // (4)
      this.executor.schedule(this::probe, 1, SECONDS);
   }

   private void probe() {                                          // (5)
      double temperature = 16 + rnd.nextGaussian() * 10;
      publisher.publishEvent(new Temperature(temperature));

      // schedule the next read after some random delay (0-5 seconds)
      executor
        .schedule(this::probe, rnd.nextInt(5000), MILLISECONDS);    // (5.1)
   }
}

所以,我们模拟的温度传感器 只 依赖 ApplicationEventPublisherclass(1),由Spring Framework提供。此类可以将事件发布到系统。需要有一个随机生成器(2) 来设计具有一些随机间隔的温度。事件生成过程发生在单独的 ScheduledExecutorService (3) 中,其中每个事件的生成都会安排下一轮事件的生成 具有随机 延迟 (5.1)。所有这些逻辑都在 probe() 方法 (5) 中定义。 反过来,所提到的类具有用 @PostConstruct startProcessing() 方法>(4),当 bean 准备好时由 Spring Framework 调用并触发整个随机温度值序列。

使用 Spring Web MVC 的异步 HTTP

Servlet 3.0 中引入的异步 support 扩展了在非容器线程中处理 HTTP 请求的能力。这样的功能对于长时间运行的任务非常有用。通过这些更改,在 Spring Web MVC 中,我们不仅可以在 @Controller 中返回 typeT 的值,还可以返回 Callable<T> 或一个 DeferredResult<T>。  Callable<T> 可以在非容器线程中运行,但仍然是阻塞调用。相比之下, DeferredResult<T> 通过调用 setResult(T result ) 方法,因此它可以在事件循环中使用。

从 4.2 版本开始,Spring Web MVC 可以返回 ResponseBodyEmitter,其行为类似于 DeferredResult,但可用于发送多个对象,其中每个对象单独编写一个消息转换器的实例(由 HttpMessageConverter 接口定义)。

 

 

 SseEmitter 扩展了 ResponseBodyEmitter 并且可以根据 SSE 的协议要求为一个传入请求发送多个传出消息. 除了 ResponseBodyEmitter 和 SseEmitter, Spring Web MVC 也 尊重 StreamingResponseBody 接口。当从 @Controller返回时,它允许我们异步发送原始数据(有效负载字节)。StreamingResponseBody对于在不阻塞 Servlet 线程的情况下流式传输大文件。

暴露 SSE 端点

下一步需要在@RestController注解中添加 TemperatureController类,表示该组件用于HTTP通信,如以下代码中的所示

@RestController
public class TemperatureController {
   private final Set<SseEmitter> clients =                          // (1)
      new CopyOnWriteArraySet<>();                    

   @RequestMapping(
      value = "/temperature-stream",                                // (2)
      method = RequestMethod.GET)
   public SseEmitter events(HttpServletRequest request) {           // (3)
      SseEmitter emitter = new SseEmitter();                        // (4)
      clients.add(emitter);                                         // (5)

      // Remove emitter from clients on error or disconnect
      emitter.onTimeout(() -> clients.remove(emitter));             // (6)
      emitter.onCompletion(() -> clients.remove(emitter));          // (7)

      return emitter;                                               // (8)
   }
   @Async                                                           // (9)
   @EventListener                                                   // (10)
   public void handleMessage(Temperature temperature) {             // (11)
      List<SseEmitter> deadEmitters = new ArrayList<>();            // (12)
      clients.forEach(emitter -> {                                      
         try {
            emitter.send(temperature, MediaType.APPLICATION_JSON);  // (13)
         } catch (Exception ignore) {
            deadEmitters.add(emitter);                              // (14)
         }
      });
      clients.removeAll(deadEmitters);                              // (15)
   }
}

 

现在,为了理解 TemperatureController类的逻辑,我们需要描述 SseEmitter。 Spring Web MVC 为该类提供发送 SSE 事件的唯一目的。当请求处理方法返回 SseEmitter 实例时,实际的请求处理将继续,直到 SseEnitter.complete(),一个错误,或发生超时。

 TemperatureController 为 URI /temperature-stream< 提供了一个请求处理程序 (3) /code> (2) 并返回 SseEmitter (8) 。在客户端请求该 URI 的情况下,我们创建并返回新的 SseEmitter 实例 (4) 之前在活动客户列表中的注册 ( 5) 。此外, SseEmitter 构造函数可能会消耗 timeout 参数。

对于 clients'集合,我们可以使用 java中的 CopyOnWriteArraySet类.util.concurrent package (1)。这样的实现允许我们修改列表并同时对其进行迭代。当 Web 客户端打开一个新的 SSE 会话时,我们将一个新的发射器添加到 clients' 集合中。  SseEmitter 当它完成处理或达到超时时      clients'列表 代码类="literal">(6) (7)。

现在,与客户建立沟通渠道意味着我们需要能够接收有关温度变化的事件。为此,我们的类有一个 handleMessage() 方法(11)。它用 @EventListener注解(10)修饰,以便接收来自Spring的事件。只有在接收到 Temperature 事件时,该框架才会调用 handleMessage()方法,因为这种方法的参数被称为  温度。  @Async 注解 (9) 将方法标记为 < strong>异步执行,所以在手动配置的线程池中调用。  handleMessage() 方法接收一个新的温度事件并 异步 将其 以JSON格式为每个事件并行发送给所有客户端 (13) 。此外,当发送到单个发射器时,我们会跟踪所有失败的发射器(14)并将它们从活动客户端列表中删除 (15)。这种方法可以发现不再运行的客户。不幸的是, SseEmitter不提供任何回调来处理错误,可以通过处理 send()方法 仅。

配置异步支持

要运行所有内容,我们需要一个 entry 点为我们的应用程序使用以下自定义方法:

@EnableAsync                                                         // (1)
@SpringBootApplication                                               // (2)
public class Application implements AsyncConfigurer {

   public static void main(String[] args) {
      SpringApplication.run(Application.class, args);
   }

   @Override
   public Executor getAsyncExecutor() {                              // (3)
      ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// (4)
      executor.setCorePoolSize(2);
      executor.setMaxPoolSize(100);
      executor.setQueueCapacity(5);                                  // (5) 
      executor.initialize();
      return executor;
   }

   @Override
   public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
      return new SimpleAsyncUncaughtExceptionHandler();              // (6)
   }
}

如我们所见,该示例是一个 Spring Boot 应用程序(2),通过@EnableAsync@EnableAsync 注释 (1)。在这里,我们可以为异步执行(6)抛出的异常配置一个异常处理程序。这也是我们为异步处理准备 Executor 的地方。在我们的例子中,我们使用 ThreadPoolTask​​Executor 和两个核心线程,最多可以增加到一百个线程。需要注意的是,如果没有正确配置队列容量 (5),线程池将无法增长。那是因为 SynchronousQueue 会被使用,限制并发。

构建具有 SSE 支持的 UI

为了完成我们需要做的最后一件事是我们的用例是一个带有一些JavaScript代码的HTML页面来与服务器通信.为简洁起见,我们将去除所有 HTML 标记并仅保留实现结果所需的最小值,如下所示:

<body>
<ul id="events"></ul>
<script type="application/javascript"> function add(message) { const el = document.createElement("li"); el.innerHTML = message; document.getElementById("events").appendChild(el); } var eventSource = new EventSource("/temperature-stream"); // (1) eventSource.onmessage = e => { // (2) const t = JSON.parse(e.data); const fixed = Number(t.value).toFixed(2); add('Temperature: ' + fixed + ' C'); } eventSource.onopen = e => add('Connection opened'); // (3) eventSource.onerror = e => add('Connection closed'); // </script>
</body>

在这里,我们使用了 EventSource对象指向 /temperature-stream  (1) 。这通过调用 onmessage() 函数 (2)、错误处理和对流打开的反应来处理传入消息,这以相同的方式完成(3)。我们应该将此页面保存为 index.html 并将其放在我们项目的 src/main/resources/static/ 文件夹中.默认情况下,Spring Web MVC 通过 HTTP 提供文件夹的内容。可以通过提供扩展WebMvcConfigurerAdapter 类的配置来更改这种行为。 

验证应用程序功能

 重建并完成我们的应用程序的启动后,我们应该能够在浏览器中访问 提到的网页 如下地址: http://localhost:8080(Spring Web MVC使用 port 8080 for web server 作为默认的。但是, 这可以在application.properties文件中使用配置行 server.port =9090)。几秒钟后,我们可能会看到以下输出:

Connection opened
Temperature: 14.71 C
Temperature: 9.67 C
Temperature: 19.02 C
Connection closed
Connection opened
Temperature: 18.01 C
Temperature: 16.17 C

正如我们所见,我们的网页响应式地接收事件,同时保留客户端和服务器资源。它还支持在网络问题或超时的情况下自动重新连接。由于当前的解决方案不是 JavaScript 独有的,我们可能会连接其他客户端,例如 curl。通过在终端中运行下一个命令,我们会收到以下原始但未格式化的事件流:

> curl http://localhost:8080/temperature-stream
data:{"value":22.33210856124129}
data:{"value":13.83133638119636}

笔记

要探索有关服务器发送事件技术及其与 Spring Framework 集成的更多信息,请阅读 Ralph Schaer 撰写的一篇优秀文章 https://golb.hplar.ch/p/Server-Sent-Events-with-Spring

对解决方案的批评

在这一点上,我们可能会称赞自己只使用几十行代码(包括 HTML 和 JavaScript)就实现了一个有弹性的反应式应用程序。但是,当前的解决方案存在一些问题。首先,我们使用的是 Spring 提供的发布-订阅基础设施。在 Spring Framework 中,最初引入了这种机制 用于处理应用程序生命周期事件,而不是 用于高负载、高性能的场景。当我们需要数千甚至数百万个单独的数据流而不是一个温度数据流时会发生什么? Spring 的实现是否能够有效地处理这样的负载?

此外,这种方法的一个重要缺点在于我们使用内部 Spring 机制来定义和实现我们的业务逻辑。这会导致框架中的一些细微更改可能会破坏我们的应用程序的情况。此外,如果不运行应用程序上下文,很难对我们的 业务规则进行单元测试。如 第 1 章 中所述,为什么选择 Reactive Spring?,同样有理由提到 一个应用程序有很多用 @EventListener< 装饰的方法/code> 注释,并且没有用一段简洁的代码描述整个工作流程的显式脚本。

此外, SseEmitter 具有错误和流结束的概念,而 @EventListener 没有。因此,为了表示组件之间的流结束或错误,我们必须定义一些特殊的对象或类层次结构,并且很容易忘记处理它们。此外,这些特定标记在不同情况下可能具有略微不同的语义,使解决方案复杂化并降低方法的吸引力。

另一个值得强调的缺点是我们将线程池分配给异步广播温度事件。在真正异步和反应式方法(框架)的情况下,我们不必这样做。

我们的温度传感器只生成一个事件流,而不管有多少客户端正在监听。但是,当没有人听时,它也会创建它们。这可能会导致资源浪费,尤其是当创建操作需要资源时。例如,我们的组件可能会与真实硬件通信并同时缩短硬件寿命。

 

为了解决所有这些问题以及其他问题,我们需要一个专门为此目的设计的反应库。幸运的是,我们有一些这样的。我们现在来看看 RxJava,它是第一个被广泛采用的响应式库,它改变了我们使用 Java 构建响应式应用程序的方式。

RxJava 作为响应式框架


有一段时间,有一个 standard 用于响应式编程 — 即 RxJava 1.x(参见https:// /github.com/ReactiveX/RxJava 了解更多详情)。正如我们在当今的 Java 世界中所知道的那样,该库为响应式编程铺平了道路。目前,它不是唯一的此类图书馆。我们还有 Akka Streams 和 Project Reactor 后者在 第 4 章Project Reactor - 响应式应用程序的基础。因此,目前,我们 有几个选项 从中我们可以选择。此外,随着 2.x 版本的发布,RxJava 本身也发生了很大变化。但是,要了解响应式编程的最基本概念及其背后的推理,我们将重点关注 RxJava 最基本的部分 仅, 在 API 上,自该库的早期版本以来一直没有改变。本节中的所有示例都应该适用于 RxJava 1.x 和 RxJava 2.x。

为了在一个应用程序类路径中同时使用, RxJava 2.x 和 RxJava 1.x 具有不同的组 ID(io.reactivex.rxjava2io.reactivex)和命名空间(io.reactivexrx)。

笔记

尽管 RxJava 1.x 的生命周期结束于 2018 年 3 月,但它仍然在少数库和应用程序中使用,主要是因为长期广泛采用。以下文章很好地描述了 RxJava 2.x 与 RxJava 1.x 相比的变化: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0

RxJava 库是 Reactive Extensions(也称为 ReactiveX )。 Reactive Extensions 是一组工具,它允许命令式语言处理数据流,而不管流是同步的还是异步的。 ReactiveX 通常被定义为观察者模式、迭代器模式和函数式编程的组合。了解更多关于 ReactiveX 的良好起点是 http://reactivex.io

 

反应式编程可能看起来很困难,尤其是当我们从命令式世界中接近它时,但主要思想实际上是直截了当的。在这里,我们将学习 RxJava 的基础知识,它是迄今为止应用最广泛的响应式库。我们不会深入研究所有细节,而是尝试了解反应式编程的所有重要概念。

观察者加迭代器等于反应流

在本章中,我们讨论了很多关于 Observer 模式的内容,它为我们提供了一个清晰的 生产者视图 事件 和消费者 事件。让我们回顾一下由该模式定义的接口,如以下代码所示:

public interface Observer<T> {
   void notify(T event);
}

public interface Subject<T> {
   void registerObserver(Observer<T> observer);
   void unregisterObserver(Observer<T> observer);
   void notifyObservers(T event);
}

正如我们之前看到的,这种方法对于无限的数据流很有吸引力,但如果能够发出数据流结束的信号就很棒了。此外,我们不希望生产者在消费者出现之前生成事件。在同步世界中,我们有一个模式——迭代器模式 这可以使用以下代码来描述:

public interface Iterator<T> {
   T next();
   boolean hasNext();
}

为了逐个检索项目,Iterator 提供了 next() 方法,还可以发出结束信号通过返回一个 false值作为 hasNext()调用的结果序列。那么如果我们试图将这个想法与观察者模式提供的异步执行混合起来会发生什么呢?结果如下所示:

public interface RxObserver<T> {
   void onNext(T next);
   void onComplete();
}

RxObserver 非常类似于 Iterator,但不是调用 next( ) 方法的Iterator, RxObserver 将通过 onNext() 回调。而不是检查 hasNext() 方法的结果是否为正,而是通知 RxObserver 流结束通过调用的 onComplete() 方法。这很好,但是错误呢? Iterator 在处理 next() 期间可能会抛出 Exception方法,如果有一种从 Producer 到 RxObserver 的错误传播机制,那就太好了。让我们为此添加一个特殊的回调——onError()。因此,最终的解决方案将如下所示:

public interface RxObserver<T> {
   void onNext(T next);
   void onComplete();
   void onError(Exception e);
}

这是因为我们刚刚设计了一个 Observer 接口,这是 RxJava 的基本概念。这个接口定义了数据如何在反应流的每个部分之间流动。作为库中最小的部分, Observer 接口随处可见。 RxObserver 类似于 观察者模式中的 观察者,如前所述。

 Observable Reactive 类 与 Observer 模式中的 Subject 相对应。因此, Observable 在发出项目时扮演了事件源的角色。它有数百种流转换方法,以及数十种初始化反应流的工厂方法。

 Subscriber 抽象类 实现 Observer 接口和消费项目。它还用作实际 Subscriber 实现的基础。 ObservableSubscriber之间的运行时关系由一个Subscription控制,它使得可以检查订阅状态并在需要时取消。这种关系如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》Spring 中的响应式编程 - 基本概念

图 2.6 Observable-Observer 合约

RxJava 定义了关于发射项目的规则。  Observable允许发送任意数量的元素(包括零)。然后它通过声明成功或引发错误来表示执行结束。所以 Observable对于每个附加的订阅者调用onNext()任何数字次,然后调用onComplete() 或 onError() (但不能同时调用)。因此,禁止它调用 onNext() 在 onComplete() or onError()

生产和消费流

至此,我们应该对 RxJava 库足够很熟悉,从而创建我们的第一个small<一个 id="id325919730" class="indexterm"> 应用程序。让我们定义一个由Observable 类表示的流。目前,我们可以假设 Observable 是一种生成器,它知道如何在订阅者订阅后立即为订阅者传播事件:

Observable<String> observale = Observable.create(
   new Observable.OnSubscribe<String>() {
      @Override
      public void call(Subscriber<? super String> sub) {             // (1)
         sub.onNext("Hello, reactive world!");                       // (2)
         sub.onCompleted();                                          // (3)
      }
   }
);

因此,在这里我们创建一个 Observable,并在 Subscriber出现时立即应用一个回调(1)。在那一刻,我们的Observer 将产生一个字符串值(2),然后发出流结束信号给订阅者 (3)。我们还可以使用 Java 8 lambda 改进此代码:

Observable<String> observable = Observable.create(
   sub -> {
      sub.onNext("Hello, reactive world!");
      sub.onCompleted();
   }
);

与 Java Stream API 相比,Observable 是可重用的,每个订阅者都会收到 Hello, reactive world! 事件订阅后。

笔记

请注意,从 RxJava 1.2.7 开始, Observable 创建已被弃用并被视为不安全,因为它可能会生成太多元素并使订阅者过载。换句话说,这种方法不支持背压,我们稍后将详细研究这个概念。但是,为了介绍,该代码仍然有效。

 

所以,现在我们需要一个Subscriber,如下代码所示:

Subscriber<String> subscriber = new Subscriber<String>() {
   @Override
   public void onNext(String s) {                                    // (1)
      System.out.println(s);
   }

   @Override
   public void onCompleted() {                                       // (2)
      System.out.println("Done!");
   }

   @Override
   public void onError(Throwable e) {                                // (3)
      System.err.println(e);
   }
};

正如我们所见, Subscriber 必须实现 Observer 方法 并定义 新的反应事件(1)、流完成(2)和错误(3) 。现在,让我们将observablesubscriber instances 挂钩:

observable.subscribe(subscriber);

运行上述代码时,程序会生成以下输出:

Hello, reactive world!
Done!

万岁!我们刚刚编写了一个小而简单的响应式 hello-world 应用程序!正如我们可能怀疑的那样,我们可以使用 lambdas 重写这个示例,如下面的代码所示:

Observable.create(
   sub -> {
      sub.onNext("Hello, reactive world!");
      sub.onCompleted();
   }
).subscribe(
   System.out::println,
   System.err::println,
   () -> System.out.println("Done!")
);

 

RxJava 库为创建 Observable 和 Subscriber 实例提供了很大的灵活性。可以通过引用元素,使用旧样式创建 Observable 实例 just数组,或 来自 Iterable 集合,如下:

Observable.just("1", "2", "3", "4");
Observable.from(new String[]{"A", "B", "C"});
Observable.from(Collections.emptyList());

也可以引用 Callable (1) 甚至是 Future< /code> (2),如下代码所示:

Observable<String> hello = Observable.fromCallable(() -> "Hello ");  // (1)
Future<String> future =
        Executors.newCachedThreadPool().submit(() -> "World");
Observable<String> world = Observable.from(future);                  // (2)

此外,除了简单的创建功能外,Observable 流可以通过组合其他 Observable  实例来创建,这允许轻松实现相当复杂的工作流程。例如,每个传入流的 concat() 运算符通过将所有项目重新发送给下游观察者来消耗所有项目。然后将处理传入的流,直到发生终端操作(onComplete()onError()),并且处理顺序与 concat() 参数的顺序相同。以下代码演示了 concat() 用法的示例:

Observable.concat(hello, world, Observable.just("!"))
   .forEach(System.out::print);

在这里,作为几个使用不同来源的Observable 实例的直接组合的一部分,我们还使用 Observable 遍历结果.forEach() 方法,其方式类似于 Java 8 Stream API。这样的程序会生成以下输出:

Hello World!

笔记

请注意,即使不为异常定义处理程序很方便,但在发生错误的情况下,默认的Subscriber实现会抛出rx.exceptions .OnErrorNotImplementedException

 

 

生成异步序列

RxJava 不仅可以generate 不仅是未来的一个事件,而且是基于事件的异步序列,例如,在一个时间间隔上,如下代码所示:

Observable.interval(1, TimeUnit.SECONDS)
   .subscribe(e -> System.out.println("Received: " + e));
Thread.sleep(5000);                                                  // (1)

在这种情况下,输出如下:

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4

此外,如果我们删除 Thread.sleep(...) (1),我们的应用程序将退出而没有任何输出.发生这种情况是因为会生成事件并因此在单独的守护线程中使用。所以,为了防止主线程完成执行,我们可以sleep() 或者做一些其他有用的任务。

当然,也有控制Observer-Subscriber合作的东西。这称为 Subscription,并具有以下接口声明:

interface Subscription {
   void unsubscribe();
   boolean isUnsubscribed();
}

 unsubscribe() 方法允许 Subscriber 通知 Observable无需发送新事件。换句话说,上述代码是订阅取消。另一方面,Observable 使用 isUn​​subscribed() 来检查Subscriber< /code> 仍在等待事件。

 

为了理解上面提到的 取消订阅功能,让我们考虑 订阅者是唯一对事件感兴趣的一方,并消费它们直到外部信号通过CountDawnLatch传播的情况  (1)。传入流每 100 毫秒生成一个新事件,这些事件产生 无休止 序列—0 , 1, 2, 3... (3)。以下代码演示了在定义反应式流时如何获取Subscription (2)。它还展示了如何取消订阅流(4)

CountDownLatch externalSignal = ...;                                 // (1)

Subscription subscription = Observable                               // (2)
        .interval(100, MILLISECONDS)                                 // (3)
        .subscribe(System.out::println);

externalSignal.await();
subscription.unsubscribe();                                          // (4)

所以在这里,订阅者接收到事件 0, 1, 2, 3,然后发生 externalSignal 调用,这导致订阅取消。

至此,我们已经了解到响应式编程由一个 Observable 流、一个 Subscriber 和某种 Subscription 传达Subscriber 接收来自Observable 生产者的事件的意图。现在是时候转换流经反应流的数据了。

流转换和大理石图

即使 ObservableSubscriber 单独使它可能 要实现很多工作流,RxJava 的 whole 力量隐藏在它的操作符中。运算符用于调整流的元素或改变  流结构本身。 RxJava 为几乎所有可能的场景提供了大量的操作符,但是学习所有这些操作符超出了本书的范围。现在让我们看看最常用和最基础的运算符;大多数其他只是基本的组合。

地图运算符

毫无疑问,RxJava 中使用最多的 运算符是 map,它有以下签名:

<R> Observable<R> map(Func1<T, R> func)

前面的方法声明意味着 func 函数 可以将 T 对象类型 转换为  R 对象类型,并应用map变换Observable<T>进入Observable<R>。但是,签名并不总是能很好地描述操作员的行为,尤其是当操作员进行复杂的转换时。为此,发明了大理石图。 Marble diagram 的视觉呈现流转换。它们对于描述操作符的行为非常有效,几乎所有的 RxJava 操作符都在 Javadoc 中包含带有大理石图的图像。  map 算子 如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》Spring 中的响应式编程 - 基本概念

图 2.7 Operator map:通过对每个项目应用函数来转换 Observable 发出的项目

从上图来看, map应该是一对一的转换。此外,输出流与输入流具有相同数量的元素。

过滤运算符

与 map 运算符相比,  filter 产生的元素可能比少span> 它已收到。它只 发射那些成功通过谓词测试的元素,如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》Spring 中的响应式编程 - 基本概念

图 2.8 过滤器操作符:仅从 Observable 中发出通过谓词测试的那些项目

计数运算符

 count 运算符的描述性很强;它发出唯一的 value 与输入流中的元素数。但是, count 在原始流完成的那一刻发出,因此,在无限流的情况下,  count 永远不会完成或返回任何内容,如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》Spring 中的响应式编程 - 基本概念

图 2.9 Count 算子:统计 Observable 源发出的项目数  并且只发出这个值

 

邮编运算符

我们将看到的另一个运算符是 zip。这具有 more 复杂的行为,因为它通过应用 zip 函数。它通常用于数据丰富,尤其是在从不同来源检索部分预期结果时,如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》Spring 中的响应式编程 - 基本概念

图 2.10 Zip 算子:通过指定函数将多个 Observable 的发射组合在一起,并根据该函数的结果为每个组合发射单个项目

在这里,Netflix 在流式传输推荐视频列表时使用 zip 运算符来组合电影 描述、电影海报和电影评级。但是,为了简单起见,我们只压缩两个字符串值流,如下面的代码所示:

Observable.zip(
        Observable.just("A", "B", "C"),
        Observable.just("1", "2", "3"),
        (x, y) -> x + y
).forEach(System.out::println);

如上图所示,前面的代码将两个流中的元素一个接一个地连接起来,并生成以下控制台输出:

A1
B2
C3 

 

要了解更多关于反应式编程中常用的运算符(不仅在 RxJava 中),请访问http://rxmarbles.com . 本网站包含 反映实际操作员行为的交互式图表。反过来,交互式 UI 允许我们可视化事件的转换,即每个事件在流中出现的顺序和时间。请注意,该站点本身是使用 RxJS 库构建的(请参阅 https://github.com/ReactiveX/rxjs 更多细节),这是 RxJava 在 JavaScript 世界中的对应物。

如前所述,RxJava 的 Observable 提供了数十种流转换运算符,可以自信地涵盖很多用例。当然,RxJava 并不 只 将开发者限制在库提供的操作符上。还可以通过实现一个派生自Observable.Transformer<T, R> 的类来编写自定义运算符。通过应用Observable.compose(transformer)运算符,可以将此类运算符逻辑包含在工作流中。目前,我们不打算深入探讨运营商的建筑理论或实践;我们将在后面的章节中部分介绍这一点。到目前为止,强调 RxJava 提供了一套健壮的工具来构建复杂的异步工作流就足够了,这些工具主要受限于我们的想象力,而不是库。

RxJava 的先决条件和好处

借助 RxJava,我们已经熟悉了 基础 r主动编程。不同的 reactive 库的 API 和实现细节可能略有不同,但概念保持不变——订阅者订阅一个可观察的流,它反过来触发事件生成的异步过程。在 生产者和订阅者之间,通常存在 一些订阅,使得打破生产者-消费者关系成为可能。这种方法非常灵活,可以控制产生和消耗的事件的数量,减少 CPU 周期的数量,这些周期通常被浪费在创建数据上,并且永远不会被使用。

为了证明反应式编程提供了节省资源的能力,让我们假设我们需要实现一个简单的内存搜索引擎服务。这应该返回包含所需短语的文档的 URL 集合。通常,客户端应用程序(Web 或移动应用程序)也会通过限制,例如,有用结果的最大数量。如果没有响应式编程,我们可能会使用以下 API 来设计这样的服务:

public interface SearchEngine {
   List<URL> search(String query, int limit);
}

 

正如我们可能从界面中注意到的那样,我们的服务执行 search 操作,收集 limit内的所有结果,并将它们放入List,并将其返回给客户端。在前面的场景中,服务的客户端接收整个结果集,即使有人在 UI 上绘制结果后选择了页面上的第一个或第二个结果。在那种情况下,我们的服务做了很多工作,而我们的客户已经等待了很长时间,但是客户忽略了大部分结果。这无疑是对资源的浪费。

但是,我们可以做得更好并处理遍历结果集的搜索结果。因此,只要客户端继续使用它们,服务器就会搜索下一个结果项。通常,服务器搜索进程不是针对每一行,而是针对某个固定大小的存储桶(比如 100 个项目)。这种方法称为 cursor 并且经常被数据库使用。对于客户端,生成的 cursoriterator 的形式表示以下 代码代表我们改进的服务API:

public interface IterableSearchEngine {
   Iterable<URL> search(String query, int limit);
}

可迭代的唯一缺点是我们的客户端线程在积极等待新数据时会被阻塞。这对于 Android UI 线程来说将是一场灾难。当 新结果到达时,搜索服务正在等待 next()调用。换句话说,客户端和服务通过Iterable接口打乒乓球。尽管如此,有时提到的交互可能是可以接受的,但在大多数情况下,它不足以构建一个高性能的应用程序。

反过来,我们的搜索引擎可能会返回 CompletableFuture 以便成为一个异步服务。 在这种情况下,我们客户端的线程可能会做一些有用的事情,而不用担心 搜索请求,因为服务在结果到达后立即调用回调。但是在这里我们再次收到全有或全无,因为 CompletableFuture 可能只包含一个值,即使它是一个结果列表,如下面的代码所示:

public interface FutureSearchEngine {
   CompletableFuture<List<URL>> search(String query, int limit);
}

使用 RxJava,我们将改进我们的解决方案并获得异步处理和对每个到达事件做出反应的能力。另外,我们的客户端可以随时unsubscribe(),减少搜索服务过程的工作量,如下代码所示:

public interface RxSearchEngine {
   Observable<URL> search(String query);
}

 

 

通过使用这种方法,我们大大提高了 响应能力 应用程序 。即使客户还没有收到所有的结果,它可能会处理已经到达的部分。作为人类,我们不喜欢等待结果。相反, 我们重视Time To First Byte关键渲染路径  指标。在所有这些情况下,反应式编程并不比传统方法差,而且通常会带来更好的结果。

笔记

要了解有关第一个字节的时间的更多信息,请参阅https://www.maxcdn.com/one/visual-glossary/time-to-first-byte。除此之外, 关键渲染路径 在 https://developers.google.com/web/fundamentals/performance/critical-rendering-path

正如我们之前看到的,RxJava 使得以一种更加通用和灵活的方式异步组合数据流成为可能。同样,我们可以将老式的同步代码包装到异步工作流中。为了管理慢速 Callable 的实际执行线程,我们可以使用 subscriberOn(Scheduler) 运算符。此运算符定义流处理在哪个 Scheduler(Java 的 ExecutorService 的响应式对应物)上启动。 Chapter 4中详细介绍了线程调度,  ;Project Reactor - 响应式应用的基础。以下代码演示了这样一个用例:

String query = ...;
Observable.fromCallable(() -> doSlowSyncRequest(query))
   .subscribeOn(Schedulers.io())
   .subscribe(this::processResult);

当然,通过这种方法,我们不能依赖一个线程将处理整个请求这一事实。我们的工作流程可能从一个线程开始,迁移到少数其他线程,然后在一个完全不同的新创建线程中完成处理。必须强调的是,使用这种方法会使对象发生变异是很危险的,唯一合理的策略是不变性。这不是一个新概念。它是函数式编程的核心原则之一。对象一旦创建,就可能不会改变。这样一个简单的规则可以防止并行应用程序中的一整类问题。

在 Java 8 引入 lambda 之前,很难充分利用反应式编程和函数式编程的强大功能。如果没有 lambda,我们必须创建许多匿名或内部类,这些类会污染应用程序代码并创建比有意义的行更多的样板。在 RxJava 诞生之初,尽管速度较慢,但​​ Netflix  广泛使用 Groovy 进行开发,主要是因为对 lambda 的支持。这使我们得出结论,作为一等公民的功能是成功和愉快地使用反应式编程所必需的。幸运的是,这对 Java 来说已经不是问题了,即使在 Android 平台上也是如此(https://github.com/orfjackal/retrolambda) 启用旧的 lambda 支持Java 版本。

 

使用 RxJava 重建我们的应用程序

为了感受 RxJava,让我们用 RxJava重写 之前编写的温度传感应用程序。要在应用程序中使用该库,让我们将以下依赖项添加到build.gradle 文件中:

compile('io.reactivex:rxjava:1.3.8')

在这里,我们使用相同的值类来表示当前温度,如下代码所示:

final class Temperature {
   private final double value;
   // constructor & getter
}

实现业务逻辑

 TemperatureSensor 类 以前将事件发送到 Spring ApplicationEventPublisher,但现在它应该返回一个带有温度事件。 Reactive 实现TemperatureSensor可能如下所示:

@Component                                                          // (1)
public class TemperatureSensor {
   private final Random rnd = new Random();                         // (2)

   private final Observable<Temperature> dataStream =               // (3)
      Observable
         .range(0, Integer.MAX_VALUE)                               // (4)
         .concatMap(tick -> Observable                              // (5)
            .just(tick)                                             // (6)
            .delay(rnd.nextInt(5000), MILLISECONDS)                 // (7)
            .map(tickValue -> this.probe()))                        // (8)
         .publish()                                                 // (9)
         .refCount();                                               // (10)

   private Temperature probe() {
      return new Temperature(16 + rnd.nextGaussian() * 10);         // (11)
   }

   public Observable<Temperature> temperatureStream() {             // (12)
      return dataStream;
   }
}

 

在这里,我们通过应用 @Component注解 TemperatureSensor注册为Spring bean >(1),所以这个 bean 可以自动装配到其他 bean 中。  TemperatureSensor 实现使用了以前没有详细解释过的 RxJava API。尽管如此,我们正试图通过探索类逻辑来阐明所使用的转换。

我们的传感器拥有 随机数生成器rnd来模拟实际的硬件传感器 测量(2)。在声明中, (3),我们定义了一个名为 dataStream的私有字段,由公共方法返回  ;temperatureStream()(12). 因此,dataStream 是组件定义的唯一 Observable 流。此流通过应用工厂方法range(0, Integer.MAX_VALUE) 生成有效的无限数字流(4) .  range() 方法生成从 0 开始的整数序列,其中有 Integer.MAX_VALUE 元素。对于这些值中的每一个,我们应用 变换(5)concatMap(tick -> ...)。方法 concatMap() 接收一个函数, f,它转换一个 勾选 item到一个可观察的元素流中,将f 函数 应用于传入流的每个元素,并将生成的流一一连接.在我们的例子中, f 函数在随机延迟后进行传感器测量(以匹配先前实现的行为)。为了探测传感器,我们创建了一个只有一个元素tick(6) 的新流。为了模拟随机延迟,我们应用 delay(rnd.nextInt(5000), MILLISECONDS) (7) 0operator,它及时移动元素。

对于下一步,我们探测传感器并通过应用 map(tickValue -> this.probe()))transformation(8), 依次调用 probe()  方法 与之前的数据生成逻辑相同(11)。在这种情况下,我们忽略 tickValue,因为它只需要生成一个元素流。因此,在应用 concatMap(tick -> ...) 后, 我们有一个返回传感器值的流,在发射之间的随机间隔最长为 5 秒元素。

实际上,我们可以在不应用运算符 (9)(10) 的情况下返回一个流,但在这种情况下,每个订阅者(SSE 客户端)将触发对流的新订阅和新的传感器读数序列。这意味着传感器读数不会在订阅者之间共享,这可能导致硬件过载和降级。为了防止这种情况发生,我们使用 publish() (9) 操作符, 从到所有目标流的源流。  publish() 运算符返回一种特殊的Observable,称为 ConnectableObservable< /代码>。后者提供了 refCount()(10) 运算符,它仅在以下情况下创建对传入共享流的订阅至少有一个传出订阅。与 Publisher-Subscriber 实现相比,这一实现可以在无人收听时不探测传感器。

自定义 SseEmitter

通过使用 TemperatureSensor,它暴露使用温度值的流,我们可以将每个新的 SseEmitter订阅到Observable流并发送接收到的 onNext 向 SSE 客户端发出信号。为了处理错误和关闭正确的 HTTP 连接,让我们编写以下SseEmitter扩展:

class RxSeeEmitter extends SseEmitter {
   static final long SSE_SESSION_TIMEOUT = 30 * 60 * 1000L;
   private final Subscriber<Temperature> subscriber;                // (1)

   RxSeeEmitter() {
      super(SSE_SESSION_TIMEOUT);                                   // (2)

      this.subscriber = new Subscriber<Temperature>() {             // (3)
         @Override
         public void onNext(Temperature temperature) {
            try {
               RxSeeEmitter.this.send(temperature);                 // (4)
            } catch (IOException e) {
               unsubscribe();                                       // (5)
            }
         }

         @Override
         public void onError(Throwable e) { }                       // (6)

         @Override
         public void onCompleted() { }                              // (7)
      };

      onCompletion(subscriber::unsubscribe);                        // (8)
      onTimeout(subscriber::unsubscribe);                           // (9)
   }

   Subscriber<Temperature> getSubscriber() {                        // (10)
      return subscriber;
   }
}

RxSeeEmitter 扩展了 众所周知的 SseEmitter。它还封装了 Temperature 事件 (1) 的订阅者。在构造函数中, RxSeeEmitter 使用必要的 SSE 会话超时调用超类构造函数 (2) 并创建一个 Subscriber<Temperature>(3) 的实例。该订阅者通过将接收到的 onNext 信号重新发送到 SSE 客户端 (4) 来做出反应。在数据发送失败的情况下, subscriber 将自己从传入的 observable 流 (5) 中取消订阅。在当前的实现中,我们知道 温度流是无限的,不会产生任何错误,所以 onComplete()onError() handlers是空的(6)(7),但在实际应用中,最好有一些那里的处理程序。

(8)(9) 行为 SSE 会话完成或超时注册清理操作。  RxSeeEmitter 订阅者应取消订阅。要使用订阅者, RxSeeEmitter 通过利用 getSubscriber() 方法公开它(10)

暴露 SSE 端点

要公开 SSE 端点,我们需要一个 REST controller 与 温度传感器 实例。以下代码显示了控制器,它利用了 RxSeeEmitter

@RestController
public class TemperatureController {
   private final TemperatureSensor temperatureSensor;                // (1)

   public TemperatureController(TemperatureSensor temperatureSensor) {
      this.temperatureSensor = temperatureSensor;
   }

   @RequestMapping(
      value = "/temperature-stream",
      method = RequestMethod.GET)
   public SseEmitter events(HttpServletRequest request) {
      RxSeeEmitter emitter = new RxSeeEmitter();                     // (2)

      temperatureSensor.temperatureStream()                          // (3)
         .subscribe(emitter.getSubscriber());                        // (4)

      return emitter;                                                // (5)
   }
}

 

 

 TemperatureController 与之前的 Spring Web MVC @RestController 相同。它包含对TemperatureSensor bean (1)的引用。当创建一个新的 SSE 会话时,控制器会实例化我们增强的 RxSeeEmitter(2)  并订阅 RxSeeEmitter 订阅者 (4) 到 TemperatureSensor实例 (3)引用的温度流。然后将 RxSeeEmitter实例返回给Servlet容器处理(5)

正如我们在 RxJava 中看到的那样,REST 控制器拥有较少的逻辑,不管理 dead SseEmitter 实例,也不关心同步。反过来,响应式实现管理 TemperatureSensor 的值、读取和发布的例程。  RxSeeEmitter 将反应流转换为传出的 SSE 消息,并且 TemperatureController 仅将新的 SSE 会话绑定到新的  RxSeeEmitter。此外,此实现不使用 Spring 的 EventBus,因此更便携,无需初始化 Spring 上下文即可进行测试。

应用程序配置

由于我们不使用 Publish-Subject 方法 和 Spring 的 @EventListener注解,我们不依赖 Async 支持,所以应用配置变得更简单:

@SpringBootApplication
public class Application {
   public static void main(String[] args) {
      SpringApplication.run(Application.class, args);
   }
}

可以看到,这次我们不需要使用 @EnableAsync注解来开启Async Support,我们也 也不需要配置 Spring的Executor用于事件处理。当然,如果需要,我们可以配置一个 RxJavaScheduler 在处理响应式流时进行细粒度的线程管理,但是这样的配置不会依赖于 Spring Framework。

反过来,我们不需要更改应用程序 UI 部分的代码;它应该像以前一样工作。在这里,我们必须强调  基于 RxJava 的实现,当没有人听时,温度传感器不会被探测。这种行为是 响应式编程具有主动订阅的概念这一事实的自然结果。基于 Publish-Subject 的实现没有这样的属性,因此受到更多限制。

 

反应式库的简史


既然我们已经熟悉了RxJava,甚至还编写了一些响应式工作流,让我们看看它的历史以识别其中的上下文哪个反应式编程诞生了以及它旨在解决的问题。

奇怪的是,我们今天所知的 RxJava 历史和响应式编程的历史开始于微软内部。 2005 年,Erik Meijer 和他的云可编程团队正在试验适合构建大规模异步和数据密集型互联网服务架构的编程模型。经过几年的试验,Rx 库的第一个版本于 2007 年夏天诞生。另外两年时间专门用于库的不同方面,包括多线程和协作重新调度。 Rx.NET的第一个公共版本于 2009 年 11 月 18 日发布。后来,微软将该库移植到不同的语言,例如 JavaScript、C++ 、Ruby 和 Objective-C,以及Windows Phone平台。随着 Rx 开始流行,微软在 2012 年秋天开源了 Rx.NET。

笔记

要了解有关 Rx 库诞生的更多信息,请阅读 Erik Meijer 在 Reactive Programming with RxJava中的前言,作者 Tomasz Nurkiewicz 和 Ben Christensen。

在某个时候,Rx 的想法传播到了微软之外,2012 年,来自 GitHub Inc. 公司的 Paul Betts 和 Justin Spahr-Summers 实施并发布了 ReactiveCocoa for Objective-C。同时,来自 Netflix 的 Ben Christensen 移植了 Rx.NET到 Java 平台,并于 2013 年初在 GitHub 上开源了 RxJava 库 。

当时,Netflix 面临着处理流媒体产生的海量互联网流量的非常复杂的问题。一个名为 RxJava 的异步响应式库帮助他们构建了响应式系统,该系统在 2015 年拥有北美 37% 的互联网流量份额!现在, 系统中的很大一部分流量 由 RxJava 处理。为了承受所有这些巨大的负载,Netflix 不得不发明新的架构模式并在库中实现它们。其中最知名的是:

 

在所有情况下,RxJava 都是命名库以及整个 Netflix 生态系统本身的重要组成部分。 Netflix 在微服务和流媒体架构方面的成功促使其他公司采用相同的方法,包括RxJava。

如今,RxJava 在一些 NoSQL Java 驱动程序中被原生使用​​,例如 Couchbase (https://blog.couchbase.com/why-couchbase-chose-rxjava-new-java -sdk/) 和 MongoDB (https://mongodb.github.io/mongo-java-driver-rx/)。

还需要注意的是,RxJava 受到了 Android 开发人员和公司 例如 SoundCloud、Square、NYT、和 SeatGeek 使用 RxJava 实现他们的移动应用程序。这种积极参与导致了名为RxAndroid的病毒库的出现。这极大地简化了在 Android 中编写反应式应用程序的过程。在 iOS 平台上,开发人员拥有 RxSwift, Rx 库的 Swift 变体。

目前,如果没有移植 Rx 库,很难找到一种流行的编程语言。在 Java 世界中,我们有 RxScala、RxGroovy、RxClojure、RxKotlin 和 RxJRuby,而这个列表还远未完成。要找到我们最喜欢的语言的 Rx 实现,请参阅此网页 http:// reactivex.io/languages.html

说 RxJava 是反应式编程的第一个也是唯一的先驱是不公平的。重要的是,异步编程的广泛采用为响应式技术奠定了坚实的基础和需求。 NodeJS 及其社区 (https://nodejs.org)。

反应性景观


在前面的部分中,我们学习了如何使用纯 RxJava 以及如何将其与 Spring Web MVC 结合使用。为了证明这样做的好处,我们更新了温度监控应用程序并通过应用 RxJava 改进了设计。然而,值得注意的是 Spring Framework 和 RxJava 并不是唯一有效的组合。许多应用程序服务器也重视反应式方法的力量。因此,名为 Ratpack 的成功响应式服务器的作者也决定采用 RxJava。

 

 

除了回调和基于 Promise 的 API,Ratpack 还提供 RxRatpack,一个单独的模块, 允许将 Ratpack Promise 转换为RxJava Observable 很容易,反之亦然,如下代码所示:

Promise<String> promise = get(() -> "hello world");
RxRatpack
   .observe(promise)
   .map(String::toUpperCase)
   .subscribe(context::render);

笔记

要了解有关 Ratpack 服务器的更多信息,请访问该项目的官方网站 https ://ratpack.io/manual/current/all.html

另一个在 Android 世界著名的例子是 HTTP 客户端 Retrofit,,它还围绕自己的 Futures 和 Callbacks 实现创建了一个 RxJava 包装器。以下示例显示了在 Retrofit 中至少可以使用四种不同的编码风格:

interface MyService {
   @GET("/user")
   Observable<User> getUserWithRx();

   @GET("/user")
   CompletableFuture<User> getUserWithJava8();

   @GET("/user")
   ListenableFuture<User> getUserWithGuava();

   @GET("user")
   Call<User> getUserNatively()
}

尽管 RxJava 可以改进任何解决方案,但响应式环境并不局限于它或其包装器。在 JVM 世界中,有许多其他库和服务器创建了它们的响应式实现。例如, 知名的响应式服务器 Vert.x 在一段时间内只使用基于回调的通信,但后来用io.vertx.core.streams< /code>package,包含以下接口:

  • ReadStream<T>: This interface represents a stream of items that can be read from
  • WriteStream<T>: This describes a stream of data that can be written to
  • Pump: This is used for moving data from ReadStream to a WriteStream and performing flow control

 

让我们看一下带有 Vert.x示例的代码片段:

public void vertexExample(HttpClientRequest request, AsyncFile file) {
   request.setChunked(true);
   Pump pump = Pump.pump(file, request);
   file.endHandler(v -> request.end());
   pump.start();
}

笔记

Eclipse Vert.x 是一个事件驱动的应用程序框架,在设计上类似于 Node.js。它提供了一个简单的并发模型、用于异步编程的原语以及一个渗透到浏览器内 JavaScript 的分布式事件总线。如果对 Vert.x 及其 Reactive Streams 的实现感兴趣,请访问此网页:http://vertx.io/docs/

RxJava 的采用和替代实现 的数量是巨大的,而且远不限于上述解决方案。世界各地的许多公司和开源项目都创建了自己的类似于 RxJava 的解决方案,或者他们扩展了已经存在的解决方案。

诚然,库之间的自然演化和竞争并没有错,但很明显,当我们尝试在一个 Java 应用程序中组合几个不同的响应式库或框架时,就会出现问题。此外,我们最终会发现 反应式库的行为大体上是相似的,但在细节上略有不同。由于难以发现和修复的隐藏错误,这种情况可能会危及整个项目。因此,鉴于所有这些 API 差异,在一个应用程序中混合几个不同的响应式库(例如, Vert.x 和 RxJava)并不是一个好主意。在这一点上,很明显整个响应式 landscape 需要一些标准或通用 API,这将提供任何之间的兼容性保证实施。当然,设计了这样一个标准  它被称为 Reactive Streams。下一章将详细介绍这一点。 

 

 

概括


在本章中,我们重新审视了 GoF 的一些著名设计模式——包括 Observer、Publish-Subscribe 和 Iterator,以构建响应式编程的基础。我们编写了一些实现来回顾我们已经拥有的用于异步编程的工具的强项和弱点。我们还利用 Spring Framework 对 Server-Sent Events、WebSockets 的支持,并使用 Spring 提供的 Event-Bus。此外,我们使用 Spring Boot 和 start.spring.io 来快速启动应用程序。尽管我们的示例非常简单,但它们展示了由用于异步数据处理的不成熟方法引起的潜在问题。

我们还查看了响应式编程的历史以突出架构问题,而响应式编程的发明就是为了解决这些问题。在这种情况下,Netflix 的成功故事表明,像 RxJava 这样的小型库可能会成为在竞争非常激烈的商业领域取得重大成功的起点。我们还发现,随着 RxJava 的成功,许多公司和开源项目在考虑到这些因素的情况下重新实现了响应式库,这导致了通用的响应式环境。这种多功能性激发了对响应式标准的需求,我们将在下一章中讨论。