vlambda博客
学习文章列表

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

第 8 章使用 Cloud Streams 进行扩展

前面的章节教我们如何在使用 Reactor 3 时使用反应式编程范式成为一种乐趣。到目前为止,我们已经学习了如何使用 Spring WebFlux 和 Spring Data Reactive 构建反应式 Web 应用程序。这种强大的组合使得构建能够处理高负载同时还提供高效资源利用、低内存占用、低延迟和高吞吐量的应用程序成为可能。

然而,这并不是 Spring 生态系统带来的可能性 的终结。在本章中,我们将学习如何使用 Spring Cloud 生态系统提供的特性来改进我们的应用程序,以及学习如何使用 Spring Cloud Streams< /strong>。此外,我们还将了解 RSocket 库是什么以及它如何帮助我们开发快速流式传输系统。最后,本章还介绍了Spring Cloud Function模块,它简化了基于响应式编程和背压支持的云原生响应式系统的构建.

在本章中,我们将介绍以下主题:

  • The role of message brokers in reactive systems
  • The role of Spring Cloud Streams in reactive systems with Spring Framework
  • Serverless reactive systems with Spring Cloud Function
  • RSocket as an application protocol for asynchronous, low-latency message passing

 

消息代理是消息驱动系统的关键


如果我们记得 第 1 章, 为什么选择 Reactive Spring?,响应式系统的本质在于消息驱动的通信。此外,前面的章节清楚地表明,通过应用反应式编程技术,我们可以为进程间/跨服务通信编写异步交互。此外,通过使用 Reactive Streams 规范,我们能够以 asynchronous 方式管理背压和故障。收集所有这些功能,我们能够在一台计算机内构建一个高粒度的反应式应用程序。不幸的是,单节点应用程序有其限制,这些限制以硬件限制表示。首先,如果不关闭整个系统,就不可能提供额外的 CPU、RAM 和硬盘驱动器/SSD 等新的计算资源。这样做没有任何好处,因为用户可以分布在世界各地,因此整体用户体验是不同的。

笔记

在这里, 不同的用户体验 指的是不同的延迟分布,它直接取决于应用程序的服务器位置和用户位置之间的距离。

可以通过将单体应用程序拆分为微服务来解决此类限制。该技术的中心思想旨在实现具有直接位置透明度的弹性系统。然而,这种构建应用程序的方式暴露了新的问题,例如服务管理、监控和无痛扩展。

服务器端负载均衡

 分布式系统开发的早期阶段,实现location 透明度和弹性是使用外部负载均衡器(例如 HAProxy/Nginx)作为副本组顶部的入口点或作为中央负载均衡器整个系统。考虑下图:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.1 外部服务负载均衡示例

编号图的每个部分解释如下:

  1. 在这里,我们有一个服务扮演网关的角色并协调所有用户的请求。正如我们所见,网关两次调用 Service A 和Service B< /strong>。假设 Service A 扮演访问控制的角色如果第一次调用验证给定的访问令牌是正确的,或者检查是否存在访问网关的有效授权。一旦检查了访问权限,就会执行第二次调用,这可能包括在 Service B, 这可能需要额外的权限检查,因此再次调用访问控制。
  2. 这是负载均衡器的示意图。为了启用自动缩放,负载均衡器可能会收集诸如打开连接数之类的指标,这可能会给出该服务上的整体负载状态。或者,负载均衡器可以收集响应延迟,并在此基础上对服务的健康状况做出一些额外的假设。 将此信息与额外的定期健康检查相结合,负载均衡器可以调用第三方机制来分配额外的资源负载峰值或在负载减少时取消分配冗余节点。
  1. 这演示了在专用负载平衡器 (2) 下分组的服务的特定实例。在这里,每个服务都可以在不同的节点或机器上独立工作。

从图中我们可以看出,负载 balancer 扮演着可用实例注册的角色。对于每组服务,都有一个专用的负载均衡器来管理所有实例中的负载。反过来,负载平衡器可以基于整体组负载和可用指标启动扩展过程。例如,当用户活动出现高峰时,可能会动态地将新实例添加到组中,从而处理增加的负载。反过来,当负载减少时,负载均衡器(作为指标持有者)可能会发送通知,说明组中存在冗余实例。

笔记

请注意,出于本书的目的,我们将跳过自动缩放技术。但是,我们将在 第 10 章, 最后, 发布它。

但是,该解决方案存在一些已知问题。首先,在高负载下,负载均衡器可能会成为系统中的热点。回顾 Amdahl 定律,我们可能还记得负载均衡器成为一个争论点,并且底层服务组不能处理比​​负载均衡器更多的请求。反过来,为负载均衡器提供服务的成本可能很高,因为每个专用负载均衡器都需要单独的强大机器或虚拟机。此外,它可能需要安装负载平衡器的额外备份计算机。最后,还应管理和监控负载均衡器。这可能会导致基础设施管理的一些额外费用。

笔记

一般来说,服务器端负载均衡是一种经过时间验证的技术,可以用于很多情况。尽管它有其局限性,但它是当今可靠的解决方案。要了解有关技术和用例的更多信息,请查看以下链接: https://aws.amazon.com/ru/blogs/devops/introducing-application-load-balancer-unlocking-and-optimizing-architectures/

 

使用 Spring Cloud 和 Ribbon 进行客户端负载平衡

幸运的是,当服务器端负载均衡器成为系统中的热点时,Spring Cloud 生态系统来救援并试图解决这个问题。 Spring 团队没有为外部负载均衡器提供解决方法,而是决定遵循 Netflix 构建分布式系统的最佳实践。实现可扩展性和位置透明性的方法之一是通过客户端负载平衡。

客户端负载平衡的想法很简单,意味着服务通过复杂的客户端进行通信,该客户端知道目标服务的可用实例,以便它可以轻松地平衡它们之间的负载:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.2。客户端负载平衡模式示例

在上图中,编号点的含义如下:

  1. 这一点描述了一些与 Service A 通信的服务。
  2. 这些是客户端负载平衡器。正如我们现在所看到的,负载均衡器是每个服务的一部分。因此,所有协调都应在实际 HTTP 调用发生之前完成。
  3. Service A 实例的实际组如下所示.

在此示例中,所有调用者执行对 不同 副本的调用 Service A. 尽管该技术提供了独立于专用外部负载平衡器的独立性(从而提供了更好的可扩展性),但它也有其局限性。首先, 提到的负载平衡技术是客户端平衡。因此,每个调用者都负责通过选择目标服务的实例来平衡本地的所有请求。这有助于避免单点故障,从而提供更好的可扩展性。另一方面,关于可用服务的信息应该以某种方式可供系统中的其他服务访问。

为了熟悉用于服务发现的现代技术,我们正在考虑 Java 和 Spring 生态系统中流行的库之一——Ribbon。 Ribbon 库是 Netflix 创建的客户端负载均衡器模式的实现。 Ribbon 提供了两种常用技术来提供对可用服务列表的访问。提供此类信息的最简单方法是通过服务地址的静态预配置列表。该技术如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.3。使用 Netflix Ribbon 为每个客户端负载平衡预配置的静态服务列表。

在上述图表中,编号点的含义如下:

  1. 这一点描述了一些与 Service A通信的服务。
  2. 这些是客户端负载平衡器。每个服务都可以访问特定的服务实例组。
  3. 预配置 Service A 实例的内部列表的表示。在这里,每个调用者服务独立测量每个目标 Service A 实例的负载,并应用与此相关的平衡过程。列表中粗体的服务名称也指Service A的当前执行目标实例。< /em>
  4. 描述了实际的 Service A 实例组这里。

 

在上图中,不同边框的形状 勾勒出不同调用者的知识领域。不幸的是,客户端负载均衡 技术 有其差距。首先,客户端平衡器之间没有协调,所以有可能所有调用者都决定调用同一个instance 并重载它,如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.4。不同步的客户端负载均衡问题点的表示

编号图的每个部分解释如下:

  1. 这是指调用者服务及其预配置的 Service A 实例列表。本地负载测量因服务而异。因此,在此示例中,显示了所有服务调用 Service A< 的同一目标实例的情况em>. 这可能会导致负载出现意外峰值。
  2. 这是 Service A 实例之一。在这里,实例上的实际负载与每个调用者服务基于本地测量所假设的负载不同。

此外,这种使用静态服务实例列表管理负载的简单方法远非反应式系统需求,主要来自弹性负载管理 观点。

笔记

从响应式宣言的角度来看,弹性是指在以下情况下动态增加系统吞吐量以响应不断增长的需求和减少的资源使用的能力需求减少。

作为一种解决方案,Ribbon 能够与 Eureka 等服务注册中心集成,因此注册中心服务会不断更新可用服务副本的列表。考虑下图:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.5。可用服务列表的动态更新示例

在上图中,编号点的含义如下:

  1. 这指的是具有可用 Service A 实例列表的调用者服务。在这里,为了使 Service A 中的活动实例列表保持最新,客户端平衡器会定期刷新该列表并获取来自注册表 (2) 的最新信息。
  2. 这是注册服务的表示。正如我们在此处看到的,注册表服务保留了自己的已发现服务列表及其状态。
  3. 虚线表示服务或健康状态检查请求的心跳。

从上图可以看出,客户端负载均衡器的coordination问题依然存在。 在这种情况下,注册中心负责保存健康服务实例的列表并在运行时不断更新它们。这里,客户端均衡器和注册服务都可以保存目标服务实例的负载信息,客户端均衡器可以周期性地将内部负载统计信息与注册服务收集的负载进行同步。此外,所有相关方都可以访问该信息并根据该信息执行负载平衡。这种管理可用服务列表的方式比前一种更广泛,可以动态更新可用目的地列表。

首先,该技术适用于小型服务集群。 然而,使用共享注册表发现动态服务远非理想。与服务器端负载平衡一样,像 Eureka 这样的经典注册表服务会成为单点故障,因为需要付出巨大的努力来保持系统状态信息的更新和准确。例如,当集群状态 迅速 变化时,注册服务的信息可能会过时。为了跟踪服务的健康状态,服务实例通常会定期发送心跳消息。或者,注册表可以定期执行健康检查请求。在这两种情况下,非常频繁的状态更新可能会消耗不合理的高百分比的集群资源。因此,健康检查之间的持续时间通常从几秒到几分钟不等(Eureka 的默认持续时间是 30 秒)。因此,注册中心可能会提议一个在上次健康检查期间健康但已被销毁的服务实例。因此,集群越动态,就越难使用集中式注册表准确跟踪服务的状态。

