vlambda博客
学习文章列表

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

Reactive Messaging with Quarkus

在本章中,我们将了解 SmallRye Reactive Messaging 的具体细节,可以在 Quarkus 中使用它来实现 Eclipse MicroProfile Reactive Messaging 规范。在本章结束时,您将为您的数据流应用程序建立一个可靠的开发模型,并知道如何连接到流平台,例如 Apache Kafka 和 ActiveMQ。

在本章中,我们将介绍以下主题:

  • Getting started with Reactive Messaging
  • Streaming messages with Apache Kafka
  • Streaming messages with Advanced Message Queuing Protocol (AMQP)

Getting started with Reactive Messaging

Reactive Streams 是一项旨在为跨异步边界交换数据流提供标准的计划。同时,它保证了接收端不会被强制缓冲任意数量的数据。

Reactive Stream 有几种可用的实现,我们已经了解了如何在 Vert.x 中使用 Reactive Programming。在本章中,我们将使用 SmallRye Reactive Messaging 实现来补充我们的知识,向您展示我们如何将它与流平台(如 Apache Kafka)或消息代理(如 ActiveMQ)集成,而只需进行最少的配置更改。

为了熟悉 MicroProfile Reactive Messaging,我们需要对一些关键概念有适当的了解。首先,MicroProfile Reactive Messaging 是一种使用 CDI bean 驱动消息流向某些特定通道的规范。

消息是包含要流式传输的有效负载的基本接口。 Message 接口被参数化以描述它包含的有效负载的类型。此外,Message 接口包含特定于用于消息交换的代理(例如,Kafka 或 AMQ)的属性和元数据。

另一方面,通道是一个字符串,指示使用哪个消息源或目标。有两种类型的通道:

  • Internal channels are local to the application and are used to implement a multi-step process for messages.
  • Remote channels are connected to remote brokers (such as Apache Kafka or AMQ) through connectors.

由于 MicroProfile Reactive Messaging 完全由 CDI 模型管理,因此使用两个核心注释来指示方法是消息的生产者还是消费者:

  • @Incoming: This annotation is used on a method to indicate that it consumes messages from the specified channel. The name of the channel is added to the annotation as an attribute. Here is an example:
@Incoming("channel")
public void consume(Message<String> s) {   
  // Consume message here:
}

将此注解放置在方法上的效果是,每次将消息发送到该通道时都会调用该方法。从用户的角度来看,传入消息是来自并置的 CDI bean 还是来自远程代理是完全透明的。但是,您可以决定明确说明该方法使用特定类型的消息,例如 KafkaMessage(继承自 Message)。这是一个例子:

@Incoming("channel")
public void consume(KafkaMessage<String> s) {     
   // Consume message here:
}
  • @Outgoing: This annotation indicates that a method publishes messages to a channel. In much the same way, the name of the channel is stated in the annotation's attribute:
@Outgoing("channel")
 public Message<String> produce() {
   // Produce and return a Message implementation
 }

在使用 @Outgoing 注释的方法中,我们返回 Message 接口的具体实现。

请注意,您只能注释一个方法 @Outgoing 用于一个频道。如果您尝试在多个频道中使用同一频道 @Outgoing 注释方法,部署时会发出错误。

您还可以同时使用 @Incoming@Outgoing 注释方法,使其表现得像 消息处理器,它会转换消息的内容数据:

@Incoming("from")
@Outgoing("to")
public String translate(String text) {
   return MyTranslator.translate(text);
}

从前面的示例中,我们可以看到消息从 @Outgoing 流生产者流向 @Incoming 流消费者,并且 Reactive Messaging 透明地连接了两个端点。为了解耦 ProducerConsumer 消息,您可以使用 MicroProfile API 提供的连接器添加一个组件,例如 Apache Kafka。在下一节中,我们将介绍我们的第一个使用 Apache Kafka 的反应式消息传递示例。

Streaming messages with Apache Kafka

