读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》处理服务器发送的事件
在第 4 章, Kotlin 基础知识和 Spring Data Redis 和 第 5 章,响应式 Web 客户端,我们创建了两个微服务。第一个负责在 Redis 上保存跟踪数据并触发第二个微服务,该微服务将使用 Twitter 流。这个过程是异步发生的。
在本章中,我们将创建另一个微服务,它将使用 Twitter Gathering 生成的数据并通过 REST API 公开它。可以按文本内容过滤推文。
我们使用 Server-Sent Events (SSE );我们创建了一个响应式 REST 客户端来使用它。现在,是时候为 SSE 创建我们的实现了。我们将使用 RabbitMQ 队列并将数据推送到我们连接的客户端。
我们将看看 SSE 并了解为什么这个解决方案非常适合我们的几个微服务。
在本章的最后,我们将对在 Spring 生态系统中使用 SSE 充满信心。
在本章中,我们将学习以下内容:
- Implementation of SSE endpoints with the Spring Framework
- Consuming RabbitMQ using the Reactor Rabbit client
现在,我们将创建我们的 last 微服务。它将为我们连接的客户(在本例中为消费者)推送由 Twitter Gathering 过滤的推文。
在本章中,我们将使用 Spring Initializr 页面来帮助我们创建漂亮的新项目。让我们创造。
如您所见,Spring Initializr 页面是创建 Spring 项目的一种伙伴。让我们再次使用它并创建一个项目:
转到 https://start.spring.io并使用以下内容填写数据截屏:
我们选择了 Reactive Web
依赖项;我们还将继续使用 Kotlin 作为编程语言。最后,点击 Generate Project
按钮。很好,对我们来说已经足够了。
Spring Initializr 中没有显示一些缺失的依赖项。我们需要手动设置这些依赖。我们将在下一节中完成该任务。让我们去那里。
服务器发送事件 (SSE) 是一种将数据流从服务器发送到客户端的标准方式。在下一节中,我们将学习如何使用 Spring 框架来实现它。
此外,我们还将了解 SSE 和 WebSockets 之间的主要区别。
HTTP是OSI模型中的一个应用层protocol。应用层是 OSI 模型中表示的最后一层。这意味着该层更接近用户界面。该层的主要目的是发送和接收用户输入的数据。通常,它通过用户界面发生,也称为应用程序,例如文件传输和发送电子邮件。
应用层有多种协议,例如域名服务 (DNS),将域名转换为 IP 地址,或 SMTP,其主要目的是将电子邮件传送到邮件管理器应用程序。
应用层直接与电子邮件客户端等软件交互;与硬件部分没有交互。它是 OSI 模型的最后一层,也是最接近最终用户的层。
所有这些层都处理软件,这意味着不关心OSI模型中表示的物理部分。
Note
有关 OSI 模型的更详细说明,请参见: https://support.microsoft.com/en-us/help/103884/the-osi-model-s-seven-layers-defined - 和功能解释。
以下是 OSI 模型表示:
HTTP 协议使用 TCP 协议作为传输通道。然后,它将建立连接并开始在通道上传输数据。
TCP 协议是流协议和全双工通道。这意味着服务器和客户端可以通过连接发送数据。
HTTP 协议是一个请求-响应模型,客户端提交消息(HTTP 请求),服务器处理该消息并将响应(HTTP 响应)发送给客户端。发送响应后将关闭连接。
看下图:
这很容易理解。客户端将发送请求,在这种情况下,连接将被打开。之后,服务器将收到处理某些内容的请求,并将答案发送给客户端。整个过程结束后连接将关闭。如果客户端需要发送新请求,则应再次打开连接,并且流程以相同的顺序发生。
这里有一个明显的缺点,客户端需要根据请求打开新连接。从服务器的角度来看,服务器需要同时处理大量的新连接。这会消耗大量的 CPU 和内存。
在 HTTP 的 1.0 版本上,连接不是持久的。要启用它,keep-alive
标头应该包含在请求中。标头应如下所示:
如前所述,这是使 HTTP 连接在 1.0 版本上持久的唯一方法;发生这种情况时,服务器不会断开连接,客户端可以重用打开的连接。
在 HTTP 1.1 上,默认情况下连接是持久的;在这种情况下,与第一个版本相比,连接保持打开状态,客户端可以正常使用它。
这里有一个明显的改进,它可以带来一些优势。服务器需要管理的连接更少,它减少了大量的 CPU 时间。 HTTP 请求和响应可以在同一个连接中流水线化。
正如我们所知,天下没有免费的午餐。这也有一些缺点;服务器需要保持连接打开,服务器将为客户端保留所需的连接。在某些情况下,这可能会导致服务器不可用。
持久连接对于维护服务器和客户端之间的流非常有用。
我们的解决方案是完全反应式的,因此我们需要使用 Reactor RabbitMQ,它允许我们使用反应范式。
在这个新的微服务上,我们不需要通过消息代理发送消息。我们的解决方案将侦听 RabbitMQ 队列并为连接的客户端推送收到的推文。
Reactor RabbitMQ 尝试提供一个反应库来与 RabbitMQ rboker 交互。它使开发人员能够使用 RabbitMQ 作为消息代理解决方案,基于反应流创建非阻塞应用程序。
正如我们之前所了解的,这种解决方案一般不会占用大量内存。如果我们将其与阻塞解决方案进行比较,该项目基于 RabbitMQ Java 客户端并具有类似的功能。
我们没有使用 spring-amqp-starter
,所以魔法不会发生。我们将需要为 Spring 上下文编写 bean 声明,我们将在下一节中这样做。
在本节中,我们将在 Spring 上下文中配置 RabbitMQ 基础结构类。我们将使用 @Configuration
类来声明它。
配置类应如下所示:
这里有两件重要的事情。第一个是我们为 Kotlin 配置了 Jackson 支持。它允许我们将 ObjectMapper
注入到我们的 Spring bean 中。下一个重要的事情与 RabbitMQ 连接的配置有关。
我们已经为 Spring Context 声明了一个 ConnectionFactory
bean。我们使用 @Value
注释注入配置,并在构造函数中接收值。我们可以直接在属性中设置值,在 Kotlin 语言中;查看 ConnectionFactory
属性分配。
在 ConnectionFactory
配置之后,我们可以声明一个接收器,它是一个 Reactive
抽象来消费队列,使用反应式编程。我们接收之前创建的 ConnectionFactory
并将其设置为 ReceiverOptions
。
这就是 Reactor RabbitMQ 配置的全部内容。
现在,我们将使用 RabbitMQ 队列。该实现与我们在阻塞实现中看到的非常相似,并且函数的名称也相似。
在前面的章节中我们已经消费了一些 RabbitMQ 消息,但是这个解决方案是完全不同的。现在,我们将使用 Reactive RabbitMQ 实现。这里的主要思想是消费事件流;这些事件代表已到达代理的消息。这些消息到达,Reactor RabbitMQ 将这些消息转换为 Flux,使我们能够在响应式范式中消费。
在响应式范式中,事件流(我们可以认为队列中的消息)的表示是 Flux
。
然后我们的函数,它正在监听 RabbitMQ,应该返回 Flux
,一个无限的事件表示。 Receiver 实现返回消息的Flux
,这对我们来说已经足够了,也很符合我们的需求。
让我们多了解一点。我们在构造函数中收到了 Receiver
作为注入。当有人调用 dispatch()
函数时,Receiver
将开始消费队列,队列也被注入到构造函数中.
Receiver
产生 Flux<Delivery>
。现在,我们需要转换 Flux<Delivery>
的实例,它代表一个消息抽象, 到我们的领域模型推文。 flatMap()
函数可以为我们完成,但首先,我们将 message.body
转换为字符串,然后我们已经使用 Jackson 读取 JSON 并转换为我们的 Tweet 域模型。
看看代码读起来有多简单; API 流畅且可读性强。
在连接的客户端断开连接之前,消费者不会终止。我们很快就能看到这种行为。
我们正在接收来自 RabbitMQ 的消息。现在,我们需要将消息返回 给连接的客户。
为此,我们将使用 SSE 和 Spring WebFlux。该解决方案非常适合我们,因为我们将生成 Flux<Tweet>
并开始为我们的客户推送推文。客户端将发送查询以过滤所需的推文。
该应用程序将是完全反应式的。让我们看一下我们的代码:
很容易理解。我们已经声明了 tweets()
函数;这个函数被映射到一个 GET HTTP 请求并产生一个 MediaType.TEXT_EVENT_STREAM_VALUE
。当客户端连接到端点时,服务器将开始相应地发送带有所需参数的推文。
当客户端断开连接时,Reactor RabbitMQ 将关闭请求的 RabbitMQ 连接。
现在,是时候打包整个解决方案并为所有项目创建 Docker 映像了。在我们想要的任何地方运行项目很有用。
我们将逐步配置所有项目,然后在 Docker 容器中运行解决方案。作为一个挑战,我们可以使用 docker-compose
在单个 yaml
文件中编排整个解决方案。
对于 Tracked Hashtag Service,我们创建了 docker 镜像。然后,我们将开始配置 Tweet Gathering,最后一个是Tweet Dispatcher。让我们现在就这样做。
Note
您可以在以下位置找到更多 docker-compose
项目详细信息: https://docs.docker.com/compose/。此外,在新版本中,docker-compose
支持 Docker Swarm 来编排集群节点之间的堆栈。在生产环境中部署 Docker 容器非常有用。
让我们为 Tweet Gathering 项目配置 pom.xml
。
构建节点应如下所示:
看看端口配置;它应该与我们在 application.yaml
中配置的相同。配置完成,让我们创建我们的 Docker 镜像:
命令输出应类似于以下屏幕截图:
有一个最近创建并标记为最新的图像;映像已准备好运行。让我们为 Tweet Dispatcher 项目做同样的事情。
再看一次端口配置。 Docker 将使用它来公开正确的端口。现在,我们可以运行镜像创建命令:
然后,我们可以看到命令的输出,如下图所示:
太棒了,所有图像都准备好了。让我们运行它。
Note
我们需要为所有项目创建 Docker 镜像。过程相同;配置 maven Docker 插件,然后在项目上使用 mvn clean install docker:build
。完整的源代码可以在 GitHub 上找到。可在此处找到跟踪标签服务 (https ://github.com/PacktPublishing/Spring-5.0-By-Example/tree/master/Chapter04),Tweet Gathering 可以在这里找到(https://github.com/PacktPublishing/Spring-5.0-By-Example/tree/master/Chapter05< /a>) 最后,可以在此处找到 Tweet Dispatcher (https://github.com/PacktPublishing/Spring-5.0-By-Example/tree/master/Chapter06)。
我们已准备好在 Docker 容器中运行该解决方案。我们一直在使用 IDE 或命令行运行解决方案,但现在我们将启动一些容器并测试解决方案和 Spring 配置文件。
- The first operation, the Tracked Hashtag Service, will persist the hashtag in the Redis database.
- After that, the Tracked Hashtag Service will send the newly tracked hashtag to a queue in the RabbitMQ Broker.
- Tweet Gathering is listening to the queue to track Tweets and trigger the event and starts by listening to the Twitter stream.
- Tweet Gathering starts to get Tweets from the Twitter stream.
- Tweet Gathering publishes Tweets to a queue in the RabbitMQ broker.
- Tweet Dispatcher consumes the message.
- Tweet Dispatcher sends the message to the Client using SSE.
现在我们已经了解了解决方案,让我们启动容器。
该图像已在 previous 部分中创建,因此现在我们可以启动容器。启动容器的命令应如下所示:
让我们解释一下指令。 -d
告诉 Docker 引擎在后台运行容器 模式或分离。 另一个重要的参数是 --net
,它将容器附加到所需的网络。
我们可以使用以下命令在运行时跟踪容器日志:
该命令类似于 Linux 上的 tail -f
命令,其中 看起来在日志流的最后一部分。我们可以去掉标志 -f
来查看日志的最后几行。
docker 日志的输出应如下所示:
查看日志中选择的配置文件:
请记住,我们已在 Tracked Hash Tag Service 的 pom.xml
文件中对其进行了参数化。让我们看一下以下代码段:
好工作。我们的第一个服务运行正常。让我们运行 Tweet Gathering;这里有一些有趣的配置。
运行 Tweet Gathering 应用程序稍微 不同。此容器需要用于与 Twitter API 交互的环境变量。我们可以在 docker run
命令上使用 -e
参数。让我们这样做:
看看我们在 application.yaml
文件中配置的环境变量。 Docker 运行命令会将这些变量注入系统,然后我们可以在 Java 应用程序中使用它们。
让我们检查一下我们的容器日志。我们可以使用以下命令来做到这一点:
太棒了,我们的应用程序已启动并正在运行。如您所见,应用程序已连接到 RabbitMQ 代理。
Note
RabbitMQ 和 Redis 应该正在运行以使您能够运行 Tweet搜集。我们可以使用 docker ps
命令检查它;它将列出正在运行的容器,RabbitMQ 和 Redis 需要在此列表中。
现在,我们可以运行 Dispatcher 应用程序来完成整个解决方案。让我们这样做。
运行 Tweet Dispatcher 容器并不是什么秘密。我们可以使用以下命令来运行它:
它将启动容器,在运行期间命名容器是一个好 的想法。它可以帮助我们使用命令行工具来管理容器,例如 docker container ls
或docker ps
,因为它显示最后一列中的容器名称。然后,让我们检查我们的容器是否正在运行,因此输入以下命令:
或者,您可以运行以下命令:
我们应该能够看到 Gathering 容器正在运行,如下面的输出所示:
有五个容器、三个应用程序和两个基础设施服务, RabbitMQ 和 Redis .
在任何时候,我们都可以使用以下命令停止所需的容器:
docker stop
只会停止容器;信息将保存在容器卷中。我们也可以使用容器名称或容器 ID,我们之前命名过。这对我们来说很容易。如果我们使用 docker ps
命令,最近停止的图像将永远不会出现在列表中。要显示所有容器,我们可以使用 docker ps -a
或docker container ls -a
。
现在,我们将再次启动容器;该命令是不言自明的:
容器再次运行。我们对 Docker 进行了更多的实践。
很棒的工作,伙计们。整个应用程序是容器化的。做得好。
在微服务 architectural 风格中,整个解决方案在小型且定义明确的服务中解耦。通常,当我们采用这些样式时,我们要部署的工件不止一个。
让我们分析一下我们的解决方案;我们要部署三个组件。我们使用了 Docker 容器,并且使用 docker run
命令运行了这些容器。一一我们已经使用了 docker run
3次。在开发例程中非常复杂且很难做到。
docker-compose
可以在这种情况下帮助我们。它是一个有助于在像我们这样的复杂场景中编排 Docker 容器的工具。
假设我们的应用程序正在快速增长,我们需要再构建四个微服务来实现所需的业务案例,这将涉及另外四个 docker run
命令,并且维护起来可能会很痛苦,尤其是在开发生命周期中。有时,我们需要提升工件来测试环境,我们可能需要修改我们的命令行来实现这一点。
docker-compose
使我们能够使用单个 yaml
文件部署多个容器。这个 yaml
文件有一个定义的结构,它允许我们在同一个文件中定义和配置多个容器。此外,我们可以用一个命令运行这个 yaml
文件中配置的解决方案,它使开发生活变得容易。
该工具可以在本地机器上工作,或者我们可以将它与可以管理 Docker 主机集群的 Docker Swarm 工具集成。
Note
Docker Swarm 是一个管理 docker 集群的原生工具。它使在 Docker 集群上部署容器变得容易。在新版本中,docker-compose
与 Docker Swarm 完全集成。我们可以从 docker-compose.yaml
中的 Docker Swarm 属性中定义它。 Docker Swarm 文档位于: https://docs.docker。 com/engine/swarm/。
docker-compose
yaml
有一个定义的 structure 跟随;文档可以在这里找到: https://docs.docker.com/compose/compose-file/#compose-and-docker-compatibility-matrix. 我们将创建一个简单的文件来理解 docker-compose
行为。让我们创建简单的 yaml
——yaml
应该如下所示:
上述代码中的 yaml
将创建下图中详述的结构:
它简化了开发时间。现在,我们将学习如何安装 docker-compose
。
docker-compose
安装非常简单 并且有据可查。我们使用的是 Linux,所以我们将使用 Linux 指令。
打开终端并使用以下命令:
等待下载,然后我们可以执行以下指令为程序赋予可执行权限。让我们通过执行以下命令来做到这一点:
如您所知,系统可能会要求您输入管理员密码。我们的 docker-compose
现在已经安装好了。让我们检查一下:
提示将显示已安装的版本,如以下屏幕截图:
docker-compose
已经启动并运行,所以让我们跳到下一部分,开始创建我们的 yaml
文件并部署整个堆栈与一个单一的命令。
Note
对于不同的操作系统,可以在此处找到说明: https ://docs.docker.com/compose/install/#install-compose。然后,您可以浏览说明并单击所需的操作系统。
现在,我们已经安装了 docker-compose
,我们可以尝试使用该工具。我们想用一个命令运行整个堆栈。我们将创建 yaml
文件来表示堆栈。我们的 yaml
文件应该包含 Redis 容器、RabbitMQ 容器、Tracked Hashtag 应用程序、Gathering 应用程序,最后是 Dispatcher 应用程序。
我们可以在任何我们想要的地方创建一个 docker-compose.yaml
文件,没有任何限制。
我们的 docker-compose.yaml
文件应该看起来如下:
如您所见,我们在 yaml
中定义了整个堆栈。需要注意的是,我们可以找到与 docker run
命令的一些相似之处,实际上它会使用 Docker 引擎来运行。 yaml 中的 environment
节点与 Docker 运行命令中的 -e
具有相同的行为。
我们已经定义了应用程序端口、docker 映像,并且还将容器连接 到同一个网络。这一点真的很重要,因为当我们在网络上使用 docker-compose
文件名时,可以发现容器名有一种 DNS 行为。
例如,在定义的网络solution
中,容器可以通过名称redis
找到Redis容器实例。
docker-compose
简化了 process 以运行整个堆栈。我们的 yaml
文件已正确配置和定义。
让我们开始解决方案。运行以下命令:
该命令非常简单,-d
参数指示 Docker 在后台运行该命令。正如我们对 Docker 运行命令所做的那样。
此命令的输出应如下所示:
看一下,docker-compose
已经为我们的堆栈创建了一个网络。在我们的例子中,网络驱动程序是一个网桥,在网络创建之后,容器被启动。
让我们测试一下,找到 Gathering 容器 - container 名称在 docker-compose
以文件夹名称为前缀,
docker-compose
的启动位置。
例如,我在 compose 文件夹中启动了我的 docker-compose
堆栈。由于文件夹名称,我的容器名称将是 compose_gathering_1
。
然后,我们将连接 Gathering 容器。可以使用以下命令来实现:
docker exec
命令允许我们在容器内执行一些东西。在我们的例子中,我们将执行 /bin/bash
程序。
命令结构如下:
太棒了,注意命令行。应该更改它,因为现在我们在容器命令行中:
我们没有在主机上以 root 身份连接,但现在我们是容器上的 root。该容器与名为 redis
的 Redis 容器实例位于同一网络上。
让我们用 ping
命令进行测试;我们应该可以通过名称找到redis
容器 redis
,我们来做吧。键入以下内容:
命令输出应如下所示:
太棒了,我们的容器可以通过名称找到 Redis 容器。 yaml
文件可以正常工作。
在本章中,我们完成了第二个解决方案。我们被介绍到 RabbitMQ Reactor 库,它使我们能够使用响应式范例连接到 RabbitMQ。
我们在 Docker 容器中准备了整个解决方案,并将其连接到同一个网络,以使应用程序能够相互通信。
我们还学习了通过 HTTP 持久连接将数据从服务器推送到客户端的重要模式,以及 WebSocket 和服务器发送事件之间的区别。
最后,我们了解了 docker-compose
如何帮助我们创建堆栈并使用几个命令运行整个解决方案。
在接下来的章节中,我们将构建一个完整的微服务解决方案,使用一些重要的模式,如服务发现、API 网关、断路器等等。