vlambda博客
学习文章列表

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》用Vert.x统一命令式和反应式

Unifying Imperative and Reactive with Vert.x

传统上,企业应用程序面临的最大挑战之一是将本质上是同步的业务操作与调度这些操作的结果相结合,这些操作的结果也可以是异步的和事件驱动的。在本章中,我们将了解 Vert.x 工具包如何通过将标准命令式编程与可以在运行时创建、更改或组合的异步数据流相结合来解决 Quarkus 应用程序中的这一挑战。在本章结束时,您应该能够熟练使用 Vert.x 在 JVM 上编写响应式应用程序。

在本章中,我们将介绍以下主题:

  • An introduction to Reactive Programming and the Vert.x toolkit
  • Vert.x API models available in Quarkus
  • Managing the Reactive SQL Client with Vert.x

Demystifying Reactive Programming and Vert.x

命令式编程是大多数程序员每天编写代码的方式。等一下命令式编程是什么意思?在简洁的语句中,我们可以说命令式编程意味着代码行按顺序逐句执行,如下例所示:

URL url = new URL("http://acme.com/");
BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()));

String line;
while ((line = in.readLine()) != null) {
  System.out.println(line);
}
in.close();

如您所见,命令式编程可以使用循环或条件语句来跳转到代码的不同部分。不过,不要被这个愚弄。只要您的调试器清楚地指向代码中的一条语句(因此很明显接下来将执行哪一行),您肯定在使用命令式编程。

虽然命令式编程模型显然更易于理解,但随着连接数量的增加,它会严重影响可伸缩性。事实上,系统线程的数量必须相应增加,导致您的操作系统花费大量 CPU 周期来进行线程调度管理。这就是 Vert.x 发挥作用的地方。

让我们从一个定义开始:Vert.x 到底是什么? Vert.x 不是应用程序服务器或框架,而只是一个工具包,或者,如果您愿意,可以将一组纯 JAR 文件作为依赖项添加到您的项目中。十分简单。您也不需要特定的开发环境或插件来使用 Vert.x 开发应用程序。