此外,所有的平衡 仍然 发生在客户端。这会导致负载均衡不协调的相同问题,这意味着服务上的实际负载可能会出现不均衡。此外,客户端负载平衡器 基于分布式服务中的指标 提供准确和诚实的请求协调系统是另一个挑战,这可能比前一个更难。 因此,我们必须找到更好的解决方案来构建反应式系统。

笔记

在本节中,我们讨论了一个非常流行的 Spring Cloud 生态系统发现/注册服务:Eureka。有关这方面的更多信息,请参阅以下链接:https://cloud. spring.io/spring-cloud-netflix/。一般来说,客户端负载均衡是一种在目标服务实例之间分配负载的有效技术。  此外,还有一些算法可以使客户端平衡可预测,因此可以避免此处描述的大多数问题。要了解更多信息,请参阅以下内容:https://www.youtube.com /watch?v=6NdxUY1La2I

消息代理作为消息传输的弹性、可靠层

幸运的是,响应式 manifesto 为与服务器端和客户端平衡相关的问题提供了解决方案:

“使用显式消息传递可以通过整形和监控系统中的消息队列并在必要时施加背压来实现负载管理、弹性和流量控制。”< /跨度>

该声明可以解释为使用独立 消息代理来传输消息。考虑下图:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.6。使用消息队列作为服务的负载平衡示例

在上图中,编号点的含义如下:

  1. 这些是调用者服务。正如我们在这里看到的,调用者服务只知道消息队列的位置和接收者的服务名称,这允许调用者服务与实际的目标服务实例解耦。这种通信模型类似于我们在服务器端平衡中的通信模型。然而,这里的显着差异之一是调用者和最终接收者之间的异步通信行为。在这里,我们不必在处理请求时保持连接打开。
  2. 这是传出消息表示。在此示例中,外发消息可能包含有关接收者的服务名称和消息相关 ID 的信息。
  3. 这是消息队列的表示。在这里,消息队列作为一个独立服务,允许用户向Service C 实例,以便任何可用实例都可以处理消息。
  4. 这是接收方的服务实例。每个 Service C 实例具有相同的平均负载,因为每个工作人员都能够通过显示需求来控制背压 (6) 以便消息队列可以发送传入消息 (5).

首先,所有请求都通过消息队列发送,然后可以将它们发送给可用的工作线程。此外,消息队列可以保持消息持续存在,直到其中一个工作人员请求新消息。通过这种方式,消息队列知道系统中有多少相关方,并可以根据该信息管理负载。反过来,每个工人都能够在内部管理背压并根据机器的可能性发送需求。仅通过监控待处理消息的数量,我们就可以增加活跃工作人员的数量。此外,仅仅依靠待处理的工人需求量,我们就可以减少休眠工人。

虽然消息队列解决了客户端负载均衡的问题,但似乎我们又回到了与之前非常相似的服务器端负载均衡解决方案,消息队列可能会成为系统中的热点.然而,事实并非如此。首先,这里的通信模型有点不同。消息队列不是搜索可用服务并决定将请求发送给谁,而是将传入消息放入队列中。然后,当工作人员声明接收消息的意图时,排队的消息将被传输。因此,这里有两个独立的、可能独立的阶段。这些如下:

  • Receiving the messages and putting them into the queue (which may be very fast)
  • Transferring data when consumers show demand

另一方面,我们可以为每组收件人复制消息队列。因此,我们可以增强系统的可扩展性并巧妙地避免任何瓶颈。考虑下图:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.7。消息队列即服务的弹性示例

编号 diagram 的每个部分解释如下:

  1. 这表示启用了数据复制的消息队列。在此示例中,我们有一些复制的消息队列,它们是每组服务实例专用的。
  2. 这表示同一组收件人的副本之间的状态同步。
  3. 这表示副本之间可能的负载平衡(例如,客户端平衡)。

在这里,我们为每个收件人组设置一个队列,并为该组中的每个队列设置一个复制集。但是,负载可能因组而异,因此一组可能会超载,而另一组可能只是处于休眠状态而没有任何工作,这可能是一种浪费。因此,我们可能会依赖支持虚拟队列的消息代理,而不是将消息队列作为单独的服务。通过这样做,我们可以降低基础设施的成本,因为系统上的负载可能会减少,因此一个消息代理可以在不同的接收者组之间共享。反过来,消息代理也可能是一个反应系统。因此,消息代理可以具有弹性和弹性,并且可以使用异步、非阻塞消息传递来共享其内部状态。考虑下图:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.8。分布式消息代理的弹性

在上图中,编号点的含义如下:

  1. 这是具有分区客户端负载平衡器的调用者服务。通常,消息代理可以使用前面提到的技术来组织分区的发现并与其客户端共享信息。
  2. 这是指消息代理分区的表示。在此示例中,每个分区都有许多分配的收件人(主题)。反过来,随着分区,每个部分也可能有一个副本。
  3. 这指的是分区的重新平衡。消息代理可以采用额外的重新平衡机制,因此在集群中有新接收者或新节点的情况下,这样的消息代理可以轻松扩展。
  4. 这是接收者的一个例子,它可能会监听来自不同分区的消息。

上图描述了消息代理的可能设计,作为一个可以成为目标应用程序可靠主干的系统。从上图中可以看出,消息代理可以拥有系统需要的任意数量的虚拟队列。现代消息代理采用状态共享技术,例如最终一致性和消息多播,从而实现开箱即用的弹性。消息代理可以是异步消息传输的可靠层,具有背压支持和可重放性保证。

笔记

例如,消息代理的可靠性可以通过采用有效的技术在快速存储上进行消息复制和持久性来实现。但是,经验可能会有所不同,因为此类消息代理的性能可能低于那些不使用消息持久性或在对等发送消息的情况下的那些消息代理的性能。

这对我们意味着什么?这意味着在消息代理崩溃的情况下,所有消息都可用,因此一旦消息传递层可用,所有未传递的消息都可以找到它们的目的地。

总之,我们可以得出结论,消息代理技术提高了系统的整体可扩展性。在这种情况下,我们可以很容易地构建一个弹性系统,因为消息代理可以作为一个反应系统。因此,沟通不再是瓶颈。

消息代理市场

尽管使用 消息代理的想法可能是大多数业务需求的梦想,但我们的实际实现自己的消息代理可能会变成一场噩梦。幸运的是, 市场 现在 为我们提供了一些强大的开源解决方案。一些最流行的消息代理和消息平台包括RabbitMQ、Apache Kafka、Apache Pulsar、Apache RocketMQ、NATS、NSQ等。

笔记

我们可以通过以下链接找到消息代理的比较:Apache RocketMQ 与 Apache Kafka athttps://rocketmq.apache.org/docs/motivation/#rocketmq-vs-activemq-vs-kafka; RabbitMQ 与 Kafka 与 NSQ 在 https://stackshare.io/stackups/kafka-vs -nsq-vs-rabbitmq; 以及 Apache Pulsar 与 Apache Kafka 在 https://streaml.io/blog/pulsar-streaming-queuing

Spring Cloud Streams 作为 Spring 生态系统的桥梁


所有前面提到的 解决方案都相互竞争,并且各有优势,例如赢得低延迟或更好的保证消息传递或持久性。

尽管如此,这本书是关于 Spring 生态系统中的响应式可能性的,因此了解 Spring 为与消息代理的无痛集成提供了什么是很有价值的。

使用 Spring Cloud 构建健壮的消息驱动系统的强大方法之一是通过 Spring Cloud Streams。 Spring Cloud Streams 为异步跨服务消息传递提供了简化的编程模型。反过来,Spring Cloud Streams 模块构建在 Spring Integration 和 Spring Message 模块之上,它们是与外部服务正确集成和直接异步消息传递的基本抽象。此外,Spring Cloud Streams 提供了构建弹性应用程序的能力,而无需处理过于复杂的配置,也无需深入了解特定的消息代理。

笔记

不幸的是,只有少数消息代理在 Spring Framework 中具有适当的支持。在撰写本文时,Spring Cloud Streams 仅提供与 RabbitMQ 和 Apache Kafka 的集成。

 

为了了解使用 Spring Cloud Streams 构建反应式系统的基础知识,我们将升级我们在 第 7 章反应式 数据库访问,到一个新的响应式 Spring Cloud Stack。

首先,我们将从 回顾我们的应用程序的设计开始,它由三个概念部分组成。第一部分是名为 ChatService 的连接器服务。在我们的例子中,这是与 Gitter 服务通信的实现,它是一个服务器发送的事件流。反过来,该消息流在 ChatController之间共享, 负责将这些消息直接传输给最终用户,  StatisticService,负责将消息存储在数据库中,并根据变化重新计算统计信息。以前,所有三个部分都由一个整体应用程序组成。因此,系统中的每个组件都通过使用 Spring 框架依赖注入连接。此外,Reactor 3 反应类型支持异步、非阻塞消息传递。我们需要了解的第一件事是 Spring Cloud Streams 是否允许将单体应用程序分解为微服务,同时允许使用响应式类型进行组件之间的通信。

幸运的是,从 Spring Cloud 2 开始,直接支持通过 Reactor 类型进行通信。以前,位置透明度可能与单体应用程序中组件的松散耦合有关。使用 Inversion of Control (IoC),每个组件都可以在没有任何实现的知识 的情况下访问组件接口。在云生态系统中,除了知道访问接口之外,我们还应该知道域名(组件名称),或者在我们的例子中,是专用队列的名称。作为通过接口进行通信的替代品,Spring Cloud Streams 提供了两个概念注解用于连接服务之间的通信。

笔记

要了解有关位置透明度的更多信息,请参阅以下链接:http://wiki.c2.com/?LocationTransparency

第一个概念注解是 @Output 注解。这个注解定义了消息应该被传递到的队列名称。第二个概念注解是 @Input 注解,它定义了消息应该从哪个队列监听。服务之间的这种交互方法可能会取代我们的接口,因此我们可能依赖于向特定队列发送消息,而不是调用该方法。让我们考虑一下为了允许向消息代理发送消息而必须应用到我们的应用程序的更改:

@SpringBootApplication                                             // (1)
@EnableBinding(Source.class)                                       // (1.1)
@EnableConfigurationProperties(...)                                //
/* @Service */                                                     // (1.2)
public class GitterService                                         //
   implements ChatService<MessageResponse>  {                      //

   ...                                                             // (2)

@StreamEmitter                                                  // (3)
   @Output(Source.OUTPUT)                                          // (3.1)
   public Flux<MessageResponse> getMessagesStream() { ... }        //

@StreamEmitter                                                  // (4)
@Output(Source.OUTPUT)                                          // 
   public Flux<MessageResponse> getLatestMessages() { ... }        //

public static void main(String[] args) {                        // (5)
SpringApplication.run(GitterService.class, args);            //
}                                                               //
}                                                                  //

笔记

