我们将添加的第一个类是QuoteGenerated,它是一个ApplicationScoped CDI bean,它每两秒为一家公司生成随机报价。这是此的代码:
此类生成将通过 "stock-quote" 通道写入 Kafka 的消息。该消息包含通过三个参数随机生成的股票订单:
- The type of order (sale/purchase)
- The company name
- The number of shares purchased/sold
在一天结束时,generate 方法将生成一条包含 JSON 字符串的消息,类似于以下内容:
为了让我们更好地了解附件组件,这里是 Company 枚举,其中包含以下一组公司:
我们还需要Operation类的核心部分,它是一个Java POJO,保存着每个股票订单的数据:
现在,简要介绍一下华尔街 101:每个股票订单都会决定一家公司的报价变化。简单地说,通过卖出股票,公司的价格会下降,而买入订单会使股票的需求增加,这意味着价格会上涨。出售/购买的股票数量最终将决定价格的涨跌幅度。
以下 QuoteConverter 类将完成将股票订单转换为交易中涉及的 Company 的新报价的工作:
这个类的 init 方法简单地用一些随机值引导每个 Company 的初始引用。
newQuote 方法是我们交易系统的核心。通过读取 JSON 文件中包含的操作数据,使用基本算法生成新的报价:对于任何 25 只股票进行交易,都会对股票的价值产生一个点的影响。返回的 JSON 字符串包装了 Quote 类,该类通过 @Broadcast 广播给 "in-memory-stream" 频道的所有匹配订阅者 注释位于方法之上。
为了完整起见,我们还将包含 Quote Java 类,它将作为 JSON 发送到客户端:
在我们的示例中,我们有以下 "in-memory-stream" 频道的订阅者,其中发布了 Quote:
QuoteEndpoint 是我们的 REST 端点。其中,我们使用 @Channel 限定符将 "in-memory-stream" 通道注入 bean。这正是反应式世界(由流控制)与命令式世界(CDI bean,它按顺序执行代码)统一的地方。简单地说,这是我们的 bean 能够检索由 Reactive Messaging 管理的通道的地方。
所有前面的组件都需要一个经纪人,我们在这里发布股票报价并阅读它们。这是 application.properties 文件,它将所有这些部分放在一起:
第一个块与 Kafka 目的地相关,在流式术语中也称为 sink,是我们编写 QuoteGenerator 生成的股票报价的地方。要跨类节点复制数据,有必要序列化其内容。字节流是操作系统用于 I/O 的标准语言。在我们的例子中,由于数据是 JSON 格式,我们使用 StringSerializer。
在第二个块中,我们配置源主题和连接器,我们将股票报价读取为 JSON 序列化流。
现在,我们需要做的就是添加一个能够捕获 SSE 并在格式良好的数据表中显示它的文本的客户端应用程序。为简洁起见,我们将仅添加收集 SSE 的核心 JavaScript 函数:
前面的代码将包含在index.html页面中,可以在本章的源代码中找到。让我们看看它的实际效果!在构建应用程序之前,使用以下命令启动 Kafka/ZooKeeper 容器:
Docker Compose 工具将搜索 docker-compose.yaml 文件,该文件位于本示例的根目录中。在这里,我们已经配置了 Kafka 和 ZooKeeper 容器以便它们启动。成功的引导程序将在控制台底部产生以下输出:
您可以通过执行 docker ps 命令来验证 Kafka 和 ZooKeeper 容器是否已启动并正在运行:
上述命令将显示以下活动进程:
现在,像往常一样使用以下命令引导应用程序:
应用程序的欢迎页面(可在 http://localhost:8080 获得)将显示正在运行的股票报价行情,如以下屏幕截图所示:
列表中的每家公司都将从 N/A 报价开始,直到对其执行随机操作。最后,您会看到前面的页面每两秒更新一次,这是我们在 QuoteGenerator 类中配置的。很酷,不是吗?
完成此示例后,使用以下命令停止所有正在运行的容器:
一旦 docker-compose 进程终止,前面的命令将显示所有已停止的容器层的列表:
然后,通过再次执行 docker ps 命令验证 Kafka 和 ZooKeeper 容器是否已停止:
前面的命令不应该产生任何输出,这意味着没有挂起的 Docker 进程正在运行。
我们刚刚开始使用 Docker Compose 工具。现在,让我们继续并在 OpenShift 上部署完整的应用程序堆栈。