Vert.x 的核心是一个响应式工具包,它满足 Reactive Manifesto ( https://www.reactivemanifesto.org/)。这些要求可以概括为以下几点:

  • Responsive: A reactive system needs to be capable of handling requests in a reasonable time.
  • Resilient: A reactive system must be designed to handle failures and deal with them appropriately.
  • Elastic: A reactive system must be able to scale up and down according to loads without compromising the responsiveness of the system.
  • Message-driven: The reactive system's components interact with each other by exchanging asynchronous messages.

基于以上几点,很明显 Vert.x 促进了一种设计和构建分布式系统的新方法,同时将异步性、可伸缩性和反应性注入应用程序的核心。因此,对于我们之前的例子,可以用反应式的方式重写,如下:

vertx.createHttpClient().getNow(80, "acme.com", "", response -> {
   response.bodyHandler(System.out::println);
 });

与前一个示例不同,通过使用 Vert.x,在与 HTTP 服务器建立连接的同时释放正在运行的线程。然后,当收到响应时,处理程序编码为 Lambda 表达式 ( https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html) 被回调以处理响应。

在前面的示例中,每次扩展称为 Verticle 的 Vert.x 基本部署单元时,都可以在代码中使用 vertx 字段。本质上,Verticle 通过事件循环处理传入事件,为异步编程模型奠定了基础。 Verticle 可以用各种语言编写,不仅是 Java,因此您可以混合不同的环境作为更大的反应系统的一部分。

Event Bus 是允许不同 Verticle 相互通信的主要工具,通信是通过异步消息传递进行的。下图显示了 Event Bus 如何适应此架构:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》用Vert.x统一命令式和反应式

事件总线对你使用的数据格式没有任何限制,尽管 JSON 是首选的交换格式,因为它是结构化数据的流行选项,允许用不同语言编写的 Verticles 进行通信。事件总线支持以下通信模式:

  • Point-to-point messaging, which means that messages are routed to just one of the handlers registered at that address.
  • Request-response messaging, which is similar to point-to-point messaging, except that it includes an optional reply handler that can be specified while sending the message so that the recipient can decide whether to reply to the message. If they do so, the reply handler will be called.
  • Publish-subscribe, which allows you to broadcast messages using a publish function. In this case, the event bus will route messages to all the handlers that are registered against that address.

由于存在多种通信模式,因此已经为 Vert.x 设计了几个 API 模型,它们都基于通过回调以异步方式执行流程的概念。下一节讨论 Quarkus 中可用的各种 Vert.x API 模型。

Vert.x API models in Quarkus

Vert.x 提供了一个集成到 Quarkus 中的大型反应式 API 生态系统。更具体地说,Quarkus 使用 Vert.x 作为响应式引擎,为您的应用程序提供单一依赖项:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-vertx</artifactId>
</dependency>

这允许您通过简单的代码注入访问托管的 Vert.x 实例:

@Inject io.vertx.core.Vertx vertx;

Vertx 对象是 Vert.x 应用程序的控制中心。这是您进入 Vert.x 领域的通行证,允许您创建异步和非阻塞客户端和服务器,获取对事件总线的引用以及许多其他事情。

但是,在 Quarkus 中使用 Vert.x API 时,没有可供您访问的 Vertx 对象。事实上,Quarkus 提供了三种不同的 Vert.x API:

  • io.vertx.core.Vertx: This is the entry point into the Vert.x core API and allows you to achieve asynchronous and non-blocking clients and servers using callbacks.
  • io.vertx.reactivex.core.Vertx: This API allows us to use observable patterns in our Vert.x applications wherever we can use streams or asynchronous results. Additionally, it allows us to use a large set of data transformation operators on our streams.
  • io.vertx.axle.core.Vertx: This API has been specifically designed to integrate with Quarkus' MicroProfile patterns, proving a solid foundation for sending and receiving asynchronous messages, thus enforcing loose coupling between services.

为了了解 Vert.x 的所有三种不同变体,我们在本书 GitHub 存储库的 Chapter09 文件夹中提供了相同数量的示例。让我们详细看看它们。

Managing the Vert.x core API

为了了解 Vert.x 核心 API,我们将使用我们在 第 4 章将 Web 界面添加到 Quarkus 服务。您可以在本书的 GitHub 存储库中找到 Chapter09/core/customer-service 文件夹示例的源代码。我们建议您将项目导入 IDE。

现在,让我们直接进入代码。由于 Vert.x 核心 API 基于回调机制,为了利用异步和非阻塞 API,对于我们的客户服务示例,我们添加了两个函数,它们将从文件系统中读取和写入 JSON 格式的客户列表.我们应该在哪里写我们的客户名单?答案在 application.properties 文件中,该文件定义了一个名为 file.path 的属性,将写入客户列表:

file.path=/tmp/customer.json

现在,让我们看一下代码。负责提供客户数据的核心类是 CustomerRepository。此时我们将在那里注入一个 io.vertx.core.Vertx 的实例。我们还将注入存储数据的路径:

public class CustomerRepository {
    @Inject io.vertx.core.Vertx vertx;
    @ConfigProperty(name = "file.path" )
    String path;

现在来了有趣的一点,即编写一个使用 vertx 实例的方法来展平文件系统上的客户列表:

public CompletionStage<String> writeFile( ) {

    JsonArrayBuilder jsonArray = javax.json.Json.createArrayBuilder();
    for (Customer customer:customerList) {
        jsonArray.add(javax.json.Json.createObjectBuilder().
                 add("id", customer.getId())
                .add("name", customer.getName())
                .add("surname", customer.getSurname()).build());
    }

    JsonArray array = jsonArray.build();
    CompletableFuture<String> future = new CompletableFuture<>();

    vertx.fileSystem().writeFile(path, Buffer.buffer(array

    .toString()), handler -> {
        if (handler.succeeded()) {
            future.complete("Written JSON file in " +path);
        } else {
            System.err.println("Error while writing in file:
             " + handler.cause().getMessage());
        }
    });
    return future;
}

您可能首先注意到的是 CompletionStage 方法的签名。如果您一直在编写异步 Java 代码,您可能熟悉 java.util.concurrent.Future API。它用于执行以下操作:

  • Check whether the execution has completed via the isDone() method
  • Cancel the execution using the cancel() method
  • Fetch the result of the execution using the blocking get() method

这种方法的主要限制是调用者不能手动完成任务,也不能链接多个 Future 执行。

另一方面,CompletionStage 基于阶段的概念,被认为是多个中间计算,可能是异步的,也可能不是异步的。无论如何,我们必须在达到最终结果之前完成它们。这些中间计算称为完成阶段。

通过使用 CompletionStage 阶段,您可以通过执行以下操作轻松解决 java.util.concurrent.Future API 的限制:

  • Manually completing CompletableStage using complete(T value)
  • Chaining multiple CompletableStage in a block

让我们回到我们的例子。一旦我们从客户列表中创建了 JsonArray,我们就可以使用 Vert.x 核心 API 访问我们的 FileSystem。我们还可以注册一个处理程序,该处理程序负责在文件成功写入后立即完成我们的 CompletionStage

我们看一下readFile方法,它负责读取包含客户列表的文件:

public CompletionStage<String> readFile() {
    CompletableFuture<String> future = new CompletableFuture<>();
    long start = System.nanoTime();

    // Delay reply by 100ms
    vertx.setTimer(100, l -> {
        // Compute elapsed time in milliseconds
        long duration = MILLISECONDS.convert(System.nanoTime() -
         start, NANOSECONDS);

        vertx.fileSystem().readFile(path, ar -> {
            if (ar.succeeded()) {
                String response = ar.result().toString("UTF-8");
                future.complete(response);
            } else {
                future.complete("Cannot read the file: " + 
                ar.cause().getMessage());
            }
        });

    });
    
    return future;
}

readFile 方法故意稍微复杂一些。事实上,我们已经将两个不同的阶段链接到其中。第一个执行一个一次性计时器,它将在 100 毫秒内触发下一次执行。计时器是 Vert.x 的核心构造,应该在您想要延迟执行某些代码或重复执行它的任何地方使用:

vertx.setPeriodic(1000, l -> {
  // This code will be called every second
  System.out.println("timer fired!");
});

在任何情况下,计时器都是您可以在 Vert.x 术语中延迟执行的方式,以代替其他机制,例如 Thread.sleep,它会阻塞事件循环,因此应该从不< /strong>,永远在 Vert.x 上下文中使用。

If you forget our gentle warning, Vert.x will remind you each time you attempt to use a blocking code in the Vert.x context with a log message similar to Thread vertx-eventloop-thread-1 has been blocked for 22258 ms.

readFile 方法的其余部分与 writeFile 方法完全相反;也就是说,它读取 JSON 文件并在文件被读取后立即完成该阶段。

为了向客户端应用程序公开此功能,我们向 CustomerEndpoint 类添加了两个包装器方法,以便通过 REST API 公开这些功能:

@GET
@Path("writefile")
@Produces("text/plain")
public CompletionStage<String> writeFile() {
    return customerRepository.writeFile();
}

@GET
@Path("readfile")
public CompletionStage<String> readFile() {
    return customerRepository.readFile();
}

值得注意的是,writeFile 方法会生成文本信息,因为它应该向调用者返回一个简单的文本消息。另一方面,readFile 方法依赖于类的默认 application/json 格式来显示 JSON 文本文件。

现在,让我们转到客户端。我们可以使用另外两个 AngularJS 处理程序轻松捕获 CompletionStage 事件,这将在结果可用时立即捕获:

$scope.writefile = function () {

$http({
    method: 'GET',
    url: SERVER_URL+'/writefile'
  }).then(_successStage, _error);
};

scope.readfile = function () {

  $http({
    method: 'GET',
    url: SERVER_URL+'/readfile'
  }).then(_successStage, _error);
};

function _successStage(response) {
   _clearForm()
   $scope.jsonfile = JSON.stringify(response.data);
}

这两个功能都将通过在我们的主页上添加两个简单的按钮来触发:

<a ng-click="writefile()" class="myButton">Write File</a>  
<a ng-click="readfile()" class="myButton">Read File</a> 

除了这样做之外,我们还在 HTML 模式中添加了一个 div 部分,该部分将显示信息:

<div ng-app="displayfile" >
        <span ng-bind="jsonfile"></span>
</div>

事不宜迟,让我们使用以下命令构建并运行应用程序:

mvn install quarkus:dev

以下是我们的新 UI,其中包括 Read FileWrite File 按钮。我们刚刚保存了一组 Customer 对象,如下图所示:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》用Vert.x统一命令式和反应式

相反,如果我们点击 Read File 按钮,其内容将以 JSON 格式显示在页面下方的 div 中:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》用Vert.x统一命令式和反应式

我们已经完成了 Vert.x 核心的第一轮。现在,让我们继续看看将 Vert.x 与 ReactiveX (RxJava) 结合使用。

Managing Vert.x API for RxJava

RxJava (https://github.com/ReactiveX/RxJava) 是一个 Java 库,可让您创建异步和使用 Java VM 的 Observable 序列的基于事件的应用程序。为了理解这个框架的核心特性,我们需要定义 ReactiveX 的核心actors,它们如下:

  • Observables:这些代表要发出的数据的来源。一旦订阅者开始收听,observable 就会开始提供数据。一个 observable 可能会发出可变数量的项目,最终会以成功或错误终止。

  • 订阅者:它们监听由可观察对象发出的事件。一个 observable 可以有一个或多个订阅者。

下图显示了这两个组件之间的关系:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》用Vert.x统一命令式和反应式

根据发出的项目数量和项目流的控制,我们可以区分不同类型的可观察对象:

Observable type Description
Flowable<T> Emits 0 or n items and terminates with a success or an error event. Supports backpressure, which allows us to control the rate of source emission.
Observable<T> Emits 0 or n items and terminates with a success or an error event.
Single<T> Emits either one value or an error notification.
Maybe<T> Emits a single item, no items, or an error event. The reactive version of an optional call.
Completable Wraps the deferred computation without any value but only as an indication of completion or an exception.

让我们提供一个极简主义的例子。以下是发出单个项目的 ObservableHello world 示例:

Observable.just("Hello world!").subscribe(System.out::println);

当订阅者收到该项目时,它只是将其打印在输出流上。以下代码略有不同,因为它使用 Flowable 可观察对象来控制项目的流动,以防您以高速率推出数据,这可能会使您的订阅者泛滥:

Flowable.just("Hello world!").subscribe(System.out::println);

RxJava 编程的一个重要概念是操作符;运算符是一个 函数,它定义了一个Observable 以及它应该如何以及何时发出数据流。我们已经遇到过一个,即 just 运算符,它允许您将一个对象或一组对象转换为 Observable。在我们的第一个示例中,对象是 Hello world 字符串。

还有更多操作符,所有这些都可以在RxJava的文档中找到(http://reactivex.io/documentation/operators.html)。例如,您可以使用 distinct 运算符抑制数据流中的重复项:

Observable.just(2, 3, 4, 4, 2, 1)
        .distinct()
        .subscribe(System.out::println);

在这种情况下,订阅者的预期输出如下:

2,3,4,1

您还可以链接另一个运算符以过滤掉不符合模式的项目,如下所示:

Observable.just(1, 2, 3, 4, 5, 6)  
     .distinct()
     .filter(x -> x % 2 == 0)
     .subscribe(System.out::println);

正如您可能已经猜到的那样,输出将进一步限制为以下内容:

2,4

尽管我们几乎没有触及 RxJava 强大功能的表面,但我们对如何将这些概念插入示例应用程序的背景知识很少。

Using RxJava with Quarkus

为了了解 RxJava,我们将浏览本书 GitHub 存储库中 Chapter09/rx2java/customer-service 文件夹中包含的示例。

首先你应该知道的是,为了在 Quarkus 中使用 RxJava,你必须添加一个 Vertx 的实例,它可以在 io.vertx.reativex 下找到。核心 命名空间:

@Inject io.vertx.reactivex.core.Vertx vertx;

话虽如此,在我们的项目中包含 ReactiveX 的主要优势之一是它将大大增强在可观察对象和订阅者之间流动的数据的转换能力。

例如,让我们看一下以下用例:

  • We want to produce a file with a list of customers to be imported in a spreadsheet. Therefore, we will create a plain CSV file out of our customer list.
  • Then, we want to convert the CSV file into any other format is coded in the customer's toString method.

让我们学习如何对 CustomerRepository 类进行正确的更改。正如我们之前提到的,第一个更改是将 io.vertx.core.Vertx 实例替换为对应的 io.vertx.reativex.core.Vertx 实例。然后,我们将对 writeFilereadFile 方法进行一些更改。让我们先从 writeFile 方法开始:

public CompletionStage<String> writeFile() {
    CompletableFuture<String> future = new CompletableFuture<>();
    StringBuffer sb = new StringBuffer("id,name,surname");
    sb.append(System.lineSeparator());

    Observable.fromIterable(customerList)
            .map(c -> c.getId() + "," + c.getName() + "," + 
             c.getSurname() + System.lineSeparator())
            .subscribe(
                    data ->   sb.append(data),
                    error -> System.err.println(error),
                    () ->  vertx.fileSystem().writeFile(path, 
                     Buffer.buffer(sb.toString()), handler -> {
                        if (handler.succeeded()) {
                            future.complete("File written in "+path);
                        } else {
                            System.err.println("Error while 
                            writing in file: " + handler.cause()
                            .getMessage());

                        }
                    }));

    return future;
}

如果您发现我们对 observables 的介绍直观,那么前面的代码看起来并不会过于复杂,尽管 Lambda 表达式有很多。在这里,我们添加了一长串运算符来产生所需的结果。

首先,我们通过使用 Observable.fromIterable 操作符对客户列表进行迭代,生成了一组 observables。由于我们需要生成 CSV 文件,因此我们需要将单个客户字段映射为 CSV 格式,该格式使用逗号 (,) 分隔值。为此,我们使用了 map 运算符。然后,我们完成了转换,结果将是我们选择的格式的 observables 列表。

观察者(或订阅者)要查看 Observable 发出的项目,以及来自 Observable 的错误或已完成通知,它必须订阅该 Observable< /kbd> 使用 subscribe 运算符。简而言之,subscribe 运算符是将订阅者连接到 Observable 的粘合剂。

我们的订阅者将在添加新项目时收到通知,以便将它们附加到已使用 CSV 标头初始化的 StringBuffer 中。如果发生错误,订阅者还将收到通知,最终,当项目流完成时,通过 () 处理程序。在这种情况下,将使用 writeFile 函数将 CSV 文件写入文件系统,该函数在 io.vertx.reativex.core.Vertx 文件系统上下文中也可用。

然后,readFile 方法需要将我们已经写入的 CSV 文件反转为 Customer 对象的表示,由其 toString 提供方法。代码如下:

public CompletionStage<String> readFile() {

    CompletableFuture<String> future = new CompletableFuture<>();
    StringBuffer sb = new StringBuffer();

    vertx.fileSystem().rxReadFile(path)
            .flatMapObservable(buffer -> 
              Observable.fromArray(buffer.toString().split(System.
              lineSeparator())))
            .skip(1)
            .map(s -> s.split(","))
            .map(data-> new Customer(Integer.
             parseInt(data[0]),data[1],data[2]))
            .subscribe(
                    data ->  sb.append(data.toString()),
                    error -> System.err.println(error),
                    () -> future.complete(sb.toString()));

    return future;

}

在这里,我们必须熟悉更多的运算符。由于我们想逐行读取和处理文件,因此我们使用 flatMapObservable 运算符来生成我们的多个 Observable 实例数组。在实践中,此运算符允许我们生成一组 Observable 实例,这些实例是由 CSV 文件中的行中的单个项目发出的函数的结果。

我们使用字符串类的 split 方法方便地将文件拆分为一个数组。然后,我们使用 skip 运算符跳过第一项,即 CSV 标头。之后,我们对数据应用了两个 map 转换:

  • The first one creates an array of string objects, out of the CSV line, using the comma (,) as a separator
  • Next, we created an instance of the Customer object using the data arriving from the string array

现在我们已经收集了目标数据,即 Customer 对象,我们准备好流式传输这些数据,最终将由订阅者收集。订阅者依次接收每个项目并将其输出的 toString() 添加到 StringBuffer。您可以在 toString() 方法中包含任何格式,但为了简单起见,我们让 IDE (IntelliJ IDEA) 自行生成它:

public String toString() {
    return "Customer{" +
            "id=" + id +
            ", name='" + name + '\'' +
            ", surname='" + surname + '\'' +
            '}';
}

我们要做的最后一件事是设置 readFile 的媒体类型,使其与 toString 数据的格式一致。由于我们正在生成简单的文本,它将如下所示:

@GET
@Path("readfile")
@Produces("text/plain")
public CompletionStage<String> readFile() {
    return customerRepository.readFile();
}

现在,您可以运行应用程序并检查新结果。添加一些客户并单击 Write File 按钮后,您的 UI 应如下所示:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》用Vert.x统一命令式和反应式

然后,通过点击 Read File 按钮,下部 HTML div 将包含每个客户的 toString 数据:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》用Vert.x统一命令式和反应式

如您所见,尽管 UI 日志极简,但仍有大量工作在后台完成以管理不同格式的数据转换。

那是我们对 Vert.x 和 Quarkus 的第二次实现。我们仍然要处理第三个野兽,即io.vertx.axle.core.Vertx

Decoupling events with Vert.x axle libraries

通常,我们希望将服务入口点(适配器)与作为应用程序一部分的业务逻辑分开。一种常见的模式是将服务保存在一个不同的 bean 中,该 bean 被注入到我们的服务 REST 入口点中。然而,在接近反应式编程时,我们可以通过将 Vert.x 事件总线引入图片来解耦更多的组件。

在这种架构中,组件通过向虚拟地址发送消息来相互通信。要管理消息的分发,可以使用以下组件:

  • EventBus: This is a lightweight distributed messaging system that allows communication between the different parts of your application in a loosely coupled way.
  • Message: This contains data that is received from the Event Bus in a handler. Messages have a body and a header, both of which can be null. By adding a reply handler in the message, it is possible to apply a request-response pattern to the communication.

让我们了解如何使用 Chapter09/axle/customer-service 文件夹中的示例应用程序来检测简单的消息传递模式。

Adding an EventBus layer to Quarkus applications

为了在我们的应用程序中包含分布式对等消息传递模式,我们需要将 EventBus 实例注入到 CDI bean 中,它将充当接收器:

@Inject EventBus bus;

在我们的例子中,我们会将 EventBus 添加到 CustomerEndpoint 类中。

请注意,每个 Vert.x 实例只有一个事件总线实例。

现在,在同一个类中,让我们创建一个新的端点方法,它将负责发送消息:

@GET
@Path("/call")
@Produces("text/plain")
public CompletionStage<String> call(@QueryParam("id") Integer customerId) {
    return bus.<String>send("callcustomer", 
    customerRepository.findCustomerById(customerId))
            .thenApply(Message::body)
            .exceptionally(Throwable::getMessage);

我们通过 "callcustomer" 地址在总线上传递消息。消息正文包含 Customer 对象,该对象由 findCustomerById 方法检索。如果发生错误,将抛出带有错误 getMessage 内容的 throwable。

现在,我们需要一个消息消费者,所以我们将添加另一个名为 CustomerService 的类,其中包含一个注释为 @ConsumeEvent 的方法:

@ApplicationScoped
public class CustomerService {

        @ConsumeEvent("callcustomer")
        public String reply(Customer c) {
            return "Hello! I am " + c.getName() + " " 
             +c.getSurname() + ". How are you doing?";
        }
}

@ConsumeEvent 注释中,我们指定了消费消息的地址。归根结底,我们只是返回包含来自客户的消息的响应。

要完成循环,我们需要进行以下更改:

  • We need to add one more button to the index.html page:
<a ng-click="call( customer )" class="myButton">Call</a>
  • We need to add one more AngularJS controller to handle the response, which will display (in an alert window) the message:
$scope.call = function (customer) {

  $http({
    method: 'GET',
    url: SERVER_URL+'/call/?id='+customer.id
  }).then(_callCustomer, _error);
};

 function _callCustomer(response) {
   window.alert(response.data);
}

现在我们已经添加了所有内容,让我们运行我们的应用程序。

Rolling up the application

当所有更改都到位后,您应该能够看到 Call 按钮已添加到每个客户的行中,如下面的屏幕截图所示:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》用Vert.x统一命令式和反应式

当您单击 Call 按钮时,将通过事件总线发送一条消息。消耗完后,您应该会看到以下响应:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》用Vert.x统一命令式和反应式

除了点对点消息传递之外,您还可以使用 Vert.x 轴 API 流式传输 服务器端事件 (SSE)。

Streaming SSE with Vert.x

传统上,Web 应用程序能够向服务器发送请求以接收响应。这是标准范式。但是,对于服务器发送事件,服务器应用程序可以随时通过将事件(消息)推送到网页来将新数据发送到网页。这些传入消息被视为与网页内的数据相结合的事件。

现在,让我们演示如何使用 Vert.x 轴 API 在 Quarkus 中流式传输 SSE。我们项目中包含的以下类负责每两秒向主页发送一次 SSE:

@Path("/streaming")
public class StreamingEndpoint {

    @Inject io.vertx.axle.core.Vertx vertx;

    @Inject CustomerRepository customerRepository;
    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<String> stream() {

        return
                ReactiveStreams.fromPublisher
                (vertx.periodicStream(2000).
                 toPublisher())
                        .map(l -> String.format
                    ("Number of Customers %s . 
                    Last one added: %s %n",customerRepository.
                    findAll().size(),
                                customerRepository.findAll().size()
                                 > 0 ? 
                                (customerRepository.findAll().
                                get(customerRepository.findAll().
                                size() -1)).toString()  : "N/A"))
                        .buildRs();


    }
}

首先,请注意我们正在使用 io.vertx.axle.core.Vertx 的实例来处理事件流。然后,绑定到 "/streaming" URI 的 REST 方法使用不同的媒体类型进行注释,即 SERVER_SENT_EVENTS。该方法返回一个发布者类型,这是发布反应流所必需的。

通过使用 ReactiveStreams.fromPublisher 方法,我们根据 vert.xperiodicStream 指定的频率推送流事件。在我们的例子中,消息将每两秒发送一次。在发送实际事件之前,内容将由 map 运算符进行转换,该运算符将创建一条带有一些 Customer 统计信息的消息,例如客户数量和最后一个添加。通过使用三元运算符,我们设法将这个登录压缩到一个语句中,但代价是稍微复杂的可读性。

这就是您在服务器端所需要的一切。在客户端,我们做了一些其他的调整:

  • We added one more button to trigger the SSE:
<a ng-click="stats()" class="myButton">Stats</a></div>
  • We added a callback method, in JavaScript, to handle the event that was received:
$scope.stats = function () {

  var eventSource = new EventSource("/streaming");
  eventSource.onmessage = function (event) {
  var container = document.getElementById("divcontainer");
  var paragraph = document.createElement("p");
  paragraph.innerHTML = event.data;
  container.appendChild(paragraph);
};
  • We added a div where messages will be displayed:
<div id="divcontainer" style="width: 800px; height: 200px; overflow-y: scroll;">

当我们运行更新后的应用程序时,预期的结果是 UI 底部包含 Stats 按钮:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》用Vert.x统一命令式和反应式

较低的 div 将根据客户列表中包含的数据每两秒更新一次。

Canceling events

值得一提的是,可以通过保留对 Subscription 对象的引用来取消 SSE 订阅,以便您可以随时取消订阅:

publisher
   .subscribe(new Subscriber<String>() {
     volatile Subscription subscription;
 
     @Override
     public void onSubscribe(Subscription subscription) {
       this.subscription = subscription;
     }
 
     @Override
     public void onNext(String s) {
       // when no more event is needed
       subscription.cancel();
     }
 
     @Override
     public void onError(Throwable throwable) {
       // handle error
     }
 
     @Override
     public void onComplete() {
       // handle complete
     }
});

在前面的代码片段中,当事件发出时,观察者的 onNext 方法与项目一起被调用,而 onComplete 方法被立即调用。另一方面,当回调失败时,会调用观察者的 onError 方法。在任何回调方法中,我们都可以使用订阅对象上的 cancel 方法取消订阅。

这是我们使用响应式事件的最后一项工作,但不是 Vert.x 的最后一项工作。我们还有一件事要介绍:Quarkus 的反应式 SQL 客户端。这是一个专注于以最小开销实现可扩展 JDBC 连接的 API。

Managing the Reactive SQL Client

反应式 SQL 客户端是一个 API,允许您使用 Vert.x 的反应式和非阻塞特性来访问关系数据库。这会在您访问数据的方式方面带来一些变化。让我们把成本和收益放在桌面上:

  • On one side, you will need to use SQL statements to enable your RDBMS to access data, instead of the abstract HQL. Also, automatic mapping between Java classes and DB is not available anymore since Hibernate is out of the game here.
  • On the other hand, you will be able to use a fully event-driven, non-blocking, lightweight alternative to stream the result of your SQL statements.

根据您的要求,您可以坚持使用 Hibernate 的 API 或切换到 Reactive 的 SQL 客户端。假设您很勇敢并且想切换到 Reactive SQL。为此,您需要配置您的应用程序,以便它可以使用 PostgreSQL 反应式客户端 API。

Configuring your application to use the PostgreSQL reactive client

为了深入研究响应式客户端 API,请参阅本书 GitHub 存储库中 Chapter09/pgpool 文件夹中包含的示例。由于此示例不会使用 PostgreSQL JDBC 驱动程序,因此添加了以下依赖项作为替代:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-reactive-pg-client</artifactId>
</dependency>

我们添加的另一个配置是 JDBC URL,它需要采用以下格式:

vertx-reactive:postgresql://<Host>:<Port>/<DBName>

因此,在我们的示例中,我们将在 application.properties 中添加此设置:

quarkus.datasource.url=vertx-reactive:postgresql://localhost:5432/quarkusdb
quarkus.datasource.username=quarkus
quarkus.datasource.password=quarkus

现在,让我们看看我们的应用程序中的变化。为了使事情尽可能简单,我们对示例进行了分解,使其仅使用 CustomerEndpointCustomer POJO 类。

让我们从 CustomerEndpoint 开始,它需要访问 io.vertx.axle.pgclient.PgPoolio.vertx.core.Vertx

public class CustomerEndpoint {

    @Inject PgPool client;
    @Inject Vertx vertx;

在同一个类中,我们添加了一个 init 方法来在启动时创建一些数据:

@PostConstruct
private void initdb() {

    client.query("DROP TABLE IF EXISTS CUSTOMER")
            .thenCompose(r -> client.query("CREATE SEQUENCE IF
              NOT EXISTS  customerId_seq"))
            .thenCompose(r -> client.query("CREATE TABLE CUSTOMER
             (id SERIAL PRIMARY KEY, name TEXT NOT NULL,surname
             TEXT NOT NULL)"))
            .thenCompose(r -> client.query("INSERT INTO CUSTOMER
             (id, name, surname) VALUES ( nextval('customerId
              _seq'), 'John','Doe')"))
            .thenCompose(r -> client.query("INSERT INTO CUSTOMER
              (id, name, surname) VALUES ( nextval('customerId
                _seq'), 'Fred','Smith')"))
            .toCompletableFuture()
            .join();
}

作为查询的结果,Pgpool 的 query 方法返回一个带有 RowSet 数据的 CompletionStage 对象。请注意我们如何链接多个语句来生成一个 CompletableFuture,它会在另一个线程中分拆执行。在这个简单的方法中,您可以体验到响应式 SQL 客户端在创建事件驱动的非阻塞 SQL 执行时的强大功能。最后执行CompletableFuturejoin方法,最终得到所有语句的组合结果。

CustomerEndpoint 的其他方法使用相同的组合模式将 CRUD 语句的执行委托给 Customer 类:

@GET
public CompletionStage<Response> getAll() {
    return Customer.findAll(client).thenApply(Response::ok)
            .thenApply(ResponseBuilder::build);
}


@POST
public CompletionStage<Response> create(Customer customer) {
    return customer.create(client).thenApply(Response::ok)
            .thenApply(ResponseBuilder::build);
 }

@PUT
public CompletionStage<Response> update(Customer customer) {
    return customer.update(client)
            .thenApply(updated -> updated ? Status.OK : 
             Status.NOT_FOUND)
            .thenApply(status -> Response.status(status).build());
}

@DELETE
public CompletionStage<Response> delete(@QueryParam("id") Long customerId) {
    return Customer.delete(client, customerId)
            .thenApply(deleted -> deleted ? Status.NO_CONTENT : 
             Status.NOT_FOUND)
            .thenApply(status -> Response.status(status).build());
}

Customer 类中,我们编写了执行 CRUD 操作所需的所有方法。第一个,create,通过使用 PreparedStatementCUSTOMER 表中执行 INSERT,它应用包含的元组姓名和姓氏作为参数:

public CompletionStage<Long> create(PgPool client) {
    return client.preparedQuery("INSERT INTO CUSTOMER (id, name, 
     surname) VALUES ( nextval('customerId_seq'), $1,$2)
     RETURNING (id)", Tuple.of(name,surname))
            .thenApply(pgRowSet -> pgRowSet.iterator()
            .next().getLong("id"));
}

以同样的方式,update 方法通过 PreparedStatement 执行 UPDATE,并将客户的数据元组作为参数应用:

public CompletionStage<Boolean> update(PgPool client) {
    return client.preparedQuery("UPDATE CUSTOMER SET name = $1,
     surname = $2 WHERE id = $3", Tuple.of(name, surname, id))
            .thenApply(pgRowSet -> pgRowSet.rowCount() == 1);
}

要删除客户,delete 方法执行 PreparedStatement,它使用客户 id 作为参数:

public static CompletionStage<Boolean> delete(PgPool client, Long id) {
    return client.preparedQuery("DELETE FROM CUSTOMER WHERE
     id = $1", Tuple.of(id))
            .thenApply(pgRowSet -> pgRowSet.rowCount() == 1);
}

最后,findAll 方法用于从数据库中查询客户列表并将其作为 Java 列表返回:

public static CompletionStage<List<Customer>> findAll(PgPool client) {
    return client.query("SELECT id, name, surname FROM CUSTOMER 
    ORDER BY name ASC").thenApply(pgRowSet -> {
        List<Customer> list = new ArrayList<>(pgRowSet.size());
        for (Row row : pgRowSet) {
            list.add(from(row));
        }
        return list;
    });
}

我们已经完成了应用程序的编码。让它运行起来吧!

Running the example

在运行示例之前,请确保您已引导 PostgreSQL 数据库;否则,部署应用程序时初始语句将失败:

$ docker run --ulimit memlock=-1:-1 -it --rm=true --memory-swappiness=0 --name quarkus_test -e POSTGRES_USER=quarkus -e POSTGRES_PASSWORD=quarkus -e POSTGRES_DB=quarkusdb -p 5432:5432 postgres:10.5

然后,像往常一样使用以下命令运行应用程序:

mvn  install quarkus:dev

UI 隐藏了我们已经从普通对象切换到真实数据库的事实,尽管您可以从页面标题(现在是 Quarkus Vert.X PgPool ExampleQuarkus Vert.X PgPool Example

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》用Vert.x统一命令式和反应式

但是,如果您登录到数据库容器,您将确认 Customer 表已创建及其项目。让我们为此目的找到容器 ID:

$ docker ps
 CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                    NAMES
 6b1b13b0547f        postgres:10.5       "docker-entrypoint..."   2 minutes ago       Up 2 minutes        0.0.0.0:5432->5432/tcp   quarkus_test

现在,让我们使用 docker exec 命令进入 PostgreSQL 容器的 bash shell:

$ docker exec -it 6b1b13b0547f /bin/bash

 root@6b1b13b0547f:/# psql -U postgres
 psql (10.5 (Debian 10.5-2.pgdg90+1))
 Type "help" for help.

您可以使用 \dt 快捷方式检查关系列表:

 postgres=# \dt;
 List of relations
  Schema |   Name   | Type  |  Owner   
 --------+----------+-------+----------
  public | customer | table | postgres
 (1 row)

我们也可以查询Customer表的行,如下:

postgres=# select * from customer;
  id | name | surname
 ----+------+---------
   5 | John | Doe
   6 | Fred | Smith
 (2 rows)

伟大的!我们已经使用 Quarkus 完成了我们的第一个反应式 SQL 应用程序。这也标志着我们进入 Vert.x 土地的旅程结束。

Summary

从这个反应式编程的旋风之旅中,您应该精通在 JVM 上编写 反应式应用程序。您的编程技能现在包括如何使用 Vert.x 核心 API 来编写异步和非阻塞服务。您还学习了如何使用 Vert.x Reactive API 将 Observable 模式与流或异步结果相结合。然后,我们快速探索了最后一个 Vert.x 范式 Vert.x Axle,它允许不同的 bean 使用异步消息进行交互并强制松散耦合。最后,我们使用响应式 API 来访问使用 Vert.x 的 PostgreSQL 客户端扩展的关系数据库。

尽管您已经掌握了反应式编程 API,但请注意,它的大部分功能只能在构建实时数据管道和流数据时才能发挥出来。我们将在下一章中介绍这些内容。