请注意,与实际实现一起,前面的代码显示了代码差异。这里, /* Commented Text */ 指的是删除的代码行,加粗下划线的文本  表示新的。非样式代码意味着那里没有任何改变。

在上述代码中,编号点的含义如下:

  1. 这是 SpringBootApplication 声明。在 (1.1), 我们将 Spring Cloud Streams 定义为一个 @EnableBinding 注解,支持与 流式基础设施的底层集成(例如,与 Apache Kafka 的集成)。反过来,我们删除了 @Service注解(1.2),< /em> 因为我们从单体应用程序迁移到分布式应用程序。现在我们可以将该组件作为一个小型独立应用程序运行,并以这种方式实现更好的扩展。
  2. 这是字段和构造函数的列表,它们保持不变。
  3. 这指的是消息的 Flux 声明。该方法返回来自 Gitter 的不定式消息流。在这里,关键角色扮演 @StreamEmitter 因为它确定给定的源是一个反应源。因此,为了定义目标通道, @Output 在这里使用,接受通道的名称。请注意,目标频道的名称必须在 (1.1) 行的绑定频道列表中。
  1. 在这里, getLatestMessages 返回最新的 Gitter 消息的有限流并将它们发送到目标通道。
  2. 这是指 main 方法声明,用于 引导Spring Boot应用程序。

从示例中可以看出,从业务逻辑的角度来看,没有重大 更改。反过来,只需应用一些 Spring Cloud Streams 注释,就可以在此处添加大量基础架构逻辑。首先,通过使用 @SpringBootApplication,我们将我们的小服务定义为一个单独的 Spring Boot 应用程序。通过应用 @Output(Source.OUTPUT),我们在消息代理中定义了目标队列的名称。

最后, @EnableBinding@StreamEmitter 表示我们的应用绑定到了一个消息代理,< code class="literal">getMessagesStream() 和 getLatestMessages() 方法在应用程序的起始点被调用。

笔记

Тo 了解更多关于 @StreamEmitter 及其限制,请参阅以下链接:https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_reactive_sources。另外,要基本了解 Spring Cloud Stream 的注解模型,请看这个链接:https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_programming_model

除了 Java 注解,我们还应该提供 Spring Cloud Stream 绑定的配置。这可以通过提供 Spring Application 属性来完成,例如以下(application.yaml,在这种情况下):

spring.cloud.stream:                                               //
bindings:                                                       //
output:                                                      // (1)
destination: Messages                                     // (2)
producer:                                                 // (3)
requiredGroups: statistic, ui                          // (4)

在前面的示例代码中,在 (1), 点我们指定了绑定的键,这是 Source.OUTPUT 中定义的通道的名称。通过这样做,我们可以访问 org.springframework.cloud.stream.config.BindingProperties 并在消息代理 (2).除此之外,我们还可以配置我们的生产者应该如何表现  (3). 例如,我们可以配置一个接收者列表,该列表应该接收带有 至少一次 交货保证 (4)

 

通过将前面的代码作为单独的应用程序运行,我们可以看到消息代理内部的专用队列开始接收消息。另一方面,我们可能记得 第 7 章Reactive Database Access, 我们的聊天应用程序有两个中央消息消费者:控制器层和统计服务。作为系统修改的第二步,我们将更新统计服务。在我们的应用程序中,统计服务不仅仅是一个普通的消费者。它负责根据数据库更改更新统计信息,然后将其发​​送 到控制器层。这意味着该服务是 一个 处理器,因为它扮演 Source and 下沉 同时。因此,我们必须提供从消息代理消费消息并将其发送到消息代理的能力。考虑以下代码:

@SpringBootApplication                                             // (1)
@EnableBinding(Processor.class)                                    //
/* @Service */                                                     //
public class DefaultStatisticService                               //
   implements StatisticService {                                   //

   ...                                                             // (2)

@StreamListener                                                 // (3)
 @Output(Processor.OUTPUT)                                       //
   public Flux<UsersStatisticVM> updateStatistic(                  //
 @Input(Processor.INPUT) Flux<MessageResponse> messagesFlux   // (3.1)
   ) { ... }                                                       //

   ...                                                             // (2)

public static void main(String[] args) {                        // (4)
SpringApplication.run(DefaultStatisticService.class, args);  //
}                                                               //
}

编号代码的每个部分解释如下:

  1. 这是 @SpringBootApplication 声明。在这里,与前面的示例一样,我们将 @Service 替换为 @EnableBinding 注释。与 GitterService 组件的配置相比,我们使用 Processor接口,它声明StatisticService 组件使用来自消息代理的数据并将它们发送到消息代理。
  2. 这是代码中保持不变的部分。
  1. 这是处理器的方法声明。在这里, updateStatistic 方法接受一个 Flux,它提供对来自消息代理通道的传入消息的访问。我们必须通过提供@StreamListener注解和 @Input来明确定义 给定的方法监听消息代理 注释声明。
  2. 这是 main 方法声明,用于 引导Spring Boot应用程序。

可能会注意到,我们使用 Spring Cloud Streams annotations 只是为了标记输入 Flux 和输出 Flux 是来自/去往已定义队列的流。在该示例中,@StreamListener 允许虚拟队列的名称(从/向其消费/发送消息)对应于在 @Input/@Output 注解,而预配置接口绑定在 @EnableBinding 注解中.如上例所示,连同生产者配置,我们可以使用相同应用程序的属性(在本例中为 YAML 配置)以相同的通用方式配置声明的输入和输出:

spring.cloud.stream:                                               //
bindings:                                                       //
input:                                                       // (1)
destination: Messages                                     //
group: statistic                                          // (2)
output:                                                      //
producer:                                                 //
requiredGroups: ui                                     // 
destination: Statistic                                    //

Spring Cloud Stream 在与消息代理的通信配置方面提供了灵活性。在这里,在 (1) 点,我们定义了 input 实际上是消费者的配置。另外(2),我们要定义group的名字 表示接收者组的名字消息代理。

笔记

要了解更多关于 Spring Cloud Stream 的可用配置,请访问以下链接 https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_configuration_options

 

最后,在准备好发射器之后,我们必须更新我们的InfoResource组件通过以下方式:

@RestController                                                    // (1)
@RequestMapping("/api/v1/info")                                    //
@EnableBinding({MessagesSource.class, StatisticSource.class})      // (1.1)
@SpringBootApplication                                             //
public class InfoResource {                                        //

   ...                                                             // (2)

/*public InfoResource(                                            // (3)
ChatService<MessageResponse> chatService,                    //
StatisticService statisticService                            //
) { */                                                          // (3.1)
@StreamListener                                                 //
public void listen(                                             //
@Input(MessagesSource.INPUT) Flux<MessageResponse> messages, //
      @Input(StatisticSource.INPUT)                                //
      Flux<UsersStatisticVM> statistic                             //
   ) {                                                             //

   /* Flux.mergeSequential(                                       // (4)
chatService.getMessagesAfter("")                          //
 .flatMapIterable(Function.identity()),         //
chatService.getMessagesStream()                           //
)                                                            //
.publish(flux -> Flux.merge( ... */                          //

messages.map(MessageMapper::toViewModelUnit)              // (5)
                 .subscribeWith(messagesStream);                   //
statistic.subscribeWith(statisticStream);                 //

   /* ))                                                           // (4)
.subscribe(); */                                             //
}                                                               //

   ...                                                             // (2)

public static void main(String[] args){ // (6)
SpringApplication.run(InfoResource.class, args);             //
}                                                               //
}                                                                  //

 

在上述代码中,编号点的含义如下:

  1. 这是@SpringBootAppliction 定义。我们可能已经注意到,@EnableBinding 接受两个自定义的可绑定接口 这里 具有用于统计和消息的单独输入通道的配置。
  2. 这是代码中保持不变的部分。
  3. 这是 .listen 方法声明。正如我们所见,接受两个接口的构造函数现在接受了由 @Input 注解注解的Fluxes。
  4. 这是修改后的逻辑。在这里,我们不再需要手动合并和共享流,因为我们已将该责任转移到消息代理。
  5. 这就是我们 订阅 给定的统计数据和消息流的点。此时,所有传入消息都缓存到 ReplayProcessor。请注意 提到的缓存是本地的,为了实现更好的可扩展性,可以使用分布式缓存。
  6. 这是 main 方法声明,用于 引导Spring Boot应用程序。

笔记

