vlambda博客
学习文章列表

JDK17 |java17学习 第 14 章 Java 标准流

Chapter 15: Reactive Programming

在本章中,您将了解反应式宣言和反应式编程的世界。我们首先定义和讨论反应式编程的主要概念——异步、非阻塞和响应式。使用它们,我们定义和讨论响应式编程,主要的响应式框架,并更详细地讨论 RxJava。

在本章中,我们将介绍以下主题:

  • 异步处理
  • 非阻塞 API
  • 反应式——响应式、弹性、弹性和消息驱动的系统
  • 反应式流
  • RxJava

在本章结束时,您将能够使用响应式编程编写异步处理代码。

Technical requirements

为了能够执行本章中提供的代码示例,您将需要以下内容:

  • 具有操作系统的计算机:Microsoft Windows、Apple macOS 或 Linux
  • Java SE 版本 17 或更高版本
  • 您喜欢的任何 IDE 或代码编辑器

本书第 1 章Java 17 入门。本章的文件和代码示例可从位于 https:// /github.com/PacktPublishing/Learn-Java-17-Programming.git。您可以在 examples/src/main/java/com/packt/learnjava 中找到它们/ch15_reactive 文件夹。

Asynchronous processing

异步表示请求者立即得到响应,但结果不存在。相反,请求者会一直等待,直到结果发送给他们,保存在数据库中,或者,例如,呈现为允许您检查结果是否准备好的对象。如果是后者,请求者会定期调用此对象的某个方法,并在结果准备好时,使用同一对象上的另一个方法检索它。异步处理的 优点是请求者可以在等待的同时做其他事情。

第 8 章中,< em class="italic">多线程和并发处理,我们演示了如何创建子线程。这样的子线程然后发送一个非异步(阻塞)请求并等待它的返回,什么都不做。同时,主线程继续执行,并定期调用子线程对象,查看结果是否就绪。这是最基本的异步处理实现。事实上,我们在使用并行流时已经使用了它。

在后台工作以创建子线程的并行流操作将流分成段,将每个段分配给专用线程进行处理,然后将来自所有段的部分结果聚合为最终结果。在上一章中,我们甚至编写了完成聚合工作的函数。提醒一下,该函数被 称为combiner。

让我们用一个例子来比较顺序流和并行流的性能。

Sequential and parallel streams

为了演示顺序处理和并行处理之间的区别,让我们想象一个从 10 个物理设备(例如传感器)收集数据的系统并且 计算平均值。以下是 get() 方法,该方法从由其 ID 标识的传感器收集测量值:

double get(String id){
    try{
        TimeUnit.MILLISECONDS.sleep(100);
    } catch(InterruptedException ex){
        ex.printStackTrace();
    }
    return id * Math.random();
}

我们加入了 100 毫秒的延迟,以模拟从传感器收集测量值所需的时间。至于得到的测量值,我们使用 Math.random() 方法。我们将使用 MeasuringSystem 类的对象来调用这个 get() 方法,这是该方法所在的位置。

然后,我们将计算一个平均值,以抵消个别设备的错误和其他特性:

void getAverage(Stream<Integer> ids) {
    LocalTime start = LocalTime.now();
    double a = ids.mapToDouble(id -> new MeasuringSystem()
                  .get(id))
                  .average()
                  .orElse(0);
    System.out.println((Math.round(a * 100.) / 100.) + " in " +
       Duration.between(start, LocalTime.now()).toMillis() +  
                                                         "ms");
}

请注意我们如何使用 mapToDouble() 操作将 ID 流转换为 DoubleStream 以便我们可以应用 average() 操作。 average() 操作返回一个 Optional 对象,我们称其为 orElse (0) 方法,返回计算值或零(例如,如果测量系统无法连接到其任何传感器并返回空流)。

getAverage() 方法的最后一行打印了结果和计算它所花费的时间。在实际代码中,我们将返回结果并将其用于其他计算。但是,出于演示目的,我们将仅打印它。

现在我们可以比较顺序流处理的性能和并行处理(参见 MeasuringSystem 类和 compareSequentialAndParallelProcessing() 方法):

List<Integer> ids = IntStream.range(1, 11)
                             .mapToObj(i -> i)
                             .collect(Collectors.toList());
getAverage(ids.stream());          //prints: 2.99 in 1030 ms
getAverage(ids.parallelStream());  //prints: 2.34 in  214 ms

如果您运行此示例,结果可能会有所不同,因为您可能还记得,我们将收集的测量值模拟为随机值。

如您所见,并行流的处理比顺序流的处理快五倍。结果是不同的,因为每次测量产生的结果都略有不同。

虽然并行流在后台使用了异步处理,但这并不是程序员在谈论请求的异步处理时所想到的。从应用程序的角度来看,它只是并行(也称为并发)处理。它比顺序处理要快,但主线程必须等到所有调用都完成并检索到数据。如果每个调用至少需要 100 毫秒(在我们的例子中),那么即使每个调用都由专用线程进行,也无法在更短的时间内完成所有调用的处理。

当然,我们可以创建一个服务,使用子线程进行所有调用,而主线程执行其他操作。稍后,主线程可以再次调用该服务并获取结果或从先前约定的位置获取结果。那确实是程序员正在谈论的异步处理。

但是在编写这样的代码之前,让我们先看看 java.util.concurrent 包中的 CompletableFuture 类。它完成了所描述的一切以及更多。

Using the CompletableFuture object

使用 CompletableFuture 对象,我们可以将向测量系统发送请求与从 CompletableFuture 对象获取结果分开。这正是我们在解释什么是异步处理时描述的场景。让我们在代码中演示一下(参见 MeasuringSystem 类和 completableFuture() 方法):

List<CompletableFuture<Double>> list = ids.stream()
     .map(id -> CompletableFuture.supplyAsync(() ->
  new MeasuringSystem().get(id))).collect(Collectors.toList());

supplyAsync() 方法不等待对测量系统的调用返回。相反,它会立即创建一个 CompletableFuture 对象并返回它。这样客户就可以在以后的任何时候使用这个对象来检索测量系统返回的结果。以下代码获取 CompletableFuture 对象列表并对其进行迭代,从每个对象中检索结果并计算平均值:

LocalTime start = LocalTime.now();
double a = list.stream()
               .mapToDouble(cf -> cf.join().doubleValue())
               .average()
               .orElse(0);
System.out.println((Math.round(a * 100.) / 100.) + " in " +
  Duration.between(start, LocalTime.now()).toMillis() + " ms"); 
                                         //prints: 2.92 in 6 ms

此外,一些方法允许您检查是否返回了值,但这不是本演示的重点,而是展示如何使用 CompletableFuture 类组织异步处理。

创建的 CompletableFuture 对象列表可以存储在任何地方并非常快速地处理(在我们的例子中,在 6 毫秒内),前提是已经收到测量值(所有 get() 方法都被调用并返回值)。在创建 CompletableFuture 对象列表之后并在对其进行处理之前,系统不会被 阻塞并且可以执行其他操作。这就是异步处理的优势。

CompletableFuture 类有许多方法,并受到其他几个类和接口的支持。例如,可以添加一个固定大小的线程池来限制线程的数量(参见 MeasuringSystem 类和 threadPool() 方法):

ExecutorService pool = Executors.newFixedThreadPool(3);
List<CompletableFuture<Double>> list = ids.stream()
        .map(id -> CompletableFuture.supplyAsync(() -> 
                         new MeasuringSystem().get(id), pool))
        .collect(Collectors.toList());

有多种这样的池用于不同的目的和不同的性能。但是使用池并不会改变整个系统的设计,所以我们省略了这样一个细节。

如您所见,异步处理的功能非常强大。还有一种异步 API 的变体,称为 非阻塞 API。我们将在下一节讨论这个问题。

Non-blocking APIs

非阻塞 API 的客户端 获得结果而不会被阻塞很长时间,因此允许客户端在准备结果期间执行其他操作.因此,非阻塞 API 的概念意味着高度响应的应用程序。请求的处理(即获取结果)可以同步或异步完成——这对客户端无关紧要。然而,在实践中,应用程序通常使用异步处理来促进 API 的吞吐量和性能的提高。

non-blocking 一词在 java.nio 包一起使用. 非阻塞输入/输出(NIO)提供对密集型输入/输出<的支持/strong>(I/O)操作。它描述了应用程序是如何实现的:它没有为每个请求指定一个执行线程,而是提供了几个异步并发处理的轻量级工作线程。