Apache Kafka (https://kafka.apache.org/) 是一个可以使用的分布式数据流平台以惊人的速度实时发布、订阅、存储和处理来自多个来源的数据流。

Apache Kafka 可以插入到在系统之间分发数据的流数据管道中,也可以插入到使用该数据的系统和应用程序中。由于 Apache Kafka 减少了对数据共享的点对点集成的需求,因此它非常适合对高吞吐量和可扩展性至关重要的一系列用例。

此外,一旦将 Kafka 与 Kubernetes 结合起来,您将获得 Kafka 的所有优势以及 Kubernetes 的优势,例如:

  • Scalability and high availability: You can easily scale up and down resources with Kubernetes, which means you can automatically determine the pool of resources that Apache Kafka will share with other applications while guaranteeing the high availability of your Kafka cluster at the same time.
  • Portability: By running Kafka with Kubernetes, your cluster of Kafka nodes can span across on-site and public, private, or hybrid clouds, even using different operating systems.

要管理 Kafka 环境,您需要一个名为 ZooKeeper 的软件,该软件可以管理命名和配置数据,以便在分布式系统中提供灵活和健壮的同步。 ZooKeeper 控制 Kafka 集群节点的状态,并跟踪 Kafka 主题、分区和您需要的所有 Kafka 服务。我们不会在本章中介绍 ZooKeeper 管理的细节,尽管值得一提的是它的作用,因为你需要掌握它才能获得 Kafka 管理员的工作。

为了演示 Apache Kafka 和 MicroProfile Streaming 在 Quarkus 上的强大组合,我们将设计一个简单的应用程序来模拟通过购买和销售实时更新的股票交易代码。准备好并开始营业!

Composing our stock trading application

让我们从我们的股票交易应用程序的架构开始。为了建立一个复杂程度最低的应用程序,我们将创建以下通道:

  • An outgoing producer that's bound to the "stock-quote" channel, where messages containing stock orders will be written into a topic named "stocks"
  • An incoming consumer that's bound to the "stocks" channel, which read messages that are available in the "stocks" topic
  • An outgoing producer that's bound to the "in-memory-stream" channel, which broadcasts the new stock quote to all the available subscribers internally
  • An incoming consumer that's bound to the "in-memory-stream" channel, which reads the new stock quote and sends it as SSE to clients

下图描述了将在我们的示例中使用的基本消息流:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

示例应用程序可以在本书 GitHub 存储库的 Chapter10/kafka 文件夹中找到。我们建议您在继续之前将项目导入您的 IDE。

从这个项目的 pom.xml 文件中可以看到,我们已经包含了以下扩展,以便我们可以将消息流式传输到 Apache Kafka 服务器:

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

在深入研究代码之前,我们需要满足一些要求才能在容器中运行 Kafka。正如我们之前提到的,Kafka 需要 ZooKeeper 来管理其集群,因此我们需要启动这两个服务。您可以在开发或测试环境中使用的实用解决方案是使用 Docker Compose,它是一种用于在以 YAML 格式编写的单个配置文件中管理和同步多个容器应用程序的工具。

Docker Compose 的安装在其文档页面(https://docs.docker.com/compose/ install/),但对于 Linux 机器,您可以使用以下 shell 命令安装它的稳定版本:

sudo curl -L "https://github.com/docker/compose/releases/download/1.24.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

完成后,在 docker-compose 工具上申请正确的权限:

chmod a+x /usr/local/bin/docker-compose

现在,您可以验证安装的版本,如下所示:

docker-compose --version

您应该看到以下输出:

docker-compose version 1.24.1, build 1110ad01

现在我们已经完成了初步要求,是时候添加一些代码了!

Coding bean classes

我们将添加的第一个类是QuoteGenerated,它是一个ApplicationScoped CDI bean,它每两秒为一家公司生成随机报价。这是此的代码:

@ApplicationScoped
public class QuoteGenerator {

    private Random random = new Random();

    @Outgoing("stock-quote")
    public Flowable<String> generate() {
       return Flowable.interval(2, TimeUnit.SECONDS)
         .map(tick -> generateOrder(random.nextInt(2), 
          random.nextInt(5),   random.nextInt(100)));
 }

    private String generateOrder(int type, int company, int amount) {
       Jsonb jsonb = JsonbBuilder.create();
       Operation operation = new Operation(type, Company.values()
        [company], amount);
       return jsonb.toJson(operation);
    }
}

此类生成将通过 "stock-quote" 通道写入 Kafka 的消息。该消息包含通过三个参数随机生成的股票订单:

  • The type of order (sale/purchase)
  • The company name
  • The number of shares purchased/sold

在一天结束时,generate 方法将生成一条包含 JSON 字符串的消息,类似于以下内容:

{"amount":32,"company":"Soylent","type":0}

为了让我们更好地了解附件组件,这里是 Company 枚举,其中包含以下一组公司:

public enum Company {
        Acme, Globex, Umbrella, Soylent, Initech
}

我们还需要Operation类的核心部分,它是一个Java POJO,保存着每个股票订单的数据:

public class Operation {

    public static final int SELL = 0;
    public static final int BUY = 1;

    private int amount;
    private Company company;
    private int type;

    public Operation(int type, Company company, int amount) {
        this.amount = amount;
        this.company = company;
        this.type = type;
    }
    // Getters/Setters method omitted for brevity
}

现在,简要介绍一下华尔街 101:每个股票订单都会决定一家公司的报价变化。简单地说,通过卖出股票,公司的价格会下降,而买入订单会使股票的需求增加,这意味着价格会上涨。出售/购买的股票数量最终将决定价格的涨跌幅度。

以下 QuoteConverter 类将完成将股票订单转换为交易中涉及的 Company 的新报价的工作:

@ApplicationScoped
public class QuoteConverter {
    HashMap<String,Double> quotes;

    private Random random = new Random();
    @PostConstruct
    public void init() {
        quotes = new HashMap<>();
        for (Company company: Company.values())
        quotes.put(company.name(), new Double(random.nextInt
        (100) + 50));

    }

    @Incoming("stocks")
    @Outgoing("in-memory-stream")
    @Broadcast
    public String newQuote(String quoteJson) {
        Jsonb jsonb = JsonbBuilder.create();

        Operation operation = jsonb.fromJson(quoteJson, 
         Operation.class);
        double currentQuote = 
         quotes.get(operation.getCompany().name());
        double newQuote;
        double change = (operation.getAmount() / 25);

        if (operation.getType() == Operation.BUY) {
              newQuote = currentQuote + change;
        }
        else  {
            newQuote = currentQuote - change;
        }
        if (newQuote < 0) newQuote = 0;

        quotes.replace(operation.getCompany().name(), newQuote);
        Quote quote = new Quote(operation.getCompany().name(), 
         newQuote);

        return jsonb.toJson(quote);

    }

}

这个类的 init 方法简单地用一些随机值引导每个 Company 的初始引用。

newQuote 方法是我们交易系统的核心。通过读取 JSON 文件中包含的操作数据,使用基本算法生成新的报价:对于任何 25 只股票进行交易,都会对股票的价值产生一个点的影响。返回的 JSON 字符串包装了 Quote 类,该类通过 @Broadcast 广播给 "in-memory-stream" 频道的所有匹配订阅者 注释位于方法之上。

为了完整起见,我们还将包含 Quote Java 类,它将作为 JSON 发送到客户端:

public class Quote {
    String company;
    Double value;

    public Quote(String company, Double value) {
        this.company = company;
        this.value = value;
    }

   // Getters Setters method omitted for brevity
}

在我们的示例中,我们有以下 "in-memory-stream" 频道的订阅者,其中发布了 Quote

@Path("/quotes")
public class QuoteEndpoint {

    @Inject
    @Channel("in-memory-stream") 
    Publisher<String> quote;

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType("text/plain")
    public Publisher<String> stream() {

        return quote;
    }
}

QuoteEndpoint 是我们的 REST 端点。其中,我们使用 @Channel 限定符将 "in-memory-stream" 通道注入 bean。这正是反应式世界(由流控制)与命令式世界(CDI bean,它按顺序执行代码)统一的地方。简单地说,这是我们的 bean 能够检索由 Reactive Messaging 管理的通道的地方。

所有前面的组件都需要一个经纪人,我们在这里发布股票报价并阅读它们。这是 application.properties 文件,它将所有这些部分放在一起:

#Kafka destination
mp.messaging.outgoing.stock-quote.connector=smallrye-kafka
mp.messaging.outgoing.stock-quote.topic=stocks
mp.messaging.outgoing.stock-quote.value.serializer=org.apache.kafka.common.serialization.StringSerializer

#Kafka source
mp.messaging.incoming.stocks.connector=smallrye-kafka
mp.messaging.incoming.stocks.topic=stocks
mp.messaging.incoming.stocks.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

第一个块与 Kafka 目的地相关,在流式术语中也称为 sink,是我们编写 QuoteGenerator 生成的股票报价的地方。要跨类节点复制数据,有必要序列化其内容。字节流是操作系统用于 I/O 的标准语言。在我们的例子中,由于数据是 JSON 格式,我们使用 StringSerializer

在第二个块中,我们配置源主题和连接器,我们将股票报价读取为 JSON 序列化流。

现在,我们需要做的就是添加一个能够捕获 SSE 并在格式良好的数据表中显示它的文本的客户端应用程序。为简洁起见,我们将仅添加收集 SSE 的核心 JavaScript 函数:

<script> var source = new EventSource("/quotes/stream"); source.onmessage = function (event) { var data = JSON.parse(event.data); var company = data['company']; var value = data['value']; document.getElementById(company).innerHTML = value; }; </script>

前面的代码将包含在index.html页面中,可以在本章的源代码中找到。让我们看看它的实际效果!在构建应用程序之前,使用以下命令启动 Kafka/ZooKeeper 容器:

docker-compose up

Docker Compose 工具将搜索 docker-compose.yaml 文件,该文件位于本示例的根目录中。在这里,我们已经配置了 Kafka 和 ZooKeeper 容器以便它们启动。成功的引导程序将在控制台底部产生以下输出:

kafka_1      | [2019-10-20 07:05:36,276] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
 kafka_1      | [2019-10-20 07:05:36,277] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
 kafka_1      | [2019-10-20 07:05:36,279] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

您可以通过执行 docker ps 命令来验证 Kafka 和 ZooKeeper 容器是否已启动并正在运行:

docker ps --format '{{.Names}}'

上述命令将显示以下活动进程:

kafka_kafka_1
kafka_zookeeper_1

现在,像往常一样使用以下命令引导应用程序:

mvn install quarkus:dev

应用程序的欢迎页面(可在 http://localhost:8080 获得)将显示正在运行的股票报价行情,如以下屏幕截图所示:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

列表中的每家公司都将从 N/A 报价开始,直到对其执行随机操作。最后,您会看到前面的页面每两秒更新一次,这是我们在 QuoteGenerator 类中配置的。很酷,不是吗?

完成此示例后,使用以下命令停止所有正在运行的容器:

docker stop $(docker ps -a -q)

一旦 docker-compose 进程终止,前面的命令将显示所有已停止的容器层的列表:

6a538738088f
f2d97de3520a

然后,通过再次执行 docker ps 命令验证 Kafka 和 ZooKeeper 容器是否已停止:

docker ps --format '{{.Names}}'

前面的命令不应该产生任何输出,这意味着没有挂起的 Docker 进程正在运行。

我们刚刚开始使用 Docker Compose 工具。现在,让我们继续并在 OpenShift 上部署完整的应用程序堆栈。

Streaming messages to Kafka in the cloud

为了完成我们的下一个挑战,我们强烈建议使用最新的 OpenShift 4.X 版本。事实上,为了编排 Kafka 和 ZooKeeper 等多种服务,使用 OpenShift 版本 4 会简单得多,因为它建立在 Operators 的概念之上。 Kubernetes Operator 是 在集群上的 Pod 中运行的一个软件,它通过 自定义资源定义 (CRDs引入新的对象类型>)。 CRD 只不过是 Kubernetes 中的一种扩展机制,它允许您为用户定义接口;例如,您可以为 Kafka 服务器定义一个 CRD,这为我们在集群中配置和运行它提供了一种更简单的方法。

此外,Operators 已经有一个公共目录(https://operatorhub.io/),您可以在其中找到现有的运算符或添加您自己的。

您可以访问 https://www.openshift.com/trial/ 来评估 OpenShift 4。在那里,您可以找到多种评估 OpenShift 的替代方案,无论是在云中还是在您自己的机器上。在本章中,我们假设您已经完成了注册过程并且您已经启动并运行了 OpenShift 4。

对于下一个项目,请参考 Chapter10/kafka-openshift 目录,您将在其中找到为 OpenShift 配置的股票交易应用程序以及用于设置和配置 Kafka 集群的 YAML 文件。

Installing Kafka on OpenShift

在 OpenShift 集群上安装和管理 Apache Kafka 集群的最简单方法是通过 Strimzi 项目 (https: //strimzi.io/),可以作为 OpenShift Operator 安装。

首先创建一个名为 kafka-demo 的新 OpenShift 项目。您可以从管理控制台或使用 oc 命令行实用程序创建它,如下所示:

oc new-project kafka-demo

返回的输出将确认项目命名空间已在您的虚拟地址中创建:

Now using project "kafka-demo" on server "https://api.fmarchioni-openshift.rh.com:6443".

根据您的情况,服务器名称会有所不同,具体取决于您登录时选择的帐户名称。

我们建议从 OpenShift Web 控制台继续。在左侧的 Administrator 面板中,选择 OperatorHub,如下图所示:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

OperatorHub 目录将显示在 OpenShift 主仪表板中。选择 Srimzi 运算符,如以下屏幕截图所示:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

然后,在以下 UI 中,选择 Install Operator:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

接下来,您将能够选择是将 Operator 安装在集群的所有可用命名空间中还是仅安装在特定项目中。由于我们不会在其他项目中使用此 Operator,因此只需选中 A specific namespace on the cluster 选项并选择您的项目。您的选择应如下所示:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

几秒钟后,在主面板中,您将收到已安装 Operator 及其提供的所有 API 的通知:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

现在您有了可用的 Strimzi Operator 安装 Kafka 集群将是小菜一碟!在 Chapter10/kafka-openshift/strimzi 文件夹中,您将找到以下文件:

  • kafka-cluster-descriptor.yaml: This file contains a Kafka cluster definition based on the Strimzi Operator.
  • kafka-topic-queue-descriptor.yaml: This file defines a resource (a Kafka topic) that we need to configure in our Kafka cluster.

您可以使用 oc 命令安装它们。让我们从集群开始:

oc create -f strimzi/kafka-cluster-descriptor.yaml

上述命令的输出如下:

kafka.kafka.strimzi.io/my-kafka created

现在,等待几秒钟,直到 Kafka 集群启动并运行。您可以使用以下命令检查当前项目中 Pod 的状态:

oc get pods 

然后,等到所有的 Pod 都运行起来,如下:

NAME                                                READY   STATUS    RESTARTS   AGE
my-kafka-entity-operator-58d546cf6c-dw85n           3/3     Running   0          5m50s
my-kafka-kafka-0                                    2/2     Running   1          6m27s
my-kafka-kafka-1                                    2/2     Running   1          6m27s
my-kafka-kafka-2                                    2/2     Running   0          6m27s
my-kafka-zookeeper-0                                2/2     Running   0          7m5s
my-kafka-zookeeper-1                                2/2     Running   0          7m5s
my-kafka-zookeeper-2                                2/2     Running   0          7m5s
strimzi-cluster-operator-v0.14.0-59744f8569-d7j44   1/1     Running   0          7m47s

成功的集群设置将由以下组件组成:

  • Three Kafka cluster nodes in a running state
  • Three ZooKeeper cluster nodes also in a running state

集群的名称 (my-kafka) 已在 kafka-cluster-descriptor.yaml 文件中分配,如下所示:

apiVersion: kafka.strimzi.io/v1beta1
 kind: Kafka
 metadata:
   name: my-kafka

现在,让我们继续添加一个名为 stock 的队列,该队列在 kafka-topic-queue-descriptor.yaml 文件夹中定义。您可以使用以下命令创建它:

oc create -f strimzi/kafka-topic-queue-descriptor.yaml

您将看到以下输出:

kafkatopic.kafka.strimzi.io/stocks created

如果你想对 Kafka 集群有一些见解,你可以检查该主题是否可用。为此,请使用 oc rsh 登录到任何可用的 Kafka 节点:

oc rsh my-kafka-kafka-0

通过这样做,您将可以访问该容器的终端。从那里,执行以下命令:

sh-4.2$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181

您将在控制台中看到的最小输出是 stocks,这是我们的主题名称:

stocks

要连接到 Kafka 集群,我们不会使用 IP 地址或 Pod 名称(在重启时会有所不同)。相反,我们将使用服务名称,这将让您通过别名访问集群。您可以使用以下命令检查可用的服务名称:

oc get services -o=name

上述命令的输出将限制在 name 列。在我们的例子中,它将如下所示:

service/my-kafka-kafka-bootstrap
service/my-kafka-kafka-brokers
service/my-kafka-zookeeper-client
service/my-kafka-zookeeper-nodes

我们感兴趣的服务名称是 my-kafka-kafka-bootstrap,我们将很快将其添加到我们的 Quarkus 项目中。

Shaping up our project for native cloud execution

为了在 OpenShift 上运行我们的项目,我们将对配置文件进行一些最小的更改,以便我们可以访问我们刚刚确定的 Kafka 服务名称。在以下代码中,我们突出显示了必须应用于 application.properties 文件的更改:

mp.messaging.outgoing.stock-quote.connector=smallrye-kafka
mp.messaging.outgoing.stock-quote.topic=stocks
mp.messaging.outgoing.stock-quote.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.stock-quote.bootstrap.servers=my-kafka-kafka-bootstrap:9092
 
  
mp.messaging.incoming.stocks.connector=smallrye-kafka
mp.messaging.incoming.stocks.topic=stocks
mp.messaging.incoming.stocks.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.stocks.bootstrap.servers=my-kafka-kafka-bootstrap:9092

如您所见,在前面的配置中,我们使用 bootstrap.servers 属性来指定 Kafka 服务器列表(host:port)。

通过使用逗号分隔每个条目,可以在配置中添加多个服务器。

在这个例子的源码中,你还会发现所有在消息流中已经序列化的POJO类都用@io.quarkus.runtime.annotations.RegisterForReflection进行了注解,如下:

@RegisterForReflection
public class Quote   { . . . }

事实上,在构建原生可执行文件时,GraalVM 会做一些假设,以删除所有未在代码中直接使用的类、方法和字段。通过反射使用的元素不是调用树的一部分,因此在从本机可执行文件中消除它们时,它们是候选者。由于 JSON 库严重依赖反射来执行它们的工作,我们必须使用 @RegisterForReflection 注释明确告诉 GraalVM 不要排除它们。

这是我们为了将其发布到云而应用的小改动。现在,使用以下命令构建和部署本机应用程序:

#Build the native application
mvn clean package -Pnative -Dnative-image.docker-build=true

#Create a new build for it 
oc new-build --binary --name=quarkus-kafka -l app=quarkus-kafka

#Patch the Docker.native file 
oc patch bc/quarkus-kafka -p "{\"spec\":{\"strategy\":{\"dockerStrategy\":{\"dockerfilePath\":\"src/main/docker/Dockerfile.native\"}}}}"

#Deploy the application in the build
oc start-build quarkus-kafka --from-dir=. --follow
 
# To instantiate the image as new app
oc new-app --image-stream=quarkus-kafka:latest
 
# To create the route
oc expose service quarkus-kafka
请注意,您可以找到前面的脚本,即 deploy-openshift.sh,在 Chapter10/kafka-openshift 文件夹。

执行上述脚本后,使用以下命令验证 quarkus-kafka Pod 是否已启动并正在运行:

oc get pods

输出将证实这一点:

NAME                 READY   STATUS      RESTARTS    AGE
kafka-demo-1-deploy   0/1     Completed   0          30s
kafka-demo-1-p9qdr    1/1     Running     0          36s

可以查看路由地址如下:

oc get routes

该路由将在 HOST/PORT 列输出下:

NAME            HOST/PORT                                                 PATH   SERVICES        PORT       TERMINATION   WILDCARD
quarkus-kafka   quarkus-kafka-kafka-demo.apps.fmarchio-qe.qe.rh-ocs.com          quarkus-kafka   8080-tcp                 None

如果您想通过单击访问您的应用程序,请前往 Administration 控制台并选择 Networking | 路线。然后,点击Route Location,如下图所示:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

一旦为发出报价配置的超时时间过去,您将在 OpenShift 上看到正在运行的股票交易应用程序:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

我们已经结束了 Apache Kafka Streaming 的光荣旅程!在下一节中,我们将学习如何处理另一种基于 AMQP 协议的流式消息传递候选解决方案。

Streaming messages with AMQP

如果您在 Java Enterprise 社区工作多年后才刚刚发现 Quarkus,那么您已经熟悉消息代理,它用于允许不同的 Java 应用程序使用 JMS 作为标准协议进行通信。尽管 JMS 是用于实现消息传递系统的强大且成熟的解决方案,但它的主要限制之一是它只专注于 Java。在微服务世界中,使用不同的语言来组成整个系统架构是相当普遍的,因此需要一个独立于平台的解决方案。在这种情况下,AMQP 提供了一系列优势,使其非常适合在分布式系统中实现微服务时实现 Reactive Streams API。

简而言之,以下是 AMQP 协议的一些主要特性:

  • It provides a platform-independent wire-level messaging protocol that allows interoperability across multiple languages and platforms.
  • It is a wire-level protocol, by which data is sent across the network as a stream of bytes.
  • It can achieve high performance while working at low-level byte streams.
  • It supports long-lived messaging and classic message queues.
  • It supports distribution patterns such as round-robin (where the load is equally distributed across servers) and store and forward (where messages are stored at the sender side in a persistence store and are then forwarded to the receiver side).
  • It provides support for transactions (across message destination), as well as distributed transactions using common standards (XA, X/Open, MS DTC).
  • It provides support for data encryption using SASL and TLS protocols.
  • It allows us to control the message flow with metadata.
  • It provides flow control of messages to control backpressure.

为了让我们的应用程序与 AMQP 交互,我们需要一个支持该协议的代理。 Java 企业中常用的解决方案是 Apache Artemis ActiveMQ (https:// activemq.apache.org/components/artemis/),它也兼容 Java Enterprise 面向消息的中间件 (MOM) 跨度>。在下一节中,我们将学习如何在我们的股票报价应用程序中启动和配置它。

Configuring the AMQP broker

为了尽快启动我们的应用程序,我们将使用 Docker Compose 脚本。这将下载合适版本的消息代理并设置一些所需的环境变量,以便我们可以访问代理。

只需使用以下命令启动 amqp 文件夹中包含的 docker-compose.yaml 文件:

docker-compose up

如果启动成功,您应该看到以下输出:

artemis_1  | 2019-10-26 17:20:47,584 INFO  [org.apache.activemq.artemis] AMQ241001: HTTP Server started at http://0.0.0.0:8161
artemis_1  | 2019-10-26 17:20:47,584 INFO  [org.apache.activemq.artemis] AMQ241002: Artemis Jolokia REST API available at http://0.0.0.0:8161/console/jolokia
artemis_1  | 2019-10-26 17:20:47,584 INFO  [org.apache.activemq.artemis] AMQ241004: Artemis Console available at http://0.0.0.0:8161/console

您可以通过执行 docker ps 命令来验证 Kafka 和 ZooKeeper 容器是否已启动并正在运行:

docker ps --format '{{.Names}}'

上述命令将显示以下活动进程:

amqp_artemis_1

现在,让我们配置我们的应用程序以便它可以使用 ActiveMQ。您将在本书 GitHub 存储库的 Chapter10/amqp 文件夹中找到更新后的应用程序。首先,我们需要将 Kafka 的 Reactive Messaging 依赖替换为 AMQP Reactive Messaging 依赖:

<dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
</dependency>

在应用程序配置方面,需要对我们的 application.properties 文件进行一些更改。首先,我们需要包含我们在 docker-compose.yaml (quarkus/quarkus) 中设置的用户名和密码:

amqp-username=quarkus
amqp-password=quarkus

然后,我们需要配置 AMQP 连接器,以便我们可以写入 stock-quote 队列,方法是指定队列是持久的(例如,持久化到磁盘并在代理重新启动后仍然存在):

 mp.messaging.outgoing.stock-quote.connector=smallrye-amqp
 mp.messaging.outgoing.stock-quote.address=stocks
 mp.messaging.outgoing.stock-quote.durable=true

相反,我们需要配置 AMQP 连接器,以便它可以从 stocks 队列中读取:

 mp.messaging.incoming.stocks.connector=smallrye-amqp
 mp.messaging.incoming.stocks.durable=true  

现在,我们可以像往常一样使用以下命令引导应用程序:

mvn install quarkus:dev

应用程序的欢迎页面(可在 http://localhost:8080 获得)将显示正在运行的股票报价代码,它现在使用 ActiveMQ 作为其代理。如您所见,对 UI(只是标题)进行了最小的调整,但它准确地掩盖了引擎盖下所做的更改:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

您可以通过登录 AMQ 管理控制台来了解有关此过程的更多信息,该控制台位于 http://localhost:8161/console。使用配置的凭据 (quarkus/quarkus) 登录后,您可以检查目标队列是否已在可用地址列表中创建:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

通过选择 stocks 目的地,您可以在管理控制台的主面板中查看更多详细信息,如以下屏幕截图所示:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

完成后,使用以下命令停止所有正在运行的容器:

docker stop $(docker ps -a -q)

前面的命令将显示所有已停止的容器层的列表,如下所示:

6a538738088f
f2d97de3520a

然后,通过再次执行 docker ps 命令验证 ActiveMQ 容器是否已停止:

docker ps --format '{{.Names}}'

前面的命令不应产生任何输出。现在,让我们在云中测试相同的应用程序堆栈。

Streaming messages to AMQ in the cloud

我们要做的最后一件事是将 Quarkus 应用程序部署到云端,同时使用 AMQ 作为消息传递代理。为此,我们将之前测试过的 ActiveMQ Docker 映像插入 OpenShift(有关此映像的更多详细信息,请参见 GitHub 的 https://github.com/vromero/activemq-artemis-docker)。

首先,创建一个名为 amq-demo 的新项目:

oc new-project amq-demo

输出将确认项目命名空间已在您的虚拟地址中创建:

Now using project "amq-demo" on server "https://api.fmarchioni-openshift.rh.com:6443"

接下来,使用以下命令将 AMQ 服务器部署到您的项目,该命令将设置用户名和密码,以便您可以访问代理:

oc new-app --name=artemis vromero/activemq-artemis:2.9.0-alpine -e ARTEMIS_USERNAME=quarkus -e ARTEMIS_PASSWORD=quarkus -e RESTORE_CONFIGURATION=true

记下 RESTORE_CONFIGURATION=true 环境变量。这是必需的,因为 OpenShift 会在所有声明的卷中自动挂载空卷。由于此行为会影响此映像的 /etc 文件夹(存储配置的位置),我们需要将 RESTORE_CONFIGURATION 环境变量设置为 true

执行new-app命令后,会显示如下输出:

--> Found container image 2fe0af6 (10 days old) from Docker Hub for "vromero/activemq-artemis:2.9.0-alpine"
 
* An image stream tag will be created as "artemis:2.9.0-alpine" that will track this image
* This image will be deployed in deployment config "artemis"
* Ports 1883/tcp, 5445/tcp, 5672/tcp, 61613/tcp, 61616/tcp, 8161/tcp, 9404/tcp will be load balanced by service "artemis"
* Other containers can access this service through the hostname "artemis"
* This image declares volumes and will default to use non-persistent, host-local storage.
You can add persistent volumes later by running 'oc set volume dc/artemis --add ...'
--> Creating resources ...
     imagestream.image.openshift.io "artemis" created
     deploymentconfig.apps.openshift.io "artemis" created
     service "artemis" created
--> Succes

您可以使用 oc 命令检查 Pod 的状态:

oc get pods

以下输出确认 artemis Pod 处于运行状态:

NAME               READY   STATUS      RESTARTS   AGE
artemis-1-deploy   0/1     Completed   0          80s
artemis-1-p9qdr    1/1     Running     0          76s

最后,让我们检查一下服务名称,即 artemis

oc get services -o name

检查返回的输出是否与此处显示的输出匹配:

service/artemis

现在,让我们进行最后的杀戮:我们将部署位于 Chapter10/amqp-openshift 目录中的应用程序。在此文件夹中,您将找到股票交易应用程序,该应用程序已配置为在 AMQ 上流式传输消息。

这是更新后的 application.properties 文件,其中包含 AMQ 用户名和密码,以及运行服务的主机和端口:

amqp-username=quarkus
amqp-password=quarkus
 
# Configure the AMQP connector to write to the `stocks`  address
mp.messaging.outgoing.stock-quote.connector=smallrye-amqp
mp.messaging.outgoing.stock-quote.address=stocks
mp.messaging.outgoing.stock-quote.durable=true
mp.messaging.outgoing.stock-quote.host=artemis
mp.messaging.outgoing.stock-quote.port=5672
 
# Configure the AMQP connector to read from the `stocks` queue
mp.messaging.incoming.stocks.connector=smallrye-amqp
mp.messaging.incoming.stocks.durable=true
mp.messaging.incoming.stocks.host=artemis
mp.messaging.incoming.stocks.port=5672

接下来,我们会将同一文件夹 (Chapter10/amqp-openshift) 中包含的应用程序部署到 OpenShift 中。为方便起见,您可以简单地运行 deploy-openshift.sh 脚本,该脚本可以在同一目录中找到。这是脚本的内容,你应该很熟悉:

#Build native image of the project
mvn clean package -Pnative -Dnative-image.docker-build=true
 
# Create a new binary build
oc new-build --binary --name=quarkus-amq -l app=quarkus-amq

# Patch the native file 
oc patch bc/quarkus-amq -p "{\"spec\":{\"strategy\":{\"dockerStrategy\":{\"dockerfilePath\":\"src/main/docker/Dockerfile.native\"}}}}"

# Add project to the build
oc start-build quarkus-amq --from-dir=. --follow
 
# To instantiate the image
oc new-app --image-stream=quarkus-amq:latest
 
# To create the route
oc expose service quarkus-amq

然后,检查 quarkus-amq Pod 是否处于运行状态:

oc get pods

您将收到的输出证实了这一点:

 NAME                   READY   STATUS      RESTARTS   AGE
 artemis-1-deploy       0/1     Completed   0          9m9s
 artemis-1-p9qdr        1/1     Running     0          9m5s
 quarkus-amq-1-deploy   0/1     Completed   0          14s
 quarkus-amq-1-zbvrl    1/1     Running     0          10s

现在,您可以通过单击路由地址来验证应用程序是否正常工作。只需转到控制台中的 Networking | Routes 路径:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

输出将几乎相同,除了路线名称,它庆祝你在本书中的最后一个成就:

读书笔记《hands-on-cloud-native-applications-with-java-and-quarkus》与Quarkus进行反应性消息传递

就是这样,伙计们!

Summary

在本章中,我们学习了如何使用 CDI bean 来使用 Reactive Messaging 规范来生成、使用和处理消息。 我们还学习了如何引导和配置 Apache Kafka 和 Active MQ 的代理,使其充当我们 CDI Bean 的分布式流媒体平台。为了使我们的新技能到位,我们创建了一个示例股票交易应用程序,该应用程序最初在开发模式下运行,然后作为本机映像部署在 OpenShift 之上。

现在,我们已经读到本书的结尾,在这里我们了解了 Java 企业应用程序逐渐更新的故事,从单体应用程序到在云中运行的原生微服务。这是一段激动人心的旅程,无疑是一个里程碑,但这并不是我们辛勤工作的结束,这只是这个故事的结束。