要了解有关与分布式缓存(例如 Hazelcast 的可能集成的更多信息,请参阅 中的可用扩展Reactor-Addons 模块https://github.com/reactor/reactor-addons/tree/master/reactor-extra/src/main/java/reactor/cache

在这里,我们正在监听两个单独的队列。同样,使用消息代理可以让我们与 GitterService 和 StatisticService透明地分离。反过来,当我们处理 Spring Cloud Stream 时,我们必须记住 @StreamListener 注解仅适用于方法。因此,我们必须通过在 void 方法之上应用 @StreamListener 来破解该元素,该方法在与消息代理的连接被连接时被调用。

 

为了更好地理解自定义可绑定接口的内部结构, 让我们考虑以下代码:

interface MessagesSource {                                         //
   String INPUT = "messages";                                      // (1)
                                                                   //
   @Input(INPUT)                                                   // (2)
   SubscribableChannel input();                                    // (3)
}                                                                  //

interface StatisticSource {                                        //
   String INPUT = "statistic";                                     // (1)
                                                                   //
   @Input(INPUT)                                                   // (2)
   SubscribableChannel input();                                    // (3)
}                                                                  //

numbered 代码的每个部分在此处解释:

  1. 这是表示绑定通道名称的 String 常量。
  2. 这是 @Input annotation,声明被注解的方法提供 MessageChannel,传入的消息通过它进入应用程序.
  3. 这是表示 MessageChannel类型的方法。 对于 消息消费者的可绑定接口 我们必须提供SubscribableChannel,它扩展了 MessageChannel 有两个额外的异步消息监听方法。

与前面的案例一样,我们必须在本地application.yaml 中提供类似的属性:

spring.cloud.stream:                                               
   bindings:                                                       
      statistic:                                                   
         destination: Statistic                                    
         group: ui                                                  
      messages:                                                    
         destination: Messages                                     
         group: ui                                                 

通过在图中包含所有谜题,我们得到系统的以下架构:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.9。分布式聊天应用程序示例

 

在上图中,编号点的含义如下:

  1. 这是 GitterService 的表示。可能注意到,GitterService 与 Gitter API 和 Message Broker 紧密耦合(2) (在这种情况下为 Apache Kafka)。但是,没有直接依赖于外部服务。
  2. 这是消息代理表示。在这里,我们里面有两个虚拟队列。请注意 所提到的表示并没有公开特定的配置,例如复制和分区。
  3. 此时,消息代理将消息解复用到 UI 服务 (InfoResource) 和 StatisticService
  4. 这是 StatisticService 表示。可能会注意到,该服务为消息代理侦听传入消息,将它们存储在 MongoDB 中,进行一些统计聚合,并生成更新结果。
  5. 最后,两个队列都由 UI 服务使用,它依次将所有消息多路分解到所有订阅的客户端。
  6. 这是网络浏览器的表示。在这里,UI 服务的客户端是 Web 浏览器,它们通过 HTTP 连接接收所有更新。

从上图中我们可能已经注意到,我们的应用程序在组件级别完全解耦。例如,GitterServiceStatisticService 和 UI 服务作为单独的应用程序运行(可能在不同的机器上运行)和向消息代理发送消息。此外, Spring Cloud Streams 模块支持 Project Reactor 及其编程模型,这意味着它 遵循 Reactive Streams 规范并支持背压控制,从而提供更高的应用程序弹性。通过这样的设置,每个服务都可以独立扩展。因此,我们可以实现一个响应式系统,这是迁移到 Spring Cloud Streams 的主要目标。

云中的反应式编程


尽管 Spring Cloud Streams 提供了一种 simplified 方式来实现分布式响应式系统,但我们仍然需要处理配置列表(例如,目的地的配置)来处理 Spring Cloud Streams 编程模型的细节,等等。另一个重要的问题是关于流量的推理。我们可能记得 第 2 章Spring 中的响应式编程 - 基本概念,发明响应式扩展(作为异步编程的概念)的主要原因之一是为了实现一个隐藏复杂异步的工具运营商功能链背后的数据流。尽管我们可以开发特定的组件并且我们可以指定组件之间的交互,但深入挖掘它们之间交互的全貌可能会成为一个难题。同样,在反应式系统中,理解微服务之间的流交互是一个关键部分,如果没有特定的工具是很难实现的。

幸运的是,亚马逊在 2014 年发明了 AWS Lambda。这为响应式系统开发提供了新的可能性。该服务的官方网页上提到了这一点:

"AWS Lambda 是一种 无服务器计算 服务( https://aws.amazon.com/serverless) 运行您的代码以响应事件并自动为您管理底层计算资源 (https://aws.amazon.com/lambda/features)。"

AWS Lambda 允许我们构建 小型、独立且可扩展的转换。此外,我们获得了将业务逻辑的开发生命周期与特定数据流分离的能力。最后,用户友好的界面允许我们使用每个功能独立构建整个业务流程。

亚马逊是该领域的先驱,激励许多云提供商采用相同的技术。幸运的是,Pivotal 也是该技术的采用者之一。

Spring Cloud 数据流

2016年初,Spring Cloud引入了一个新的模块称为 Spring Cloud Data Flow,有 官方模块说明 在这个链接:https://cloud.spring.io/spring-cloud-dataflow:

“Spring Cloud Data Flow 是一个用于构建数据集成和实时数据处理管道的工具包。”

 

概括地说,该模块的主要思想是实现功能业务转换的开发与已开发组件之间的实际交互之间的分离。换句话说,这是业务流程中功能及其组成之间的分离。为了解决这个问题,Spring Cloud Data Flow 为我们提供了一个用户友好的 Web 界面,这使得上传可部署的 Spring 成为可能启动应用程序。然后,它通过使用上传的工件并将组合管道部署到所选平台(例如 Cloud Foundry、Kubernetes、Apache YARN 或 Mesos)来设置数据流。此外,Spring Cloud Data Flow 为源(数据库、消息队列, 和文件),用于数据转换的不同内置处理器和接收器,它们表示存储结果的不同方式。

笔记

要了解有关支持的源、处理器和接收器的更多信息,请访问以下链接:https://cloud.spring.io/spring-cloud-task-app-starters/ https://cloud.spring.io/spring-cloud-stream-app-starters/.

如前所述,Spring Cloud Data Flow 采用了流处理的思想。因此,所有部署的流都构建在 Spring Cloud Stream 模块之上,并且所有通信都是通过分布式、弹性消息代理(如 Kafka)或 RabbitMQ 的分布式高度可扩展变体完成的。

为了了解使用 Spring Cloud Data Flow 进行分布式响应式编程的强大功能,我们将构建一个支付处理流程。正如我们可能已经知道的那样,支付处理非常复杂。但是,这是此过程的简化图:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.10。支付处理流程图

 

正如我们可能已经注意到的那样,用户的付款必须通过几个重要步骤,例如验证、帐户限制检查和付款批准。在 第 6 章中, WebFlux 异步非阻塞通信, 我们构建了一个类似的应用程序,其中一个服务编排了整个流程。尽管整个交互分布在对多个独立微服务的异步调用中,但流的状态由 Reactor 3 内部的一个服务持有。这意味着在该服务发生故障的情况下,恢复状态可能具有挑战性。

幸运的是,Spring Cloud Data Flow 依赖于 Spring Cloud Streams,后者依赖于弹性消息代理。因此,如果发生故障,消息代理将不会确认关于message 接收,因此 因此,消息将被重新传递给另一个执行者,而无需额外的努力。

由于我们已经对 Spring Cloud Data Flow 内部的核心原理和 Payment Flow 的业务需求有了基本的了解,我们可以使用该技术的堆栈来实现该服务。

首先,我们必须定义入口点,它通常可以作为 HTTP 端点访问。 Spring Cloud Data Flow 提供了一个可以使用 Spring Cloud Data Flow DSL 定义的 HTTP 源, 如下例所示:

SendPaymentEndpoint=Endpoint: http --path-pattern=/payment --port=8080

笔记

前面的例子代表了 Spring Cloud Data Flow 管道 DSL 的一小部分。在下面的示例中,我们将定义更多关于如何构建完整 Spring Cloud Data Flow 管道的示例。要了解有关 Stream Pipeline DSL 的更多信息,请访问以下链接:https://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#spring-cloud-dataflow-stream-intro- DSL。在开始任何操作之前,请确保所有支持的应用程序和任务都已在 https://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#supported-apps-and-tasks

在前面的示例中,我们定义了一个新的数据流函数,它将所有 HTTP 请求表示为消息流。因此,我们可以以明确的方式对它们做出反应。

 

具有 Spring Cloud Function 的最细粒度的应用程序

定义 HTTP 端点后,我们必须验证 incoming 消息。不幸的是,这部分流程包含实际的业务逻辑,并且需要对流程的某个阶段进行自定义实现。不过幸运的是,Spring Cloud Data Flow 允许在流程中使用自定义 Spring Cloud Stream applications

笔记

我们不会在这里详细介绍自定义 Spring Cloud Data Flow 阶段的实现。但是,要了解有关自定义 Spring Boot 应用程序的创建和注册的更多信息,请点击以下链接:

一方面,我们可以为自己的独立 Spring Cloud Stream 应用程序提供自定义验证逻辑。但是,另一方面,我们仍然需要处理所有配置、uber-jars、较长的启动时间以及与应用程序部署相关的其余问题。幸运的是,使用 Spring Cloud Function 项目可以避免这些问题。

Spring Cloud Function 的主要目的是通过函数促进业务逻辑。该项目提供了将自定义业务逻辑与运行时细节分离的能力。因此,相同的功能可能会以不同的方式和地点重用。

笔记

想了解更多Spring Cloud Function项目的特性,请访问以下链接:https://github.com/spring-cloud/spring-cloud-function

在开始使用 Spring Cloud Function 之前,我们将在本节中介绍 Spring Cloud Function 模块的主要特性,并更好地了解其内部结构。

Spring Cloud Function 的核心是对可能运行在 Spring Cloud Streams、AWS Lambda 或使用任何通信传输的任何其他云平台之上的应用程序的额外抽象级别。

默认情况下,Spring Cloud Function 具有适配器,用于将函数可配置地部署到 AWS Lambda、Azure Functions 和 Apache OpenWhisk。与直接上传 Java 函数相比,使用 Spring Cloud Function 的主要好处是可以使用大多数 Spring 功能,并且不依赖于特定的云提供商 SDK。

Spring Cloud Function 提供的编程模型只不过是对以下 Java 类之一的定义——java.utils.function.Function java.utils.function.Supplier 和 java.utils.function.Consume。此外,Spring Cloud Function 可以在不同的框架组合中使用。例如,我们可以创建一个 Spring Boot 应用程序,它可以是一个平台元素的功能。反过来,它们中的一些可能 被表示为一个普通的Spring @Bean

@SpringBootApplication                                             // (1)
@EnableBinding(Processor.class)                                    // (1.1)
public class Application {                                         //

   @Bean                                                           // (2)
   public Function<                                                //
      Flux<Payment>,                                               // 
      Flux<Payment>                                                //
   > validate() {                                                  //
      return flux -> flux.map(value -> { ... });                   // (2.1)
   }                                                               //

   public static void main(String[] args) {                        // (3)
      SpringApplication.run(Application.class, args);              //
   }                                                               //
}

在上述代码中,numbered 点的含义如下:

  1. 这是 @SpringBootApplication 声明。可能会注意到,我们仍然必须为 Spring Boot 应用程序定义一个最小的声明。此外,我们使用 @EnableBinding 注释和 Processor 接口作为参数。在这样的组合中,Spring Cloud 在 (2) 行标识一个 bean,以用作消息处理程序。此外, input 和 output函数被绑定到 Processor 绑定公开的外部目的地。
  2. 这显示了 Function,它将 Flux 转换成另一个Flux  作为 IoC 容器的一个组件。在这里,在 (2.1) 处,我们声明了一个用于元素验证的 lambda,它又是一个接受流并返回另一个流的高阶函数。
  3. 这是 main 方法声明,用于 引导一个Spring Boot应用程序。

 

从前面的例子可以看出,Spring Cloud Function 支持不同的编程模型。例如,支持 Reactor 3 反应类型和 Spring Cloud Streams 的消息转换,它将这些流暴露给外部目的地。

此外,Spring Cloud Function 不限于预定义的函数。例如,它有一个内置的运行时编译器,可以在属性文件中以字符串形式提供函数,如下例所示:

spring.cloud.function:                                             // (1)
   compile:                                                        // (2)
      payments:                                                    // (3)
         type: supplier                                            // (4)
         lambda: ()->Flux.just(new org.TestPayment())              // (5)

numbered 代码的每一部分解释如下:

  1. 这是 Spring Cloud 函数属性命名空间。
  2. 这是与运行时(动态)函数编译相关的命名空间。
  3. 这是键的定义,它是在 Spring IoC 容器中可见的函数的名称。这扮演了已编译 Java 字节码的文件名的角色。
  4. 这是一种函数的定义。可用的选项是 supplier/function/consumer
  5. 这是 lambda 定义。正如我们所见,供应商被定义为一个 String 它被编译成字节码并存储在文件系统中。在 spring-cloud-function-compiler 模块的支持下,编译是可能的。它有一个内置编译器,并且可以将编译后的函数存储为字节码并 将它们添加到 ClassLoad。 

前面的示例显示 Spring Cloud Function 提供了动态定义和运行函数的能力,而无需预先编译它们。这种能力可用于实现功能即服务FaaS ) 软件解决方案中的功能。