The java.io package versus the java.nio package

向外部存储器(例如,硬盘驱动器)写入和读取数据的操作比仅在内存中处理要慢得多。最初,java.io 包中已有的类和接口运行良好,但偶尔会造成性能瓶颈。创建了新的 java.nio 包以提供更有效的 I/O 支持。

java.io 实现 基于 I/O 流处理。正如我们在上一节中看到的,本质上,这是一个阻塞操作,即使在幕后发生了某种并发。为了提高速度,java.nio 实现是基于内存中缓冲区的读/写而引入的。这样的设计使其能够将填充/清空缓冲区的缓慢过程与快速读取/写入缓冲区分开。

在某种程度上,它类似于我们在 CompletableFuture 用法示例中所做的。将数据保存在缓冲区中的另一个优点是可以检查数据,与缓冲区一起往返,这在从流中顺序读取时是不可能的。它在数据处理过程中提供了更大的灵活性。此外,java.nio 实现引入了另一个名为 a channel 用于进出缓冲区的批量数据传输。

读取线程正在从通道中获取数据,并且只接收当前可用的内容或根本不接收(如果通道中没有数据)。如果数据不可用,线程可以做其他事情,而不是保持阻塞状态——例如,以与 CompletableFuture 示例可以在测量系统从其传感器获取数据时自由地做任何必须做的事情。

这样,代替 将一个 线程专用于一个 I/O 进程,几个工作线程可以服务于多个 I/O 进程.这样的解决方案最终被称为 NIO, 后来被应用到其他进程中,最突出的是 事件循环中的事件处理 ,这也是 称为运行循环。

The event/run loop

许多 非阻塞系统基于事件(或运行) loop – 一个持续执行的线程。它接收事件(请求和消息),然后将它们分派给相应的事件处理程序(工作人员)。事件处理程序没有什么特别之处。它们只是程序员专用于处理特定事件类型的方法(函数)。

这样的设计被称为反应器设计模式。它是围绕同时处理事件和服务请求而构建的。此外,它还为 反应式编程和反应式系统命名对事件做出反应同时处理它们。

基于事件循环的设计广泛用于操作系统和图形用户界面。它从 Spring 5 开始在 Spring WebFlux 中可用,并且可以在 JavaScript 和流行的执行环境 Node.js 中实现。后者使用事件循环作为其处理主干。工具包 Vert.x 也是围绕事件循环构建的。

在采用事件循环之前,为每个传入请求分配了一个专用线程——就像我们演示的流处理一样。每个线程都需要分配一定数量的资源,这些资源不是特定于请求的,所以一些资源——主要是内存分配——被浪费了。然后,随着请求数量的增加,CPU 需要更频繁地将其上下文从一个线程切换到另一个线程,以允许或多或少地同时处理所有请求。在负载下,切换上下文的开销足以影响应用程序的性能。

实现 事件循环已经解决了这两个问题。它通过避免为每个请求创建线程并消除切换上下文的开销来消除资源浪费。有了事件循环,每个请求都需要更小的内存分配来捕获其细节,这使得可以在内存中保留更多请求,以便它们可以同时处理。由于上下文大小的减小,CPU 上下文切换的开销也变得小得多。

非阻塞 API 是一种处理请求的方式,以便系统能够处理更大的负载,同时保持高度响应和弹性​​。

Reactive