除此之外,Spring Cloud Function 提供了一个名为 spring-cloud-function-task的模块,它允许使用相同的属性文件在管道中运行上述函数:

spring.cloud.function:
task:                                                           // (1)
supplier: payments                                           // (2)
function: validate|process                                   // (3)
consumer: print                                              // (4)
   compile:                                                        
      print:                                                    
type: consumer
         lambda: System.out::println
         inputType: Object
      process:                                                    
type: function
         lambda: (flux)->flux
         inputType: Flux<org.rpis5.chapters.chapter_08.scf.Payment>
         outputType: Flux<org.rpis5.chapters.chapter_08.scf.Payment>

代码中的编号解释如下:

  1. 这是用于任务配置的命名空间。 
  2. 在这里,我们为任务配置一个 supplier (source) 函数。如我们所见,要定义供应商,我们必须传递供应商函数的名称。
  3. 这些是数据的中间转换。在这里,为了管道执行,我们可以使用 (pipe) | 符号组合几个函数。请注意,在底层,所有函数都使用 Function#accept 方法链接。
  4. 这是消费者阶段的定义。请注意,仅在提供所有阶段时才执行任务。

正如我们所见,通过使用带有 Spring Cloud Function 模块作为依赖项的纯 Spring Boot 应用程序,可以运行用户准备好的函数并将它们组合在一个复杂的处理程序中。

 spring-cloud-function-compiler 模块在 Spring Cloud Function 生态系统中扮演着重要角色。除了从属性文件进行动态函数编译外,此模块还公开了 Web 端点,它 允许动态函数部署。 例如, 通过在终端中调用以下 curl 命令,我们可以将提供的函数添加到正在运行的 Spring Boot 应用程序中:

curl -X POST -H "Content-Type: text/plain" \
  -d "f->f.map(s->s.toUpperCase())" \
  localhost:8080/function/uppercase\
  ?inputType=Flux%3CString%3E\
  &outputTupe=Flux%3CString%3E

在此示例中,我们上传了一个函数,其中 Flux<String> 作为它的输入和输出类型。

笔记

请注意,我们在这里使用 %3C %3E 符号来编码 < > 在 HTTP URI 中。

笔记

 spring-cloud-function-compiler 作为服务器运行有两种选择: 

  • Download a JAR file from maven-central and run it independently
  • Add the module as a dependency to the project and provide the following path to scan for beans: "org.springframework.cloud.function.compiler.app"

通过 running 部署一个函数后,依赖于 spring-cloud 的轻量级 Spring Boot 应用程序-function-web and spring-cloud-function-compiler, 我们获得了基于 HTTP 的 on-the-fly 函数部署及其动态部署为一个单独的网络应用程序。例如,通过使用相同的 jar 文件并更改程序参数,我们可以使用不同的函数运行它,如下所示:

java -jar function-web-template.jar \
  --spring.cloud.function.imports.uppercase.type=function \
  --spring.cloud.function.imports.uppercase.location=\
file:///tmp/function-registry/functions/uppercase.fun \
\
  --spring.cloud.function.imports.worldadder.type=function \
  --spring.cloud.function.imports.worldadder.location=\
file:///tmp/function-registry/functions/worldadder.fun

在这个例子中,我们导入了两个函数:

  • Uppercase: Which transforms any given string into an uppercase equivalent
  • Worldadder: Which adds the world suffix to any given string

 

从前面的示例代码可以看出,我们使用 spring.cloud.function.imports 命名空间来定义导入函数的名称( 粗体),然后是它们的类型(斜体) 这些字节码的位置功能。应用程序启动成功后,我们可以通过执行以下 curl 命令来访问部署的函数:

curl -X POST -H "Content-Type: text/plain" \
   -d "Hello" \
   localhost:8080/uppercase%7Cworldadder

作为执行的结果 我们收到"HELLO World",它确保这两个函数都存在于服务器上并按照URL中定义的顺序 执行。

笔记

我们在这里使用 %7C 符号来编码(管道) | in HTTP URI。

同样,我们可以在相同或独立的应用程序中部署和导入其他函数

或者,Spring Cloud Function 提供了一个部署模块,该模块扮演独立功能的容器的角色。在之前的案例中,我们能够运行内置函数或通过 spring-cloud-function-compiler web API 进行部署。我们已经了解了如何使用已部署的功能并将它们作为独立的应用程序运行。尽管具有这种灵活性,但 Spring Boot 应用程序的启动时间可能比该功能的执行时间长得多。在某些情况下(连同纯函数),我们必须使用一些 Spring Framework 库。例如,依赖 Spring Data 或 Spring Web features。因此,在这种情况下可能有用的是部署薄罐。 Spring Cloud Function 在这里提供了一个额外的模块,称为 spring-cloud-function-deployer

Spring Cloud Function Deployer 模块允许运行 每个 jar 具有相同的 Spring Deployer 应用程序,但完全隔离。乍一看,使用该模块并没有获得宝贵的好处。然而,我们可能还记得,独立函数(这是我们想要实现的)在它们的引导和执行中是快速的。打包到 Spring Boot 环境中启动的函数需要启动整个 Spring Boot 应用程序,与函数的启动时间相比,这通常需要大量时间。

因此,为了解决这个问题,Spring Cloud Function Deployer 首先启动自己并预加载部分 JDK 类。然后它为每个带有函数的 jar 创建子 ClassLoader 。  每个 jar 的执行发生在它自己的 Thread 这使得并行执行成为可能。由于每个 jar 都是一个独立的微型 Spring Boot 应用程序,它在自己的 Spring 上下文中运行,因此 bean 不会与相邻应用程序的 bean 相交。最后,子 Spring Boot 应用程序的启动速度明显更快,因为父 ClassLoader 已经完成了 JVM 预热的艰巨工作。

此外, spring-cloud-function-deployer 和 spring-boot-thin-launcher的杀手锏组合 也可以解决 fat jar 问题。 Spring Boot Thin Launcher 是一个插件用于 Maven 和 Gradle,它 覆盖默认的 Spring Boot fat JarLauncher 并提供 ThinJarWrapper 和 < code class="literal">ThinJarLauncher 改为。这些类首先完成打包无依赖项 jar 所需的所有工作,然后——仅在引导阶段——它们从配置的缓存(例如,从本地 Maven 存储库)中找到所有需要的依赖项,或者从配置的 Maven 存储库。以这种方式运行,我们的应用程序可能会将 jar 的大小减少到几 KB,并将启动时间减少到数百毫秒。

总结 得到的关于Spring Cloud Function的信息 ,我们来看下面这张生态系统的概括图:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.11。该  Spring Cloud Function 的生态系统

在上图中,numbered 点的含义如下:

  1. 这是六边形形式的函数表示。正如我们所见,这里描绘了几种不同类型的六边形。其中一些是 Spring Boot 应用程序中函数的组合,或者是通过 HTTP 公开的函数。另一个可以在 Spring Cloud Stream Adapter 的支持下与其他功能进行通信,或者可以部署为单个执行的任务。
  2. 这是 Spring Cloud Function Deployer 的表示。如前所述,Spring Cloud Function Deployer 被描述为一个容器。在这种情况下,我们在不同的节点上执行了两个独立的 Spring Cloud Function Deployer。此外,容器内函数周围的虚线边框代表独立的类加载器。
  3. 这是 Spring Cloud Function Compiler 模块的表示。在这种情况下,模块扮演服务器的角色,它允许函数通过 HTTP 部署并保存在存储中。
  4. 这是消息代理的表示,在本例中为 RabbitMQ。

 

正如我们所见,使用 Spring Cloud Function 模块以及与现有 Cloud 平台的直接集成,我们可以构建自己的 Function as a Service (FaaS) 平台,几乎提供了 Spring Framework 的全部功能,允许使用 lightweight 函数。但是,我们必须记住,当有实例部署、监控和管理的基础时,Spring Cloud Function 显示出强大的功能,因此可以在此基础上构建 Spring Cloud Function 生态系统并公开 FaaS 功能。因此,在下一节中,我们将介绍 Spring Cloud Function 如何与成熟的 Spring Cloud Data Flow 生态系统结合使用。

Spring Cloud——作为数据流的一部分

现在,对 Spring Cloud Function 生态系统有了足够的了解,我们可以回到主题,看看如何使用这个很棒的模块。有一个 additional 模块,名为 Spring Cloud Starter < /span>Stream App Function,可以在 Spring Cloud Data Flow 中使用 Spring Cloud Function 功能。这个模块允许我们使用纯 jars 并将它们部署为 Spring Cloud Data Flow 的一部分,而不需要 Spring 的任何 redundant 开销Boot. 由于我们这里有一个简单的映射,一个简化的 Validation 函数可以简化为一个传统的 Function 代码> 函数 可以如下所示:

public class PaymentValidator 
       implements Function<Payment, Payment> { 

  public Payment apply(Payment payment) { ... } 
}

打包发布一个工件后,我们或许可以编写如下流管道脚本,将我们的 HTTP 源连接到 Spring Cloud Function Bridge:

SendPaymentEndpoint=Endpoint: http --path-pattern=/payment --port=8080 | Validator: function --class-name=com.example.PaymentValidator --location=https://github.com/PacktPublishing/Hands-On-Reactive-Programming-in-Spring-5/tree/master/chapter-08/dataflow/payment/src/main/resources/payments-validation.jar?raw=true 

笔记

在撰写本文时,Spring Cloud Data Flow 的 Spring Cloud Function  模块未包含在默认的 Applications 和 Tasks 包中,应通过提供以下批量导入属性进行注册:

source.function=maven://org.springframework.cloud.stream.app:function-app-rabbit:1.0.0.BUILD-SNAPSHOT

source.function.metadata=maven://org.springframework.cloud.stream.app:function-app-rabbit:jar:metadata:1.0.0.BUILD-SNAPSHOT

processor.function=maven://org.springframework.cloud.stream.app:function-app-rabbit:1.0.0.BUILD-SNAPSHOT

processor.function.metadata=maven://org.springframework.cloud.stream.app:function-app-rabbit:jar:metadata:1.0.0.BUILD-SNAPSHOT

sink.function=maven://org.springframework.cloud.stream.app:function-app-rabbit:1.0.0.BUILD-SNAPSHOT

sink.function.metadata=maven://org.springframework.cloud.stream.app:function-app-rabbit:jar:metadata:1.0.0.BUILD-SNAPSHOT

最后,为了完成流程的第一部分,我们必须向不同的目的地提供经过验证的付款,并根据验证结果选择端点。由于验证函数是一个纯函数,不应该访问基础设施(例如 RabbitMQ 路由标头),我们应该将该责任委托给其他地方。幸运的是,Spring Cloud Data Flow 提供了 Router Sink,它允许基于如下表达式将传入消息路由到不同的队列:

...  | router --expression="payload.isValid() ? 'Accepted' : 'Rejected'"

或者,我们可以配置一个源来监听 particular 消息队列名称。例如,管道脚本负责监听名为 Accepted 的 RabbitMQ 通道,如下所示:

...
Accepted=Accepted: rabbit --queues=Accepted

 

根据付款流程图,付款处理的以下步骤将其状态保持在  接受 状态。通过这种方式,用户可以使用他们的付款访问特定页面,并检查每笔付款的处理状态。因此,我们应该提供与数据库的集成。例如,我们可以在 MongoDB 中存储付款转换的状态。 Spring Cloud Data Flow 提供了一个 MongoDB Sink。使用它,我们可以轻松地将传入消息写入 MongoDB。依靠 Spring Data Flow 插件,我们可以将消息广播到 MongoDB 接收器和下一个执行步骤。这种技术 只能 在完全可靠的消息代理(如Apache Kafka)的情况下是一个有效的解决方案。我们可能知道,卡夫卡 坚持 信息。因此,即使执行在某个阶段崩溃,消息也会在消息代理中可用。因此,MongoDB 拥有一个旨在在 UI 中使用的状态,而实际的处理状态则保存在  消息代理;因此,它可以在任何时间点重播。另一方面,对于快速、内存中的 message 代理,例如 RabbitMQ,依赖存储在 MongoDB 中的状态作为事实来源。因此,我们必须确保在执行下一步之前已经存储了付款的状态。不幸的是,为了实现这样的功能,我们必须编写一个自定义的 Spring Cloud Stream 应用程序,将 MongoDB 包装为流程中的一个处理阶段。

通过对流程的其余部分重复类似的操作,我们可以实现以下执行流程:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.12。在 Spring Cloud Data Flow 用户界面的支持下完成了 Payment 的执行流程

笔记

上图显示了一个内置的 Spring Cloud Data Flow 仪表板,它允许使用基于浏览器的 GUI 构建流和管理应用程序。我们不会在此仪表板上详细介绍,但我们可以通过访问以下链接了解更多信息:https://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#dashboard, < a class="ulink" href="https://github.com/spring-projects/spring-flo/" target="_blank">https://github.com/spring-projects/spring-flo/ https://github.com/spring-云/spring-cloud-dataflow-ui.除了仪表板选项外,还有一个数据流 shell 客户端,提供与仪表板相同的功能。要了解有关数据流 shell 的更多信息,请参阅以下链接:https://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#shell

前面的流可视化由以下管道脚本表示:

SendPaymentEndpoint=Endpoint: http --path-pattern=/payment --port=8080 | Validator: function --class-name=com.example.PaymentValidator --location=https://github.com/PacktPublishing/Hands-On-Reactive-Programming-in-Spring-5/tree/master/chapter-08/dataflow/payment/src/main/resources/payments.jar?raw=true | router --expression="payload.isValid() ? 'Accepted' : 'Rejected'"

Accepted=Accepted: rabbit --queues=Accepted | MarkAccepted: mongodb-processor --collection=payment | Limiter: function --class-name=com.example.PaymentLimiter --location=https://github.com/PacktPublishing/Hands-On-Reactive-Programming-in-Spring-5/tree/master/chapter-08/dataflow/payment/src/main/resources/payments.jar?raw=true | router --expression="payload.isLimitBreached() ? 'Rejected' : 'Checked'"

Checked=Checked: rabbit --queues=Checked | MarkChecked: mongodb-processor --collection=payment | Approver: function --class-name=com.example.PaymentApprover --location=https://github.com/PacktPublishing/Hands-On-Reactive-Programming-in-Spring-5/tree/master/chapter-08/dataflow/payment/src/main/resources/payments.jar?raw=true | router --expression="payload.isApproved() ? 'Approved' : 'Rejected'"

Approved=Approved: rabbit --queues=Approved | MarkApproved: mongodb-processor --collection=payment | Executor: function --class-name=com.example.PaymentExecutor --location=https://github.com/PacktPublishing/Hands-On-Reactive-Programming-in-Spring-5/tree/master/chapter-08/dataflow/payment/src/main/resources/payments.jar?raw=true | router --expression="payload.isExecuted() ? 'Executed' : 'Rejected'"

Executed=Executed: rabbit --queues=Executed | MarkExecuted: mongodb --collection=payment

Rejected=Rejected: rabbit --queues=Rejected | MarkRejected: mongodb --collection=payment

最后,通过部署该流,我们可以执行支付并在控制台中查看执行日志。

笔记

要与安装的 Spring Cloud Data Flow 服务器一起运行代码,我们必须安装 RabbitMQ 和 MongoDB。

这里值得注意的一点是,deployment 流程就像业务逻辑开发一样简单。首先,Spring Cloud Data Flow 工具包构建在 Spring Cloud Deployer 之上,用于部署到 Cloud Foundry、Kubernetes、Apache Mesos 或 Apache YARN 等现代平台上。该工具包公开了允许配置应用程序源的 Java API(例如,Maven 存储库、artifactId groupIdversion) 及其后续部署到目标平台。 除此之外, Spring Cloud Deployer 足够灵活,并提供更广泛的配置和属性列表,其中之一是可部署实例的副本数。

笔记

已部署的应用程序实例组的高可用性、容错性或弹性直接取决于平台和 Spring Cloud Deployer 本身,对此不提供任何保证。例如,不建议将 Spring Cloud Deployer Local 用于生产案例。该工具包的本地版本旨在使用 Docker 在一台机器上运行。需要注意的是,Spring Cloud Deployer SPI 不提供额外的监控或维护,并期望底层平台提供所需的功能。

拥抱上述可能性,Spring Cloud Data Flow 提供一键单击(或一个终端命令)部署 能够传递所需的配置和属性。

总而言之,我们从 Spring Cloud Streams 的基础开始,以对几个模块的强大抽象结束了这个故事。结果,我们看到,在这些项目的支持下,我们可以通过应用反应式编程的不同抽象来构建反应式系统。 提到的 使用消息代理进行异步、可靠消息传递的技术涵盖了大多数业务需求。此外,该技术总体上可以降低反应式系统的开发成本,并可用于快速开发大型网络商店、物联网或聊天应用程序等系统。然而,重要的是要记住,尽管所描述的方法提高了系统可靠性、可伸缩性和吞吐量,但由于额外的通信,它可能会损害请求处理延迟,尤其是与持久消息代理的通信。因此,如果我们的系统可以容忍发送和接收消息之间的几毫秒的额外延迟,那么这种方法可能适用于我们的案例。

用于低延迟、反应式消息传递的 RSocket


在上一节中,我们学习了如何使用 Spring Cloud Streams 及其在 Spring Cloud Data 中的变体轻松实现反应式系统流动。反过来,我们学习了如何在 Spring Cloud Function 的支持下使用轻量级函数构建细粒度系统,以及将它们组合到流中的简单性。

然而,这种简单性和灵活性的一个缺点是失去了低延迟方法。如今,在某些应用领域,每一毫秒都起着至关重要的作用。例如,股票交易市场、在线视频游戏或实时制造控制系统。使用这样的系统,将时间浪费在消息的排队和出队上是不可接受的。反过来,可能会注意到,大多数可靠的消息代理都会持久化消息,这会增加传递消息所需的时间。

 

在分布式系统中实现服务之间的低延迟通信的可能解决方案之一是使用服务之间的连续直接连接。例如,当在应用程序之间连接连续的 TCP 连接时,我们可以实现服务之间的直接通信,具有较低的延迟和一些交付保证,具体取决于给定的传输。除此之外,使用更广为人知的 protocols(例如 WebSocket)允许使用 Spring WebFlux 的  ReactorNettyWebSocketClient。

然而,正如我们可能在本章前面所记得的那样,以及服务之间的紧密耦合(因为连接),WebSockets 的使用不符合反应式系统的要求,因为 协议不提供 控制背压,这是弹性系统的重要组成部分。

 

幸运的是,反应流规范背后的团队了解跨网络、异步、低延迟通信的必要性。 2015 年年中,Ben Christensen 与一组专家发起了一个名为 RSocket 的新项目。

RSocket 项目的中心目标是在异步、二进制边界上提供具有反应流语义的应用程序协议。

笔记

要了解有关创建 RSocket 项目的动机的更多信息,请访问以下链接:https://github.com/rsocket/rsocket/blob/master/Motivations.md

RSocket 与 Reactor-Netty

乍看之下,RSocket 似乎不是很创新;如今,我们已经拥有诸如 RxNetty 或 Reactor-Netty(Netty 的默认 WebFlux 包装器)之类的 Web 服务器。这些解决方案 提供了建立在反应流类型之上的API(允许从/向网络读取或写入),这意味着背压支持。然而,这种背压的唯一问题是它是孤立地工作的。随后,这意味着仅在组件和网络之间连接适当的背压支持。

例如,使用 Reactor-Netty,我们只有在准备好时才能使用传入的字节。同样,网络的准备情况 暴露为 Subscription#request 方法调用。这里的核心问题是组件的实际需求没有跨越网络边界,如下图所示:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.13。孤立背压的例子

 

从上图我们可以看出,这里有两个服务 (Service A和 服务 B)。除此之外,我们还有一个抽象的 Adapter,它允许服务(业务逻辑)通过 Network. 在这个例子中,假设我们的适配器是 Reactor-Netty,它提供反应式访问。这意味着背压控制在传输级别正确实施。因此, 适配器会通知发布者  大约 此时它可以写入多少元素。在我们的例子中, Service A 创建一个continuous服务 B 的连接(例如,WebSocket 连接) > 一旦连接可用,它就会开始向适配器发送元素。因此,适配器会妥善处理向网络写入元素。在连接的另一端,我们有Service B,它通过同一个适配器使用来自网络的消息。作为SubscriberService B向适配器表达需求,并在适当数量的接收到字节后,适配器将它们转换为逻辑元素,并将其发送到业务逻辑。同样,一旦元素被发送到适配器,适配器会负责将其转换为字节,然后按照传输级别定义的流控制将它们发送到网络。