通常,术语 reactive 是 在反应式编程的上下文中使用和反应系统。反应式编程(也称为 Rx 编程)基于异步数据流(也称为 反应式流)。它是作为 Java 的 Reactive Extension (RX) 引入的,它也是称为RxJava (http: //reactivex.io)。后来,在 java.util.concurrent 包中将 RX 支持添加到 Java 9。它允许 Publisher 生成 Subscriber 可以异步订阅的数据流。

反应式流和标准流(也称为 Java 8 流 和 之间的一个主要区别位于 java.util.stream 包)是响应式流的源(发布者)以自己的速率将元素推送给订阅者,而在标准流中,只有在前一个已经被处理(实际上,它就像一个 for 循环)。

如您所见,即使没有这个新的 API,我们也能够通过使用 CompletableFuture 异步处理数据。但是在写了几次这样的代码之后,你可能会注意到大部分代码只是管道,所以你会觉得必须有一个更简单、更方便的解决方案。这就是响应式流倡议 (http://www.reactive-streams.org) 的方式 诞生了。工作范围定义如下:

Reactive Streams 的范围是找到一组最小的接口、方法和协议,这些接口、方法和协议将描述实现目标所需的操作和实体——具有非阻塞背压的异步数据流。

non-blocking backpressure 一词指的是 异步处理的问题之一:协调传入数据的速度与系统无需停止(阻塞)数据输入即可处理它们的能力。解决方案是通知来源消费者难以跟上输入。此外,处理应该以比阻塞流更灵活的方式对传入数据速率的变化做出反应,因此名称为 reactive

几个库已经实现了响应式 streams API:RxJava (http://reactivex.io< /a>)、Reactor (https://projectreactor.io)、Akka Streams (https://akka.io/docs) 和 Vert.x (https://vertx.io/) 是最知名的。使用 RxJava 或其他异步 流库编写代码 构成反应式编程。它实现了 Reactive Manifesto (https://www.reactivemanifesto .org)通过构建响应弹性弹性的反应系统消息驱动

Responsive

这个术语是相对不言自明的。及时响应的能力是任何系统的主要品质之一。有很多方法可以实现它。即使是由足够多的服务器和其他基础设施支持的传统阻塞 API,也可以在不断增长的负载下实现良好的响应能力。

反应式编程有助于使用更少的硬件来做到这一点。这是有代价的,因为响应式代码需要改变我们对控制流的看法。但一段时间后,这种新的思维方式变得和其他任何熟悉的技能一样自然。

在接下来的部分中,我们将看到相当多的反应式编程示例。

Resilient

失败是不可避免的。硬件崩溃、软件有缺陷、接收到意外数据或采用了未经测试的执行路径——这些事件中的任何一个或它们的组合都可能随时发生。 弹性是系统在意外情况下继续提供预期结果的能力。

例如,可以使用可部署组件和硬件的冗余,使用系统的各个部分隔离以降低多米诺骨牌效应的可能性,通过设计具有自动可更换部件的系统,或通过发出警报以便有资格的人员能够干扰。此外,我们将分布式系统作为设计弹性系统的一个很好的例子进行了讨论。

分布式架构消除了单点故障。此外,将系统分解为许多使用消息相互通信的专用组件可以更好地调整最关键部分的复制,并为它们的隔离和潜在的故障遏制创造更多机会。

Elastic

通常, 承受最大可能负载的能力与 可扩展性 相关联.但是,在变化的负载下(不仅仅是在不断增长的负载下)保持相同性能特征的能力是,称为弹性。

弹性系统的客户端不应该注意到空闲时间段和峰值负载时间段之间的任何差异。非阻塞响应式的实现方式有助于实现这种质量。此外,将程序分解成更小的部分并将它们转换为可以独立部署和管理的服务,可以对资源分配进行微调。

这样的小服务称为微服务,它们中的许多一起可以组成一个既可扩展又具有弹性的反应式系统。我们将在以下部分和下一章更详细地讨论这种架构。

Message-driven

我们已经 已经确定组件隔离和系统分布是有助于保持系统响应性、弹性和弹性的两个方面。松散和灵活的连接也是支持这些品质的重要条件。响应式系统的异步特性并没有给设计者留下任何其他选择,只能在组件和消息之间建立通信。

它在每个组件周围创造了呼吸空间,没有它,系统将成为一个紧密耦合的单体,容易受到各种问题的影响,更不用说维护噩梦了。

在下一章中,我们将研究一种架构风格,该风格可用于将应用程序构建为使用消息进行通信的松散耦合微服务的集合。

Reactive streams

在 Java 9 中引入的反应式流 API 由以下四个接口组成:

@FunctionalInterface
public static interface Flow.Publisher<T> {
    public void subscribe(Flow.Subscriber<T> subscriber);
}
public static interface Flow.Subscriber<T> {
    public void onSubscribe(Flow.Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}
public static interface Flow.Subscription {
    public void request(long numberOfItems);
    public void cancel();
}
public static interface Flow.Processor<T,R>
               extends Flow.Subscriber<T>, Flow.Publisher<R> {
}

Flow.Subscriber 对象 可以作为参数传递到 subscribe() Flow.Publisher 方法。然后,发布者调用订阅者的 onSubscribe() 方法并将 Flow.Subscription 对象作为参数传递给它。现在,订阅者可以在订阅对象上调用 request(long numberOfItems) 来向发布者请求数据。这就是 pull 模型的实现方式,这让订阅者决定何时请求另一个项目加工。订阅者可以通过调用订阅的 cancel() 方法取消订阅发布者服务。

作为回报,发布者可以通过调用订阅者的 onNext() 方法将新项目传递给订阅者。当没有更多数据到来时(即所有来自源的数据都已发出),发布者调用订阅者的 onComplete() 方法。此外,通过调用订阅者的 onError() 方法,发布者可以告诉订阅者它遇到了问题。

Flow.Processor 接口描述了一个既可以充当订阅者又可以充当发布者的实体。它允许您创建此类处理器的链(或管道),因此订阅者可以从发布者接收项目,对其进行转换,然后将结果传递给下一个订阅者或处理器。

在推送模型中,发布者可以在没有订阅者任何请求的情况下调用 onNext()。如果处理速度低于项目发布速度,订阅者可以使用各种策略来缓解压力。例如,它可以跳过项目或创建一个缓冲区用于临时存储,希望项目的生产速度会减慢,订阅者能够赶上。

这是响应式流计划定义的最小接口集,用于支持具有非阻塞背压的异步数据流。如您所见,它允许订阅者和发布者相互交谈并协调传入数据的速率;因此,它为我们在 Reactive 部分中讨论的背压问题提供了多种解决方案。

有很多方法可以实现这些接口。目前,在 JDK 9 中,其中一个接口只有一种实现:SubmissionPublisher 类实现 Flow.Publisher。原因是这些接口不应该被应用程序开发者使用。它是一个 Service Provider Interface (SPI),供反应式流库的开发人员使用。如果需要,使用已经存在的工具包之一来实现我们之前提到的反应式流 API:RxJava、Reactor、Akka Streams、Vert.x 或您喜欢的任何其他库。

RxJava

在我们的示例中,我们将 使用 RxJava 2.2.21 (http://reactivex.io) 。可以使用以下依赖项将其添加到 项目中:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.21</version>
</dependency>

首先,让我们比较使用 java.util.stream 包和 io.reactivex 包的相同功能的两个实现。示例程序将非常简单:

  • 创建整数流:12345
  • 只过滤偶数(即 24)。
  • 计算每个过滤后数字的平方根。
  • 计算所有平方根的总和。

以下是使用 java.util.stream 包实现 的方法(参见 ObservableIntro 类和 squareRootSum() 方法):

double a = IntStream.rangeClosed(1, 5)
                    .filter(i -> i % 2 == 0)
                    .mapToDouble(Double::valueOf)
                    .map(Math::sqrt)
                    .sum();
System.out.println(a);          //prints: 3.414213562373095

此外,使用 RxJava 实现的相同功能如下所示:

Observable.range(1, 5)
      .filter(i -> i % 2 == 0)
      .map(Math::sqrt)
      .reduce((r, d) -> r + d)
      .subscribe(System.out::println); 
                                    //prints: 3.414213562373095

RxJava是基于Observable对象(扮演Publisher的角色)和Observer< /code> 订阅 Observable 对象并等待数据发出。

Stream 功能相比,Observable 具有明显不同的功能。例如,流一旦关闭就不能重新打开,而 Observable 对象可以再次使用。这是一个示例(参见 ObservableIntro 类和 reuseObservable() 方法):

Observable<Double> observable = Observable.range(1, 5)
     .filter(i -> i % 2 == 0)
     .doOnNext(System.out::println)    //prints 2 and 4 twice
     .map(Math::sqrt);
observable
     .reduce((r, d) -> r + d)
     .subscribe(System.out::println);  
                                    //prints: 3.414213562373095
observable
     .reduce((r, d) -> r + d)
     .map(r -> r / 2)
     .subscribe(System.out::println);  
                                   //prints: 1.7071067811865475

在前面的示例中,从注释中可以看出,doOnNext() 操作被调用了两次,这意味着 observable 对象也发出了两次值,一次

JDK17 |java17学习 第 14 章 Java 标准流

如果我们不希望 Observable 运行两次,我们可以通过添加 cache() 操作来缓存它的数据(参见ObservableIntro 类和 cacheObservableData() 方法):

Observable<Double> observable = Observable.range(1,5)
     .filter(i -> i % 2 == 0)
     .doOnNext(System.out::println)  //prints 2 and 4 only once
     .map(Math::sqrt)
     .cache();
observable
     .reduce((r, d) -> r + d)
     .subscribe(System.out::println); 
                                    //prints: 3.414213562373095
observable
     .reduce((r, d) -> r + d)
     .map(r -> r / 2)
     .subscribe(System.out::println);  
                                   //prints: 1.7071067811865475

如您所见, 看到,第二次使用相同的 Observable 对象利用了缓存数据,从而获得了更好的性能:

JDK17 |java17学习 第 14 章 Java 标准流

RxJava 提供了如此丰富的功能,以至于我们无法在本书中全部回顾。相反,我们将尝试涵盖最流行的功能。 API 描述了可用于使用 Observable 对象的 调用的方法。此类方法也称为 operations(与标准 Java 8 流的情况一样)或 operators(该术语主要用于与反应流有关)。我们将这三个术语——方法、操作和运算符——作为同义词互换使用。

Observable types

谈到 RxJava 2 API(注意它与 RxJava 1 完全不同),我们将使用 在线文档,其中可以在 http://reactivex.io/RxJava/2.x/javadoc 找到/index.html

观察者订阅从可观察对象接收值,该对象可以表现为以下类型之一:

  • 阻塞:这个等到结果返回。
  • 非阻塞:这 异步处理发出的元素。
  • Cold:这 在观察者的请求下发出一个元素。
  • Hot:无论观察者是否订阅,这都会发出元素。

可观察对象 可以是 io.reactivex 包的以下类之一的对象:

  • Observable<T>:这个可以不发射、发射一个或多个元素;它不支持背压。
  • Flowable<T>:这个 可以不发射、发射一个或多个元素;它支持背压。
  • Single<T>:这个 可以发出一个元素或一个错误;背压的概念不适用。
  • Maybe<T>:这 表示延迟计算。它可以发出无值、一个值或错误;背压的概念不适用。
  • Completable:这 表示没有任何值的延迟计算。这表示任务的完成或错误;背压的概念不适用。

这些类中的每一个的对象都可以表现为阻塞的、非阻塞的、冷的或热的 observable。它们在可以发出的值的数量、延迟返回结果或仅返回任务完成标志的能力以及处理背压的能力方面彼此不同。

Blocking versus non-blocking

为了 演示这种行为,我们创建了一个发出五个连续整数的 observable,从 1 开始(参见 BlockingOperators 类和 observableBlocking1() 方法):

Observable<Integer> obs = Observable.range(1,5);

Observable 的所有阻塞方法(操作符)都以“阻塞”开头。例如,blockingLast() 运算符会阻塞管道,直到发出最后一个元素:

Double d2 = obs.filter(i -> i % 2 == 0)
               .doOnNext(System.out::println)  //prints 2 and 4
               .map(Math::sqrt)
               .delay(100, TimeUnit.MILLISECONDS)
               .blockingLast();
System.out.println(d2);                        //prints: 2.0

在这个例子中,我们只选择偶数,打印选择的元素,然后计算平方根并等待 100 毫秒(模拟长时间运行的计算)。这个例子的结果如下:

JDK17 |java17学习 第 14 章 Java 标准流

相同功能的非阻塞版本如下(见BlockingOperators类和observableBlocking1()的后半部分方法):

List<Double> list = new ArrayList<>();
obs.filter(i -> i % 2 == 0)
   .doOnNext(System.out::println)  //prints 2 and 4
   .map(Math::sqrt)
   .delay(100, TimeUnit.MILLISECONDS)
   .subscribe(d -> {
        if(list.size() == 1){
            list.remove(0);
        }
        list.add(d);
   });
System.out.println(list);          //prints: []

我们使用 List 对象来捕获结果,因为您可能还记得,lambda 表达式不允许我们使用非最终变量。

正如您所见,结果列表为空。那是因为管道计算是在没有阻塞(异步)的情况下执行的。我们设置了 100 毫秒的延迟(模拟处理,需要很长时间),但是没有阻塞操作,所以控制下到打印列表内容的下一行,仍然是空的。

为了防止控件过早进入这一行,我们可以在它前面设置一个延迟(参见 BlockingOperators 类和 observableBlocking2( ) 方法):

try {
    TimeUnit.MILLISECONDS.sleep(250);
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println(list);   //prints: [2.0]

请注意,延迟必须至少为 200 毫秒,因为流水线处理两个元素,每个元素都有 100 毫秒的延迟。现在您可以看到列表包含 2.0 的预期值。

从本质上讲,这就是阻塞和非阻塞运算符之间的区别。其他表示 observable 的类也有类似的阻塞操作符。下面是一些阻塞FlowableSingle也许(参见 BlockingOperators 类和 flowableBlocking()singleBlocking()maybeBlocking() 方法):

Flowable<Integer> obs = Flowable.range(1,5);
Double d2 = obs.filter(i -> i % 2 == 0)
        .doOnNext(System.out::println)  //prints 2 and 4
        .map(Math::sqrt)
        .delay(100, TimeUnit.MILLISECONDS)
        .blockingLast();
System.out.println(d2);                 //prints: 2.0
Single<Integer> obs2 = Single.just(42);
int i2 = obs2.delay(100, TimeUnit.MILLISECONDS).blockingGet();
System.out.println(i2);                 //prints: 42
Maybe<Integer> obs3 = Maybe.just(42); 
int i3 = obs3.delay(100, TimeUnit.MILLISECONDS).blockingGet(); 
System.out.println(i3);                 //prints: 42 

Completable 类具有允许我们设置超时的阻塞运算符(参见 BlockingOperators 类和 completableBlocking() 方法):

(1) Completable obs = Completable.fromRunnable(() -> {
         System.out.println("Run");           //prints: Run
         try {
              TimeUnit.MILLISECONDS.sleep(200);
         } catch (InterruptedException e) {
              e.printStackTrace();
         }
    });                                           
(2) Throwable ex = obs.blockingGet();
(3) System.out.println(ex);                   //prints: null
//(4) ex = obs.blockingGet(15, TimeUnit.MILLISECONDS);
// java.util.concurrent.TimeoutException: 
//      The source did not signal an event for 15 milliseconds.
(5) ex = obs.blockingGet(150, TimeUnit.MILLISECONDS);
(6) System.out.println(ex);                   //prints: null
(7) obs.blockingAwait();
(8) obs.blockingAwait(15, TimeUnit.MILLISECONDS);

上述代码的结果 如下图所示:

JDK17 |java17学习 第 14 章 Java 标准流

 

第一个 Run 消息来自第 2 行,以响应阻塞 blockingGet() 方法的调用。第一个 null 消息来自第 3 行。第 4 行抛出异常,因为超时设置为 15 毫秒,而实际处理设置为 100 毫秒的延迟。第二条 Run 消息来自第 5 行,以响应 blockingGet() 方法调用。这次,超时设置为 150 毫秒,比 100 毫秒多,因此该方法能够在超时之前返回。

最后 两行,第 7 和第 8 行,演示了 blockingAwait() 方法在有和没有超时的情况下的用法。此方法不返回值,但允许可观察管道运行其过程。有趣的是,即使超时设置为小于管道完成时间的值,它也不会因异常而中断。显然,它在管道完成处理后开始等待,除非它是稍后将修复的缺陷(关于这一点的文档尚不清楚)。

尽管阻塞操作确实存在(我们将在以下部分讨论每种可观察类型时查看更多),但它们仅用于并且应该仅在无法实现使用非阻塞操作所需功能的情况下使用只要。反应式编程的主要目的是努力以非阻塞方式异步处理所有请求。

Cold versus hot

到目前为止,我们看到的所有 示例都只演示了一个cold observable,它只在处理管道的请求后,在处理完上一个值后提供下一个值。这是另一个示例(参见 ColdObservable 类和 main() 方法):

Observable<Long> cold = 
        Observable.interval(10, TimeUnit.MILLISECONDS);
cold.subscribe(i -> System.out.println("First: " + i));
pauseMs(25);
cold.subscribe(i -> System.out.println("Second: " + i));
pauseMs(55);

我们使用 interval() 方法创建了一个 Observable 对象,该对象表示在每个指定间隔(在我们的例子中,每 10 毫秒)。然后,我们订阅创建的对象,等待 25 毫秒,再次订阅,再等待 55 毫秒。 pauseMs() 方法如下:

void pauseMs(long ms){
    try {
        TimeUnit.MILLISECONDS.sleep(ms);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

如果我们运行前面的示例,输出将类似于以下内容:

JDK17 |java17学习 第 14 章 Java 标准流

如您所见,每个管道都处理了冷可观察对象发出的每个值。

要将 cold observable 转换为 hot,我们使用 publish() 方法,它将 observable 转换为 ConnectableObservable 对象,该对象扩展了 Observable 对象(参见 HotObservable 类和 hot1() 方法):

ConnectableObservable<Long> hot = 
      Observable.interval(10, TimeUnit.MILLISECONDS).publish();
hot.connect();
hot.subscribe(i -> System.out.println("First: " + i));
pauseMs(25);
hot.subscribe(i -> System.out.println("Second: " + i));
pauseMs(55);

如您所见, 必须调用 connect() 方法,以便 ConnectableObservable 对象开始发出值。输出类似于以下内容:

JDK17 |java17学习 第 14 章 Java 标准流

前面的输出显示第二个管道没有收到前三个值,因为它后来订阅了 observable。因此,observable 发出的值与观察者处理它们的能力无关。如果处理落后,并且新值不断出现而之前的值尚未完全处理,则 Observable 类会将它们放入缓冲区中。如果这个缓冲区变得足够大,JVM 可能会耗尽内存,因为正如我们前面提到的,Observable 类无法进行背压管理。

对于这种情况,Flowable 类是 observable 的更好候选者,因为它确实具有处理背压的能力。这是一个示例(参见 HotObservable 类和 hot2() 方法):

PublishProcessor<Integer> hot = PublishProcessor.create();
hot.observeOn(Schedulers.io(), true)
   .subscribe(System.out::println, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
    hot.onNext(i);
}

PublishProcessor 类扩展了 Flowable 并有一个 onNext(Object o) 方法强制它发出传入的对象。在调用它之前,我们已经使用 Schedulers.io() 线程订阅了 observable。我们将在多线程(调度程序)部分讨论调度程序。

subscribe() 方法有几个重载版本。我们决定使用一个接受两个 Consumer 函数的函数:第一个函数处理传入的值,第二个函数处理任何管道抛出的异常操作(它的工作方式类似于 Catch 块)。

如果我们运行前面的示例,它将成功打印前 127 个值,然后抛出 MissingBackpressureException,如下图所示:

JDK17 |java17学习 第 14 章 Java 标准流

异常中的消息提供了一条线索:由于缺少请求而无法发出值。显然,发出值的速率高于消耗它们的速率,而内部缓冲区只能保存 128 个元素。如果我们添加延迟(模拟更长的处理时间),结果会更糟(参见 HotObservable 类和 hot3() 方法):

PublishProcessor<Integer> hot = PublishProcessor.create();
hot.observeOn(Schedulers.io(), true)
   .delay(10, TimeUnit.MILLISECONDS)
   .subscribe(System.out::println, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
    hot.onNext(i);
}

即使是前 128 个元素也不会通过,输出只会有 MissingBackpressureException

为了解决这个问题,必须设置背压策略。例如,让我们删除管道无法处理的每个值(参见 HotObservable 类和 hot4() 方法):

PublishProcessor<Integer> hot = PublishProcessor.create();
hot.onBackpressureDrop(v -> System.out.println("Dropped: "+ v))
   .observeOn(Schedulers.io(), true)
   .subscribe(System.out::println, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
    hot.onNext(i);
}

请注意,策略必须在 observeOn() 操作之前设置,因此它会被创建的 Schedulers.io()< /代码>线程。

输出显示许多发出的值被丢弃。这是一个输出片段:

JDK17 |java17学习 第 14 章 Java 标准流

当我们概述相应的运算符时,我们将在 Operators 部分讨论其他背压策略。

Disposable

注意一个subscribe()方法实际上返回一个Disposable对象 可以查询管道处理是否已经完成并被销毁(参见DisposableUsage类和disposable1() 方法):

Observable<Integer> obs = Observable.range(1,5);
List<Double> list = new ArrayList<>();
Disposable disposable =
     obs.filter(i -> i % 2 == 0)
        .doOnNext(System.out::println)     //prints 2 and 4
        .map(Math::sqrt)
        .delay(100, TimeUnit.MILLISECONDS)
        .subscribe(d -> {
            if(list.size() == 1){
                list.remove(0);
            }
            list.add(d);
        });
System.out.println(disposable.isDisposed()); //prints: false
System.out.println(list);                    //prints: []
try {
    TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println(disposable.isDisposed());  //prints: true
System.out.println(list);                     //prints: [2.0]

此外,可以强制处置管道,从而有效地取消处理(参见 DisposableUsage 类和 disposable2() 方法):

Observable<Integer> obs = Observable.range(1,5);
List<Double> list = new ArrayList<>();
Disposable disposable =
     obs.filter(i -> i % 2 == 0)
        .doOnNext(System.out::println)       //prints 2 and 4
        .map(Math::sqrt)
        .delay(100, TimeUnit.MILLISECONDS)
        .subscribe(d -> {
            if(list.size() == 1){
                list.remove(0);
            }
            list.add(d);
        });
System.out.println(disposable.isDisposed()); //prints: false
System.out.println(list);                    //prints: []
disposable.dispose();
try {
    TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println(disposable.isDisposed()); //prints: true
System.out.println(list);                    //prints: []

如您所见,通过添加对 disposable.dispose() 的调用,我们已经停止了处理,因此即使延迟了 200 毫秒,列表仍然为空(见最后前面示例的行)。

这种强制处理的方法可以用来确保没有跑路的线程。每个创建的 Disposable 对象都可以像在 finally 块中释放资源的方式进行处理。 CompositeDisposable 类帮助我们以协调的方式处理多个 Disposable 对象。

onCompleteonError 事件发生时,管道会自动处理。

例如,您可以使用 add() 方法并将新创建的 Disposable 对象添加到 CompositeDisposable 对象。然后,在必要时,可以在 CompositeDisposable 对象上调用 clear() 方法。它将删除收集到的 Disposable 对象并在每个对象上调用 dispose() 方法。

Creating an observable

已经在我们的示例中看到了一些创建可观察对象的方法。还有很多其他的工厂方法,包括ObservableFlowableSingle可能可完成。但是,并非所有这些接口都提供以下所有方法(请参阅注释;all 表示 列出的接口有它):

  • create():这会通过提供完整的实现(全部)来创建一个 Observable 对象。
  • defer():每次新的 Observer 对象都会创建一个新的 Observable 对象> 订阅(全部)。
  • empty():这将创建一个空的 Observable 对象,该对象在订阅后立即完成(除了 单个)。
  • never():这会创建一个 Observable 对象,该对象不发出任何东西,也什么也不做;它甚至没有完成(全部)。
  • error():这将创建一个 Observable 对象,该对象在订阅时立即发出异常(全部)。
  • fromXXX():这将创建一个 Observable 对象,其中 XXX 可以是 Callable, Future (all), Iterable, Array, Publisher (Observable and Flowable), Action< /em>,或 Runnable也许Completable);这意味着它会根据提供的函数或对象创建一个 Observable 对象。
  • generate():这个 创建一个冷的 Observable 对象,它根据在提供的函数或对象上(仅 ObservableFlowable)。
  • range()、rangeLong()、interval()、intervalRange():这将创建一个 Observable 对象,该对象发出顺序 intlong 值,可能会或可能不会受到指定范围的限制,并以指定的时间间隔(ObservableFlowable)。
  • just():这会根据提供的对象或一组对象创建一个 Observable 对象(除了 可完成)。
  • timer():这将创建一个 Observable 对象,该对象在指定时间后发出一个 0L 信号(全部),然后为 ObservableFlowable 完成。

还有还有很多其他有用的方法,比如repeat()startWith()< /code> 等等。我们只是没有足够的空间来列出所有这些。请参阅在线文档 (http://reactivex.io/RxJava/2.x /javadoc/index.html)。

让我们看一个 create() 方法使用的例子。 Observablecreate()方法如下:

public static Observable<T> create(ObservableOnSubscribe<T> source)

传入的对象必须是 ObservableOnSubscribe<T> 功能接口的实现,它只有一个抽象方法,subscribe()< /代码>:

void subscribe(ObservableEmitter<T> emitter)

ObservableEmitter<T> 接口 包含以下方法:

  • boolean isDisposed():如果处理管道被释放或发射器被终止,则返回 true
  • ObservableEmitter<T> serialize():这提供了调用 onNext()onError() 所使用的序列化算法,和 onComplete(),位于 Emitter 基类中。
  • void setCancellable(Cancellable c):在此发射器上设置 Cancellable 实现(即,具有只有一种方法,cancel())。
  • void setDisposable(Disposable d):在这个发射器上设置一个 Disposable 实现(这是一个有两个方法的接口:isDispose()dispose())。
  • boolean tryOnError(Throwable t):处理错误情况,尝试发出提供的异常,如果发出则返回false不被允许。

创建一个observable,前面的所有接口都可以实现如下(参见CreateObservable类和main() 方法):

ObservableOnSubscribe<String> source = emitter -> {
    emitter.onNext("One");
    emitter.onNext("Two");
    emitter.onComplete();
};
Observable.create(source)
          .filter(s -> s.contains("w"))
          .subscribe(v -> System.out.println(v),
                     e -> e.printStackTrace(),
                    () -> System.out.println("Completed"));
pauseMs(100); 

让我们仔细看看前面的例子。我们创建了一个 ObservableOnSubscribe 函数作为 source 并实现了发射器:我们告诉发射器发射 在第一次调用 onNext() 时发出一个,在第二次调用 Two "literal">onNext(),然后调用onComplete()。我们将 source 函数传递给 create() 方法并构建管道来处理所有发出的值。

为了让它更有趣,我们添加了 filter() 运算符,它只允许您使用 w 进一步传播值特点。此外,我们选择了 subscribe() 方法版本,其中包含三个参数:Consumer onNext消费者 onErrorAction onComplete 函数。第一个在每次下一个值到达方法时调用,第二个在发出异常时调用,第三个在源发出 onComplete() 信号时调用。创建管道后,我们暂停了 100 毫秒,让异步过程有机会完成。结果如下:

JDK17 |java17学习 第 14 章 Java 标准流

如果我们从发射器实现中删除 emitter.onComplete() 行,则只会显示消息 Two

因此,这些 是如何使用 create() 方法的基础。如您所见,它允许完全自定义。实际上,它很少使用,因为创建 observable 的方法要简单得多。我们将在以下部分中回顾它们。

此外,您将在本章其他部分的示例中看到其他工厂方法的示例。

Operators

在每个可观察接口中, 实际上有数百个(如果我们计算所有重载版本)运算符,Observable, Flowable, 单一, 也许< /code> 和 Completable

ObservableFlowable 接口中,方法的数量超过了 500。这就是为什么在本节中,我们将仅提供概述和一些示例,以帮助您在可能选项的迷宫中导航。

我们将所有运算符分为 10 类:转换、过滤、组合、从 XXX 转换、异常处理、生命周期事件处理、实用程序、条件和布尔值、背压和可连接。

请注意,这些不是所有可用的运算符。您可以在在线文档中查看更多信息 (http://reactivex.io/RxJava/ 2.x/javadoc/index.html)。

Transforming

下面的 运算符转换了可观察对象发出的值:

  • buffer():根据提供的参数或使用提供的函数将发出的值收集到包中。它周期性地一次一个地发出这些包。
  • flatMap():这会根据当前的 observable 生成 observables 并将它们插入到当前流中;它是最受欢迎的运营商之一。
  • groupBy():这将当前的 Observable 对象分成若干组 observables (GroupedObservables 对象)。
  • map():这使用提供的函数转换发出的值。
  • scan():这会将所提供的函数与作为先前将相同函数应用于先前值的结果而产生的值结合起来应用于每个值。
  • window():这会发出类似于 buffer() 的值组,但作为可观察对象,每个值都会发出值的子集从原始 observable 开始,然后以 onCompleted() 结束。

以下代码演示了map()flatMap()的使用, 和 groupBy()(参见 NonBlockingOperators 类和 transforming() 方法):

Observable<String> obs = Observable.fromArray("one", "two");
obs.map(s -> s.contains("w") ? 1 : 0)
   .forEach(System.out::print);              //prints: 01
System.out.println();
List<String> os = new ArrayList<>();
List<String> noto = new ArrayList<>();
obs.flatMap(s -> Observable.fromArray(s.split("")))
        .groupBy(s -> "o".equals(s) ? "o" : "noto")
        .subscribe(g -> g.subscribe(s -> {
            if (g.getKey().equals("o")) {
                os.add(s);
            } else {
                noto.add(s);
            }
        }));
System.out.println(os);                  //prints: [o, o]
System.out.println(noto);                //prints: [n, e, t, w]

Filtering 

以下运算符(及其多个重载版本)选择哪些值将继续流经管道:

  • debounce():仅当指定的时间跨度过去且 observable 没有发出另一个值时,才会发出一个值。
  • distinct():仅选择唯一值。
  • elementAt(long n):这只会发出一个在流中具有指定 n 位置的值。
  • filter():这只会发出符合指定条件的值。
  • firstElement():这只会发出第一个值。
  • ignoreElements():这不会发出值;只有 onComplete() 信号通过。
  • lastElement():这只会发出最后一个值。
  • sample():这会发出在指定时间间隔内发出的最新值。
  • skip(long n):这会跳过第一个 n 值。
  • take(long n):这只会发出第一个 n 值。

以下代码展示了上述运算符的一些使用示例(请参阅 NonBlockingOperators 类和 filtering() 方法) :

Observable<String> obs = Observable.just("onetwo")
        .flatMap(s -> Observable.fromArray(s.split("")));
// obs emits "onetwo" as characters           
obs.map(s -> {
            if("t".equals(s)){
               NonBlockingOperators.pauseMs(15);
            }
            return s;
        })
        .debounce(10, TimeUnit.MILLISECONDS)
        .forEach(System.out::print);               //prints: eo
obs.distinct().forEach(System.out::print);      //prints: onetw
obs.elementAt(3).subscribe(System.out::println);   //prints: t
obs.filter(s -> s.equals("o"))
   .forEach(System.out::print);                    //prints: oo
obs.firstElement().subscribe(System.out::println); //prints: o
obs.ignoreElements().subscribe(() -> 
       System.out.println("Completed!"));  //prints: Completed!
Observable.interval(5, TimeUnit.MILLISECONDS)
   .sample(10, TimeUnit.MILLISECONDS)
   .subscribe(v -> System.out.print(v + " ")); 
                                            //prints: 1 3 4 6 8 
pauseMs(50);

Combining

以下运算符(及其多个重载版本)使用多个源可观察对象创建新的可观察对象:

  • concat(src1, src2):这将创建一个 Observable 对象,该对象发出 src1 的所有值,然后是 src2 的所有值。
  • combineLatest(src1, src2, combiner):这将创建一个 Observable 对象,该对象发出由两个源中的任何一个组合发出的值每个源使用提供的 combiner 函数发出的最新值。
  • join(src2, leftWin, rightWin, combiner):这结合了两个可观察对象在leftWinrightWin 时间窗口根据 combiner 函数。
  • merge():这个将多个observables合并为一个;与 concat() 相比,它可能交错它们,而 concat() 从不交错来自不同 observable 的发射值。
  • startWith(T item):这会在从源 observable 发出值之前添加指定的值。
  • startWith(Observable<T> other):这会在从源 observable 发出值之前添加来自指定 observable 的值。
  • switchOnNext(Observable observables) :这将创建一个新的 Observable 对象,该对象发出指定的最近发出的值可观察的。
  • zip():这使用提供的函数组合指定的 observables 的值。

以下代码演示了其中一些运算符的使用(请参阅 NonBlockingOperators 类和 combined() 方法):

Observable<String> obs1 = Observable.just("one")
             .flatMap(s -> Observable.fromArray(s.split("")));
Observable<String> obs2 = Observable.just("two")
             .flatMap(s -> Observable.fromArray(s.split("")));
Observable.concat(obs2, obs1, obs2)
          .subscribe(System.out::print);    //prints: twoonetwo
Observable.combineLatest(obs2, obs1, (x,y) -> "("+x+y+")")
          .subscribe(System.out::print); //prints: (oo)(on)(oe)
System.out.println();
obs1.join(obs2, i -> Observable.timer(5, 
                TimeUnit.MILLISECONDS),i -> Observable.timer(5, 
                TimeUnit.MILLISECONDS),(x,y) -> "("+x+y+")")
                                 .subscribe(System.out::print); 
                 //prints: (ot)(nt)(et)(ow)(nw)(ew)(oo)(no)(eo)
Observable.merge(obs2, obs1, obs2)
          .subscribe(System.out::print);  
                       //prints: twoonetwo obs1.startWith("42")
    .subscribe(System.out::print);         //prints: 42one
Observable.zip(obs1, obs2, obs1,  (x,y,z) -> "("+x+y+z+")")
          .subscribe(System.out::print); 
                                      //prints: (oto)(nwn)(eoe) 

Converting from XXX

这些运算符 非常简单。以下是 Observable 类的 from-XXX 运算符列表:

  • fromArray(T... items):这会从 varargs 创建一个 Observable 对象。
  • fromCallable(Callable<T>supplier):这会从 Callable< 创建一个 Observable 对象/代码>函数。
  • fromFuture(Future<T>future):这会从 Future< 创建一个 Observable 对象/代码>对象。
  • fromFuture(Future :这会从 Observable 对象"literal">Future 对象,其超时参数应用于 future
  • fromFuture(Future :这会从一个 Observable对象创建一个<将超时参数应用于 future 调度程序的 code class="literal">Future 对象(注意建议使用 Schedulers.io();请参阅 多线程(调度程序) 部分)。
  • fromFuture(Future<T> future, Scheduler scheduler):这会从 Observable 对象>Future 指定调度器上的对象(注意推荐Schedulers.io();请看多线程(调度器)< /em> 部分)。
  • fromIterable(Iterable<T> source):这会从可迭代对象(例如,列表 )。
  • fromPublisher(Publisher :这会创建一个 Observable 对象,例如来自 Publisher 对象。

Exceptions handling

subscribe() 运算符 具有接受 Consumer 的重载版本code> 函数,它处理管道中任何地方引发的异常。它的工作方式类似于包罗万象的 try-catch 块。如果您将此函数传递给 subscribe() 运算符,则可以确定这是所有异常都将结束的唯一地方。

但是,如果需要处理流水线中间的异常,值流可以由其余算子恢复处理,也就是算子有抛出异常。以下运算符(及其多个重载版本)可以帮助解决这个问题:

  • onErrorXXX():这会在捕获到异常时恢复提供的序列; XXX 表示操作符的作用:onErrorResumeNext()onErrorReturn()onErrorReturnItem( )
  • retry():这将创建一个 Observable 对象,该对象重复从源发出的排放;如果它调用 onError(),它会重新订阅源 Observable

演示代码如下所示(参见 NonBlockingOperators 类和 exceptions() 方法):

Observable<String> obs = Observable.just("one")
              .flatMap(s -> Observable.fromArray(s.split("")));
Observable.error(new RuntimeException("MyException"))
  .flatMap(x -> Observable.fromArray("two".split("")))
  .subscribe(System.out::print,
      e -> System.out.println(e.getMessage()) 
                                          //prints: MyException
  );
Observable.error(new RuntimeException("MyException"))
          .flatMap(y -> Observable.fromArray("two".split("")))
          .onErrorResumeNext(obs)
          .subscribe(System.out::print);          //prints: one
Observable.error(new RuntimeException("MyException"))
          .flatMap(z -> Observable.fromArray("two".split("")))
          .onErrorReturnItem("42")
          .subscribe(System.out::print);          //prints: 42

Life cycle events handling

这些运算符是,每个都在管道中任何地方发生的特定事件上调用。它们的工作方式类似于 异常处理 部分中描述的运算符。

这些操作符的格式是doXXX(),其中XXX是事件的名称:onComplete, onNextonError 等。并非所有类都可用,其中一些在ObservableFlowable单一可能可完成。但是,我们没有空间列出所有这些类的所有变体,并将我们的概述限制在 Observable 类的生命周期事件处理运算符的几个示例:

  • doOnSubscribe(Consumer onSubscribe) :当观察者订阅时执行。
  • doOnNext(Consumer :当源 observable 调用 onNext
  • doAfterNext(Consumer :这会将提供的 Consumer 函数应用于当前值,然后将其推送到下游。
  • doOnEach(Consumer :这将为每个发出的值执行 Consumer 函数。
  • doOnEach(Observer<T>observer):这会通知 Observer 对象每个发出的值和它发出的终端事件。
  • doOnComplete(Action onComplete):在源 observable 生成 之后执行提供的 Action 函数onComplete 事件。
  • doOnDispose(Action onDispose):在管道被下游处理后执行提供的Action函数。
  • doOnError(Consumer onError) :当发送 onError 事件时执行。
  • doOnLifecycle(Consumer onSubscribe, Action onDispose) :这会调用对应的onSubscribe onDispose 函数对应的事件。
  • doOnTerminate(Action onTerminate):执行提供的Action函数时source observable 生成 onComplete 事件或引发异常(onError 事件)。
  • doAfterTerminate(Action onFinally):在源 observable 生成 之后执行提供的 Action 函数onComplete 事件或异常(onError 事件)被引发。
  • doFinally(Action onFinally):在源 observable 生成 之后执行提供的 Action 函数onComplete 事件或异常(onError 事件)被引发,或者管道被下游处理。

这是演示代码(参见 NonBlockingOperators 类和 events() 方法):

Observable<String> obs = Observable.just("one")
            .flatMap(s -> Observable.fromArray(s.split("")));
obs.doOnComplete(() -> System.out.println("Completed!")) 
        .subscribe(v -> {
            System.out.println("Subscribe onComplete: " + v);
        });        
pauseMs(25);

如果我们运行 这段代码,输出将如下:

JDK17 |java17学习 第 14 章 Java 标准流

您还将在 Multithreading (scheduler) 部分看到这些运算符的其他用法示例。

Utilities

各种 有用的运算符(及其多个重载版本)可用于控制管道行为:

  • delay():延迟指定时间的发射。
  • materialize():这将创建一个 Observable 对象,该对象代表发出的值和发送的通知。
  • dematerialize():这将反转 materialize() 运算符的结果。
  • observeOn():这指定 ObserverScheduler(线程) > 应该观察 Observable 对象(参见 Multithreading (scheduler) 部分)。
  • serialize():这会强制对发出的值和通知进行序列化。
  • subscribe():订阅来自 observable 的发射和通知;各种重载版本接受用于各种事件的回调,包括 onCompleteonError;只有在调用 subscribe() 之后, 值才开始流经管道。
  • subscribeOn():这会将 Observer 订阅到 Observable 对象,使用异步指定的 Scheduler(参见 Multithreading (scheduler) 部分)。
  • timeInterval(), timestamp():这会将发出值的 Observable<T> 类转换为 Observable<Timed<T>>,它依次发出发射之间经过的时间量或相应的时间戳。
  • timeout():这会重复源Observable的发射;如果在指定的时间段后没有发生排放,则会产生错误。
  • using():这将创建一个与 Observable 对象一起自动处理的资源;它的工作原理类似于 try-with-resources 构造。

以下代码包含在管道中使用的其中一些运算符的示例(请参阅 NonBlockingOperators 类和 utilities() 方法):

Observable<String> obs = Observable.just("one")
          .flatMap(s -> Observable.fromArray(s.split("")));
obs.delay(5, TimeUnit.MILLISECONDS)
   .subscribe(System.out::print);           //prints: one
pauseMs(10);
System.out.println(); //used here just to break the line
Observable source = Observable.range(1,5);
Disposable disposable = source.subscribe();
Observable.using(
  () -> disposable,
  x -> source,
  y -> System.out.println("Disposed: " + y) 
                               //prints: Disposed: DISPOSED
)
.delay(10, TimeUnit.MILLISECONDS)
.subscribe(System.out::print);              //prints: 12345
pauseMs(25);

如果我们运行所有 这些示例,输出将如下所示:

JDK17 |java17学习 第 14 章 Java 标准流

如您所见,完成后,管道将 DISPOSED 信号发送到 using 运算符(第三个参数),因此我们作为第三个参数传递的 Consumer 函数可以处理管道使用的资源。

Conditional and Boolean

以下运算符(及其多个重载版本)允许您 评估一个或多个可观察对象或发出的值并更改逻辑相应的处理:

  • all(Predicate criteria):返回 Single true value,也就是说,如果所有发出的值都符合提供的标准。
  • amb():这接受两个或多个源 observables 并仅从开始发射的第一个发射值。
  • contains(Object value):返回 Single true ,也就是说,如果 observable 发出提供的值。
  • defaultIfEmpty(T value):如果源 Observable 没有发出任何内容,则会发出提供的值。
  • sequenceEqual():返回 Single true,即是,如果提供的源发出相同的序列;重载版本允许我们提供用于比较的相等函数。
  • skipUntil(Observable other):这会丢弃发出的值,直到提供的 Observable other 发出一个值。
  • skipWhile(Predicate condition):只要提供的条件保持 true,它就会丢弃发出的值。
  • takeUntil(Observable other):这会在提供的 Observable other 发出值后丢弃发出的值。
  • takeWhile(Predicate condition):这会在提供的条件变为 false 后丢弃发出的值。

后面的 代码包含一些演示示例(请参阅 NonBlockingOperators 类和conditional() 方法):

Observable<String> obs = Observable.just("one")
              .flatMap(s -> Observable.fromArray(s.split("")));
Single<Boolean> cont = obs.contains("n");
System.out.println(cont.blockingGet());          //prints: true
obs.defaultIfEmpty("two")
   .subscribe(System.out::print);                 //prints: one
Observable.empty().defaultIfEmpty("two")
          .subscribe(System.out::print);          //prints: two
Single<Boolean> equal = Observable.sequenceEqual(obs, 
                                 Observable.just("one"));
System.out.println(equal.blockingGet());        //prints: false
equal = Observable.sequenceEqual(Observable.just("one"), 
                                 Observable.just("one"));
System.out.println(equal.blockingGet());         //prints: true
equal = Observable.sequenceEqual(Observable.just("one"), 
                                 Observable.just("two"));
System.out.println(equal.blockingGet());        //prints: false

Backpressure

因此,我们讨论了 并展示了 背压 效应以及 Cold 与热门部分。另一种策略可能如下:

Flowable<Double> obs = Flowable.fromArray(1.,2.,3.);
obs.onBackpressureBuffer().subscribe();
//or
obs.onBackpressureLatest().subscribe();

缓冲策略允许您定义缓冲区大小并提供在缓冲区溢出时可以执行的函数。最新的策略告诉值生产者暂停(当消费者无法按时处理发出的值时)并根据请求发出下一个值。

请注意,背压运算符仅在 Flowable 类中可用。

Connectable 

此类别的运算符 允许我们连接可观察对象,从而实现更精确控制的订阅动态:

  • publish():这会将 Observable 对象转换为 ConnectableObservable 对象。
  • replay():这将返回一个 ConnectableObservable 对象,该对象在每次新的
  • connect():这指示 ConnectableObservable 对象开始向订阅者发送值。
  • refCount():这会将 ConnectableObservable 对象转换为 Observable 对象。

我们在 冷与热 部分中演示了 ConnectableObservable 的工作原理。 ConnectableObservableObservable 之间的一个主要区别是 ConnectableObservable 不会开始发射值直到它的 connect 操作符被调用。

Multithreading (scheduler)

默认,RxJava 是单线程的。这意味着源 observable 及其所有操作符通知同一线程上的观察者 subscribe() 操作符被调用。

Т这里有两个运算符,observeOn()subscribeOn(),它们允许您将单个操作的执行移动到不同的线。这些方法将 Scheduler 对象作为参数,该对象将单个操作安排在不同的线程上执行。

subscribeOn() 运算符 声明哪个调度程序应该发出这些值。

observeOn() 运算符 声明哪个调度程序应该观察和处理值。

Scheduler 类包含创建具有不同生命周期和性能配置的 Scheduler 对象的工厂方法:

  • computation():这将创建一个基于有界线程池的调度程序,其大小可达可用处理器的数量;它应该用于 CPU 密集型计算。使用 Runtime.getRuntime().availableProcessors() 来避免使用比可用处理器更多的这些类型的调度程序;否则,由于线程上下文切换的开销,性能可能会下降。
  • io():这个创建一个基于无界线程池的调度器用于与 I/O 相关的工作,例如当与源的交互本质上是阻塞时处理文件和数据库;避免使用它,因为它可能会旋转太多线程并对性能和内存使用产生负面影响。
  • newThread():这样每次都会创建一个新线程,不使用任何池;这是一种创建线程的昂贵方式,因此您应该确切地知道使用它的原因。
  • single():这将创建一个基于单个线程的调度程序,该线程按顺序执行所有任务;当执行顺序很重要时,这很有用。
  • trampoline():这会创建一个以先进先出的方式执行任务的调度器;这对于执行递归算法很有用。
  • from(Executor executor):这将创建一个基于提供的执行器(线程池)的调度程序,从而可以更好地控制最大线程数及其生命周期。

第 8 章中,< em class="italic">多线程和并发处理,我们谈到了线程池。提醒您,以下是我们讨论的池:

          Executors.newCachedThreadPool();
          Executors.newSingleThreadExecutor();
          Executors.newFixedThreadPool(int nThreads);
          Executors.newScheduledThreadPool(int poolSize);
          Executors.newWorkStealingPool(int parallelism);

如您所见,Schedulers 类的其他一些工厂方法由这些线程池之一提供支持,它们只是线程池的更简单和更短的表达式宣言。为了使示例更简单和更具可比性,我们将只使用 computation() 调度程序。让我们看一下 RxJava 中并行/并发处理的基础知识。

以下代码 是将 CPU 密集型计算委托给专用线程的 示例(请参阅 Scheduler 类和 parallel1() 方法):

Observable.fromArray("one","two","three")
          .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
          .flatMap(w -> Observable.fromArray(w.split(""))
                           .observeOn(Schedulers.computation())
              //.flatMap(s -> {             
              //      CPU-intensive calculations go here
              // }  
                .doAfterNext(s -> System.out.println("2: " + 
                Thread.currentThread().getName() + " => " + s))
          )
          .subscribe(s -> System.out.println("3: " + s));
pauseMs(100);

在此示例中,我们决定从每个发出的单词创建一个字符子流,并让一个专用线程处理每个单词的字符。此示例的输出如下所示:

JDK17 |java17学习 第 14 章 Java 标准流

可以看到, 主线程用于发出单词,每个单词 的字符由专门的线。请注意,虽然在本例中,subscribe() 操作的结果顺序对应于发出单词和字符的顺序,但在现实生活中,计算每个值的时间不会相同。因此,不能保证结果会以相同的顺序出现。

如果需要,我们也可以将每个单词发射放在一个专用的非主线程上,这样主线程就可以自由地做其他事情。例如,请注意以下内容(参见 Scheduler 类和 parallel2() 方法):

Observable.fromArray("one","two","three")
        .observeOn(Schedulers.computation())
        .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
        .flatMap(w -> Observable.fromArray(w.split(""))
                .observeOn(Schedulers.computation())
                .doAfterNext(s -> System.out.println("2: " + 
                Thread.currentThread().getName() + " => " + s))
        )
        .subscribe(s -> System.out.println("3: " + s));
pauseMs(100);

本例的输出如下:

JDK17 |java17学习 第 14 章 Java 标准流

正如您 所见,主线程不再发出这些单词。

在 RxJava 2.0.5 中,引入了一种新的、更简单的并行处理方式,类似于标准 Java 8 流中的并行处理。使用 ParallelFlowable,可以实现如下相同的功能(参见 Scheduler 类和 parallel3() 方法):

ParallelFlowable src = 
            Flowable.fromArray("one","two","three").parallel();
src.runOn(Schedulers.computation())
   .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
   .flatMap(w -> Flowable.fromArray(((String)w).split("")))
   .runOn(Schedulers.computation())
   .doAfterNext(s -> System.out.println("2: " + 
                Thread.currentThread().getName() + " => " + s))
   .sequential()
   .subscribe(s -> System.out.println("3: " + s));
pauseMs(100);

可以看到,ParallelFlowable对象是通过应用parallel()创建的code> 运算符转换为常规的 Flowable 运算符。然后,runOn() 操作符 告诉创建的 observable 使用 computation()< /code> 调度程序来发出值。请注意,没有必要在 flatMap() 操作符中设置另一个调度程序(用于处理字符)。它可以设置在它之外——就在主管道中,这使得代码更简单。结果如下所示:

JDK17 |java17学习 第 14 章 Java 标准流

至于 subscribeOn() 运算符,它在管道中的位置没有任何作用。无论它是放置在哪里,它仍然告诉可观察的调度程序应该发出的值。这是一个示例(参见 Scheduler 类和 subscribeOn1() 方法):

Observable.just("a", "b", "c")
          .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
          .subscribeOn(Schedulers.computation())
          .subscribe(s -> System.out.println("2: " + 
               Thread.currentThread().getName() + " => " + s));
pauseMs(100);

结果如下所示:

JDK17 |java17学习 第 14 章 Java 标准流

即使我们改变subscribeOn()操作符的位置,如下例所示,结果并没有改变(查看 Scheduler 类和 subscribeOn2() 方法):

Observable.just("a", "b", "c")
          .subscribeOn(Schedulers.computation())
          .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
          .subscribe(s -> System.out.println("2: " + 
               Thread.currentThread().getName() + " => " + s));
pauseMs(100);

最后,这里 是两个运算符的示例(参见 Scheduler 类和 subscribeOnAndObserveOn( ) 方法):

Observable.just("a", "b", "c")
          .subscribeOn(Schedulers.computation())
          .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
          .observeOn(Schedulers.computation())
          .subscribe(s -> System.out.println("2: " + 
               Thread.currentThread().getName() + " => " + s));
pauseMs(100);

现在结果显示使用了两个线程:一个用于订阅,另一个用于观察:

JDK17 |java17学习 第 14 章 Java 标准流

总结了我们对 RxJava 的简短概述,它是一个大型且仍在增长的库,具有很多可能性,其中许多我们只是没有在这本书中审查的空间。我们鼓励您尝试学习它,因为反应式编程似乎是现代数据处理的发展方向。

在接下来的章节中,我们将演示如何使用 Spring Boot 和 Vert.x 构建反应式应用程序(微服务)。

Summary

在本章中,您了解了什么是响应式编程以及它的主要概念是什么:异步、非阻塞、响应式等等。反应式流与 RxJava 库一起被简单地介绍和解释,这是第一个支持反应式编程原则的可靠实现。

现在,您可以使用反应式编程编写异步处理代码。

在下一章中,我们将讨论微服务作为创建反应式系统的基础,我们将回顾另一个成功支持反应式编程的库:Vert.x。我们将使用它来演示如何构建各种微服务。

Quiz

  1. 选择所有正确的陈述:
    1. 异步处理总是稍后提供结果。
    2. 异步处理总是能快速提供响应。
    3. 异步处理可以使用并行处理。
    4. 异步处理总是比阻塞调用更快地提供结果。
  2. CompletableFuture 可以不用线程池吗?
  3. java.nio 中的 nio 代表什么?
  4. event 循环是唯一支持非阻塞 API 的设计吗?
  5. RxJava 中的 Rx 代表什么?
  6. Java 类库 (JCL) 的哪个 Java 包支持响应式流?
  7. 从以下列表中选择所有可以表示反应流中的可观察对象的类:
    1. 流动
    2. 可能
    3. CompletableFuture
    4. 单个
  8. 你怎么知道 Observable 类的特定方法(操作符)正在阻塞?
  9. 冷的和热的 observable 有什么区别?
  10. Observablesubscribe() 方法返回一个 Disposable 对象。当在这个对象上调用 dispose() 方法时会发生什么?
  11. 选择创建 Observable 对象的所有方法名称:
    1. interval()
    2. new()
    3. generate()
    4. defer()
  12. 命名两个转换 Observable 运算符。
  13. 命名两个过滤 Observable 运算符。
  14. 说出两种背压处理策略。
  15. 命名两个 Observable 运算符,它们允许您将线程添加到管道处理中。