我们可能记得 第 3 章Reactive Streams - 新 Streams 的标准,当消费者足够慢以至于生产者很容易溢出时,生产者和消费者之间的最坏情况就会出现消费者。为了强调隔离背压的问题,假设我们有一个慢的消费者和一个快的生产者,所以部分事件在Socket Buffer< /跨度>。不幸的是,缓冲区的大小是有限的,并且在某些时候,网络数据包可能会开始丢弃。当然,在许多方面,传输的行为取决于交互协议。例如,可靠的网络传输(如 TCP)具有包含 sliding 窗口和 确认。 这意味着,一般来说,背压是可以实现的在二进制级别(在接收到的字节和相应的确认级别)。在这种情况下,TCP 负责重新传递消息以及缺少确认会降低发布者速度的情况  Service A 。这里的缺点是对应用程序本身性能的影响很大,因为我们必须处理丢弃的包的重新交付。此外,通信的性能也可能会降低,因此系统的整体稳定性可能会丢失。尽管这样的传输流控制是可以接受的,但是连接的利用率很低。这是因为我们不能重用同一个连接来复用多个逻辑流。另一方面是当消费者无法遵循传输流控制时,消费者会继续在内部缓冲字节,这可能会以 OutOfMemoryError 告终。

笔记

要了解更多关于 TCP 流量控制的信息,请查看以下链接:https://hpbn.co/building-blocks-of-tcp/#flow-control

与使用 Reactor Netty 或 RxNetty 可以轻松实现的隔离反应式 Publisher-Subscriber 通信相比,RSockets 提供了一个 二进制 protocol 是一种跨异步网络边界的反应流和语义的应用协议:

读书笔记《hands-on-reactive-programming-in-spring-5》使用 Cloud Streams 进行扩展

图 8.14。使用 RSocket 的背压示例

 

从上图中我们可以看出,通过 RSocket 协议,我们获得了对跨越网络边界传输需求的支持。因此,生产者服务可以响应请求并发送相应数量的onNext信号,这些信号也通过网络传输。此外,RSocket 工作在负责将数据写入网络的适配器之上,以便本地需求可以协调< /a> 与收到的来自消费者服务的需求隔离。一般来说,RSocket 是一种传输不可知论者,它与 TCP 一起支持 Aeron 和通过 WebSocket 进行的通信。

乍一看,似乎连接的使用效率很低,因为服务之间的交互可能很低。然而,RSocket 协议的强大功能之一是它允许为同一服务器和客户端之间的许多流重用相同的套接字连接。因此,它可以优化一个连接的使用。

与协议一起,RSockets 启用 对称交互模型,例如:

  • Request/response: A stream of one element in the request and response
  • Request/stream: A stream of one element in the request stream and a finite/infinite stream as the response
  • Fire-and-forget: A stream of one element in the request and a Void (no elements) response stream
  • Channel: Fully bi-directional finite/infinite streams for both the request and response

正如我们所见,RSocket 提供了大量常用的异步消息传递交互模型,一开始只获取单个连接。

Java中的RSocket

RSocket 协议和 interaction 模型在 Java(以及 C++、JS、Python、和 Go)并在 Reactor 3 之上实现。RSocket-Java 模块提供了以下编程模型:

RSocketFactory                                                     // (1)
   .receive()                                                      // (1.1)
   .acceptor(new SocketAcceptorImpl())                             // (1.2)
   .transport(TcpServerTransport.create("localhost", 7000))        // (1.3)
   .start()                                                        // (1.4)
   .subscribe();                                                   //

RSocket socket = RSocketFactory                                    // (2)
   .connect()                                                      // (2.1)
   .transport(TcpClientTransport.create("localhost", 7000))        // 
   .start()                                                        //
   .block();                                                       // (2.2)

socket                                                             // (3)
   .requestChannel(                                                // (3.1)
      Flux.interval(Duration.ofMillis(1000))                       //
          .map(i -> DefaultPayload.create("Hello [" + i + "]"))    // (3.2)
   )                                                               //
   .map(Payload::getDataUtf8)                                      // (3.3)
   .doFinally(signalType -> socket.dispose())                      //
   .then()                                                         // 
   .block();                                                       //

编号代码的每个部分都在此处解释:

  1. 这是服务器(接收器)RSocket 定义。在 (1.1) 点,我们确定我们的目标是创建一个 RSocket 服务器,所以我们想使用 .receive() 方法在这里。在 (1.2) 点,我们提供了 SocketAcceptor 实现,它是在传入客户端连接上调用的处理程序方法的定义。反过来,在 (1.3) 点,我们定义了首选传输,在这种情况下是 TCP 传输。请注意,TCP 传输提供程序是 Reactor-Netty。最后,要开始监听定义的套接字地址,我们启动服务器并 .subscribe() 到它。
  2. 这是客户端 RSocket 定义。这里 (2.1),我们使用 .connect(),提供客户端的RSocket 实例。为了在该示例中简单起见,请注意我们使用了 .block()  方法来等待成功连接并获取活动 RSocket
  3. 这演示了对服务器的请求的执行。在这个例子中,我们使用了通道交互(3.1),所以在发送消息流的同时,我们也接收到一个流。请注意,流中消息的默认表示是 Payload 类。因此,在这一点 (3.2), 我们必须将消息包装到 Payload (在这种情况下,进入默认实现 DefaultPayload) 或将其解包 (3.3),例如 字符串。 

在前面的示例中,我们在客户端和服务器之间进行了有线双工通信。在这里,所有通信都是在对 Reactive Streams 规范和 Reactor 3 的支持下完成的。

此外,重要的是要提到 SocketAcceptor 的实现:

class SocketAcceptorImpl implements SocketAcceptor {               // (1)

@Override                                                       // (2)
public Mono<RSocket> accept(                                    // 
      ConnectionSetupPayload setupPayload,                         // (2.1)
RSocket reactiveSocket                                       // (2.2)
   ) {
return Mono.just(new AbstractRSocket() {                     // (3)
@Override                                                 // 
public Flux<Payload> requestChannel(                      // (3.1)
            Publisher<Payload> payloads                            // (3.2)
         ) {                                                       //
return Flux.from(payloads)                             // (3.3)
                       .map(Payload::getDataUtf8)                  //
                       .map(s -> "Echo: " + s)                     //
                       .map(DefaultPayload::create);               //
}                                                         //
      });                                                          //
}                                                               //
}

代码中的编号解释如下:

  1. 这是 SocketAcceptor 接口的实现。请注意 SocketAcceptor是服务器端接受器的表示。 
  2. SocketAcceptor 接口只有一个名为 accept 的方法。这个方法有两个参数,包括ConnectionSetupPayload参数(2.1),它代表第一个 握手 在连接期间从客户端 接线。正如我们在本节中可能记得的那样,RSocket 本质上是双工连接。这种性质由 accept 方法的第二个参数表示,称为 sendingRSocket  (2.2)。 使用第二个参数,服务器可以开始向客户端发送流式请求,就好像服务器是交互的发起者一样。 
  3. 这是处理程序 RSocket 声明。在这种情况下,AbstractRSocket 类是 RSocket 接口的抽象实现,它发出 UnsupportedOperationException 适用于 任何处理方法。随后,通过重写方法之一 (3.1),我们可以声明我们的服务器支持哪些交互模型。最后,在 (3.3) 处,我们提供了 echo 功能,它采用正在进行的流 (3.2) 并修改传入的消息。

 

我们可以看到,SocketAcceptor的定义并不代表handler的定义。在这种情况下, SocketAcceptor#accept 方法的调用指的是一个新的传入连接。反过来,在 RSocket-Java 中,RSocket 接口同时表示客户端和服务器的处理程序。最后,各方之间的通信是点对点通信,这意味着双方都可以处理请求。

此外,为了实现可扩展性,RSocket-Java 提供了 RSocketLoadBalancer 模块,该模块可以与 Eureka 等服务注册中心集成。例如,以下 代码显示了与 Spring Cloud Discovery 的简单集成:

Flux                                                     
    .interval(Duration.ofMillis(100))                              // (1)
    .map(i ->
discoveryClient
          .getInstances(serviceId)                                 // (2)
          .stream()                                                //
          .map(si -> 
             new RSocketSupplier(() ->
RSocketFactory.connect()                           // (3)
                              .transport(                          //
                                 TcpClientTransport.create(        //
                                    si.getHost(),                  // (3.1)
                                    si.getPort()                   // (3.2)
                                 )                                 //
                              )                                    //
                              .start()                             //
             ) {
public boolean equals(Object obj) { ... }          // (4)
                                                                   //
public int hashCode() {                            //
return si.getUri().hashCode();                  // (4.1)
}                                                  //
             }
          )
          .collect(toCollection(ArrayList<RSocketSupplier>::new))
    )
    .as(LoadBalancedRSocketMono::create);                          // (5)

在前面的代码中,编号点的含义如下:

  1. 这是 .interval() 运算符声明。在这里,我们的想法是使用一些 serviceId 运行定期 检索可用实例。
  2. 这显示了实例列表的检索。
  1. 这是来自 Mono<RSocket> 创作的Supplier。我们使用服务器主机(3.1)和端口(3.2)等信息 从每个给定的ServiceInstance 创建一个适当的 传输连接。
  2. 这是匿名的 RSocketSupplier 创建。在这里,我们重写 equalshashCode 以便区分哪个 RSocketSupplier 都是一样的。请注意,在底层, LoadBalancedRSocketMono 使用 HashSet,所有接收到的实例都存储在其中。反过来,我们使用 URI 作为组中实例的唯一标识符。
  3. 这是 Flux<Collection<RSocketSupplier>>LoadBalancedRSocketMono的转换阶段。请注意,即使结果是 Mono 类型的实例,LoadBalancedRSocketMono 也是有状态的。因此,每个新订户可能会收到不同的结果。在底层,LoadBalancedRSocketMono使用预测负载平衡算法选择 RSocket 实例并将选择的实例返回给订阅者。

前面的例子展示了一个简单的方法 集成LoadBalancedRScoketMono 代码类="literal">DiscoveryClient。即使上面提到的例子效率不高,我们仍然可以学习如何正确使用 LoadBalancedRSocketMono

概括地说,RSocket 是一种遵循 Reactive Streams 语义的通信协议,并通过背压控制支持扩展了跨网络边界的流式通信的新视野。反过来,有一个强大的基于 Reactor 3 的实现,它提供了一个简单的 API 用于在对等点之间建立连接,并在交互的生命周期中有效地利用它。

RSocket 与 gRPC

如果有一个名为 gRPC 的知名框架,我们可能仍然想知道为什么我们需要一个单独的框架。 gRPC 描述如下:

"一个高性能、开源、通用的 RPC 框架 (https://github.com/grpc)"

 

该项目最初由 Google 开发,旨在通过 HTTP/2 提供异步消息传递。它使用 Protocol Buffers (Protobuf) 作为 接口描述语言IDL)和 作为其基础< /span> 消息 interchange 格式。

笔记

要了解有关使用 IDL 和 Protobuf 的服务定义的更多信息,请参阅以下链接:https://grpc.io/docs/guides/concepts.html#service-definition

一般来说,gRPC 提供了与 Reactive Streams 几乎相同的 messaging 语义,并提供以下接口:

interface StreamObserver<V>  {

void onNext(V value);

void onError(Throwable t);

void onCompleted();
}

正如我们所见,语义是从 RxJava 1 到 Observer  的 1:1。反过来,gRPC 的 API 提供了 Stream< /code> 接口,扩展了以下方法: 

public interface Stream {

void request(int numMessages);

   ...

boolean isReady();

   ...
}

查看前面的代码,我们可能会感觉到 gRPC 以及异步消息传递也提供了背压控制支持。但是,这部分有点棘手。总的来说,交互流程有点类似于我们在 Diagram 8.13 中看到的,唯一的区别是它支持 flow control with更高的粒度。由于 gRPC 构建在 HTTP/2 之上,因此该框架使用 HTTP/2 流控制作为提供细粒度背压控制的构建块。尽管如此,流控制仍然依赖于以字节为单位的滑动窗口大小,因此逻辑元素级别粒度上的背压控制未被覆盖。

 

 

gRPC 和 RSocket 之间的另一个显着区别是 gRPC 是一个 RPC 框架,而 RSocket 是一个协议。 gRPC 基于 HTTP/2 协议,并为服务存根和客户端提供代码生成。默认情况下,gRPC 使用 Protobuf 进行消息传递格式;但是,它也可能支持 其他格式,例如JSON。同时,RSocket 只为服务端和客户端提供了一个响应式的实现。此外,还有一个名为 RSocket-RPC 的独立 RPC 框架,它建立在 RSocket 协议之上,并提供 gRPC 的所有功能。 RSocket-RPC 允许以与 gRPC 相同的方式基于 Protobuf 模型生成代码。因此,任何使用 gRPC 的项目都可以顺利迁移到 RSocket-RPC。

笔记

要了解有关 RSocket-RPC 的更多信息,请参阅以下链接:https://github. com/netifi/rsocket-rpc.要了解有关 gRPC 中的背压控制支持的更多信息,请参阅以下链接: https://github.com/salesforce/reactive-grpc#back-pressure

Spring框架中的RSocket

尽管该实现为使用 Reactor API 编写异步、低延迟和高吞吐量通信提供了更广泛 的可能性,但它留下了很多为开发人员使用基础设施配置。幸运的是,Spring 团队重视该项目,并开始尝试在 Spring 生态系统中采用如此出色的解决方案,并使用简化的编程模型而不是注解。

其中一项实验称为 Spring Cloud Sockets,旨在提供一种熟悉的(与 Spring Web 相比)编程模型来声明注释:

@SpringBootApplication                                             // (1)
@EnableReactiveSockets                                             // (1.1)
public static class TestApplication {                              //

   @RequestManyMapping(                                            // (2)
      value = "/stream1",                                          // (2.1)
      mimeType = "application/json"                                // (2.2)
   )                                                               //
   public Flux<String> stream1(@Payload String a) {                // (2.3)
      return Flux.just(a)                                          //
                 .mergeWith(                                       //
                    Flux.interval(Duration.ofMillis(100))          //
                        .map(i -> "1. Stream Message: [" + i + "]")//
                 );                                                //
   }                                                               //

   @RequestManyMapping(                                            // (2)
      value = "/stream2",                                          // (2.1)
      mimeType = "application/json"                                // (2.2)
   )                                                               //
   public Flux<String> stream2(@Payload String b) {                // (2.3)
      return Flux.just(b)                                          //
                 .mergeWith(                                       //
                    Flux.interval(Duration.ofMillis(500))          //
                        .map(i -> "2. Stream Message: [" + i + "]")//
                 );                                                //
   }                                                               //
}

代码中的编号在这里解释:

  1. 这是 @SpringBootApplication 定义。在这里,在 (1.1) 点,我们定义了 @EnableReactiveSockets 注解,它提供 所需的配置并启用应用程序中的 RSocket。
  2. 这是处理程序方法声明。这里,我们使用 @RequestManyMapping 注解来指定当前方法在请求流< /span> 交互模型。  the  提供的一个引人注目的功能Spring Cloud Sockets 模块是它提供了开箱即用的映射(路由)并允许定义处理程序映射的路径 (2.1) 和传入消息的 mime-type (2.2)。最后,还有一个额外的 @Payload 注释 (2.3),这表明给定的参数是传入请求的负载。

在这里,我们有熟悉的 Spring Boot 应用程序,它在 Spring Cloud Socket 的支持下,启用了 RSocket-Java 库的附加功能。反过来,Spring Cloud Sockets 也提供了 simplification 从客户端的角度与服务器交互:

public interface TestClient {                                      // (1)
   @RequestManyMapping(                                            // (2)
      value = "/stream1",                                          //
      mimeType = "application/json"                                //
   )                                                               //
   Flux<String> receiveStream1(String a);                          //

   @RequestManyMapping(                                            // (2)
      value = "/stream1",                                          //
      mimeType = "application/json"                                //
   )                                                               //
   Flux<String> receiveStream2(String b);                          //
}

在这里,我们所要做的就是在提供接口(1)时使用Spring Cloud Sockets声明一个RSocket Client。为了启用 RSocket 客户端,我们必须在   客户端的方法之上定义相同的注释,如在服务器示例中,并定义对应的处理程序路径。

因此,接口可以在运行时使用 ReactiveSocketClient工厂轻松转换为 代理,如下所示例子:

ReactiveSocketClient client = new ReactiveSocketClient(rSocket);
TestClient clientProxy = client.create(TestClient.class);

Flux.merge(
       clientProxy.receiveStream1("a"),
       clientProxy.receiveStream2("b")
    )
    .log()
    .subscribe();

笔记

Spring Cloud Socket 是一个实验项目。目前,它托管在 Spring Cloud 官方组织之外。源代码可以在 https://github 的以下 GitHub 存储库中找到。 com/viniciusccarvalho/spring-cloud-sockets

在前面的示例中,我们创建了一个客户端(请注意,在此示例中,我们必须手动提供 RSocket 客户端的实例)。出于演示的目的,我们合并了两个 streams 并努力 .log()  结果。

其他框架中的 RSocket

如前所述,Spring Cloud Socket 模块是实验性的,原作者不再支持。尽管 Spring 团队继续进行内部实验并密切关注 RSocket,因为这是通过网络边界的 Reactive Streams 的强大解决方案,但还有一些其他框架也采用了 Java 协议的实现。

 

ScaleCube 项目

正如 framework 的原作者所说,ScaleCube 的定义如下:

“一个开源项目,专注于简化可扩展的微服务反应式系统的反应式编程 (http://scalecube.io)。"

该项目的中心目标是构建 高度可扩展的低延迟分布式系统。 

对于服务之间的交互,该工具包使用 Project Reactor 3,并且通常与传输无关。但是,在编写本书时,默认传输是 RSocket-Java。

除此之外,ScaleCube 还提供与 Spring Framework 的集成,并提供基于注释的 API,以构建可扩展、低延迟的分布式系统。我们不会在这里详细介绍框架的集成。但是,要了解更多信息,请查看以下链接:https://github.com/ scalecube/scalecube-spring.

变形虫计划

另一个强大的 工具包是Netifi Proteus 项目。与 ScaleCube 项目相比,Proteus 的定位如下:

“一个快速简单的基于 RSocket 的微服务 RPC 层 (https://github. com/netifi-proteus)"

总的来说,Proteus 提供了一个使用 RSocket 协议和 RSocket-RPC 框架的云原生微服务平台,并提供了用于消息路由、监控和跟踪的模块列表。

此外,Proteus 项目提供了与 Spring Framework 的集成,并提供了一个简单的、基于注释的编程模型以及强大的代码生成功能。 我们不会详细介绍框架的集成;但是,要了解更多信息,请访问此链接:https://github.com /netifi-proteus/proteus-spring

 

总结 RSocket

正如我们可能已经注意到贯穿这一节,RSocket 是构建高吞吐量和低延迟响应式的便捷方式基于异步对等通信的系统,采用反应式流规范。通常,RSocket 协议旨在减少感知延迟并提高系统效率。这可以通过双工连接支持非阻塞通信来实现。除此之外,RSocket 的设计目的是减少硬件占用空间。 反过来,RSocket 是一种可以用任何语言实现的协议。 最后,RSocket 在 Java 中的实现是建立在 Project Reactor 之上的,它提供了一个开箱即用的强大编程模型。

总的来说,RSocket 社区正在飞速发展,看起来很有希望。目前,该项目由 Facebook 和 Netifi 积极维护,在不久的将来,其他公司也将加入。

概括


在本章中,我们踏上了将单体应用程序演变为反应式系统的旅程。在这里,我们了解了使用普通的服务器端负载平衡技术来实现可扩展系统的利弊。但是,这些技术无法提供弹性,因为在这种情况下负载均衡器可能会成为瓶颈。反过来,这种技术可能会增加额外的运营成本以及负载均衡器的强大基础架构。

此外,在本章中,我们探讨了客户端负载平衡技术。但是,这种技术有其局限性,并且不提供与安装在系统中所有服务上的客户端负载平衡器的平衡协调。

最后,我们查看了响应式宣言如何建议我们使用消息队列来实现健壮的异步消息传递。因此,我们了解到,使用消息代理作为异步通信的单独反应系统可以让我们实现弹性,使其成为一个完全反应系统。

此外,我们还介绍了 Spring 生态系统如何在 Reactor 和 Spring Cloud Stream 项目的支持下帮助我们构建反应式系统。我们还学习了使用 Reactor 3 与 Apache Kafka 或 RabbitMQ 等消息代理进行背压支持通信的新编程范例。反过来,我们看到了一些将该技术应用于实际项目的示例。

 

 

在此之后,我们看到了 Spring Cloud Data Flow 如何帮助我们将特定业务逻辑与与消息代理通信或与特定云平台集成相关的实际特定配置分离。

最后,我们了解了一个额外的库,用于使用 RSocket 实现低延迟、高吞吐量的通信。因此,我们看到了一个可以轻松地将 RSocket 集成到 Spring 生态系统中的实验项目。

为了完善我们的反应性知识, 第 9 章< span class="emphasis">,Testing the Reactive Application, 探讨了测试使用 Spring 5 构建的响应式系统的基本技术。为了最终确定我们的知识,我们将看看如何发布、支持和监控反应式系统。