读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递
我应该补充一点,我们从上到下都是@springboot / @SpringCloudOSS。
– DaShaun Carter @dashaun
在上一章中,我们向社交媒体应用程序添加了一些工具,以加快开发人员的时间并提供基本的运营支持功能。
但没有什么是静止的。在各种社交媒体平台中,用户之间存在某种形式的消息传递。为什么不为我们创造一个呢?
在本章中,我们将学习以下主题:
- Getting started with RabbitMQ, an AMQP broker
- Creating a message-based module for our social media app
- Adding customized metrics to track message flow
- Creating dynamically routed messages
- Taking a peek at Spring Cloud Stream and its RabbitMQ bindings
RabbitMQ 是一个开源的 AMQP 代理。 高级消息队列协议 (AMQP) 是一个开放协议,包括通过网络发送的消息格式。这已经上升受欢迎程度比较 到 JMS 等其他消息传递解决方案。为什么?
JMS 是一种 API,而 AMQP 是一种协议。 JMS 定义了如何与代理对话,但没有定义其消息的格式。它仅限于 Java 应用程序。 AMQP 没有谈论如何与代理交谈,而是谈论如何将消息放在网络上以及如何将它们拉下。
为了说明这一点,想象两个不同的应用程序。如果它们都是 Java,它们可以通过 JMS 进行通信。但是,如果其中之一是 Ruby,JMS 就会被淘汰。
为了进一步展示 JMS 和 AMQP 之间的区别,讲 JMS 的代理实际上可以在底层使用 AMQP 来传输消息.
Note
事实上,我已经为 Pivotal Software 开发的 RabbitMQ JMS Client 做出了贡献,该客户端位于 <跨度>https://github.com/rabbitmq/rabbitmq-jms-client。
在本章中,我们将本着最大选择的精神探索使用 RabbitMQ。
在 macOS 上,如果我们使用 Homebrew (http://brew.sh/),就这么简单:
在 Debian Linux 上,您可以使用以下命令:
在任何 Red Hat Linux 系统上,都可以运行以下命令:
在包括 Cloud Foundry 在内的各种云解决方案上,RabbitMQ 可以作为服务找到(包括 Pivotal 的 RabbitMQ 用于 PCF,位于 https://network.pivotal.io/products/p-rabbitmq),我们将在 第 19 章,使用 Spring Boot 将您的应用程序投入生产。
有关下载和安装的更多详细信息,请访问 https://www.rabbitmq。 com/download.html。
安装了 RabbitMQ 代理后,我们只需要启动它。有这两种方法可以做到这一点:
- Starting it in our current shell
- Having it start when the machine boots
要在我们当前的 shell 中启动,我们可以执行以下命令:
在带有 Homebrew 的 macOS 上,使用以下命令作为守护进程启动并在我们重新启动时重新启动:
如果您使用 Homebrew,则有一个功能可以管理各种 服务。键入 homebrew services
以查看可用的命令。例如,brew services list
将列出所有服务及其状态:
现在我们可以看到 RabbitMQ 已经加入了 MongoDB(我们在 Chapter 12 中安装了它,使用 Spring Boot 进行响应式数据访问)。
从本质上讲,这利用了 macOS X 的 launchctl
系统和 Homebrew 提供的守护进程控制文件。
对于 Windows,请查看 https://www.rabbitmq.com/安装-windows.html。它有指向 下载 代理的链接。安装后,它将使用各种默认值配置它并启动它。
要控制代理,请查看 sbin
文件夹中的 rabbitmqctl.bat
脚本(以管理员身份)。使用以下命令:
rabbitmqctl start
rabbitmqctl stop
rabbitmqctl status
Note
想以更直观的方式探索 RabbitMQ 代理?运行rabbitmq-plugins enable rabbitmq_managment
,访问http://localhost:15672
。 RabbitMQ 的默认用户名/密码是 guest/guest
。我建议先查看 Exchanges 和 Queues。
随着 RabbitMQ 代理的启动和运行,我们现在可以将重点转移到我们的应用程序工作上。
到目前为止,我们为社交媒体平台构建了什么?我们有能力上传和删除图片。然而,任何社交媒体平台的一个关键部分是允许用户相互交互。这通常通过评论社交媒体内容或直接相互聊天来完成。
让我们从添加评论图像的功能开始。但在我们开始之前,让我们停下来讨论架构。
多年来,人们一直使用分层方法来拆分应用程序。从根本上说,我们不希望在一个包中包含所有类的大型应用程序,因为要跟上所有内容太难了。
到目前为止,我们的所有内容都位于 com.greglturnquist.learningspringboot
中。从历史上看,该模式一直将事物拆分为域层、服务层和控制器层,如下面的屏幕截图所示:
在这种结构中,我们会将每个服务放入 services
子包中,并在需要时创建更多子子包。我们将所有域对象放在 domain
中,所有控制器将进入 controllers
。
这个想法是控制器调用服务和服务返回域对象。它防止了诸如服务调用控制器之类的纠缠,这在当时是有意义的。
但是随着微服务的兴起(我们将在 第 16 章中深入探讨,使用 Spring Boot 的微服务),当应用程序变得非常大时,这些基于层的方法会成为一个问题。当进行重构时,由于我们可能创建的不必要的耦合,在功能无关的同一软件包中找到的服务可能会变得棘手。
一种更苗条和精简的方法是使用垂直切片< /strong> 而不是水平的层:
使用前面屏幕截图中显示的结构,我们将 split 东西分成 images
和
comments
,更基于函数的性质。
我们会将与处理图像相关的所有内容放在 former 中,将所有与评论相关的内容放在后者中。如果需要,这些包中的任何一个都可以进一步拆分为子包,如下所示:
担心这会导致domain
/services
/controllers
爆炸我们的代码中的三重奏?不要恐慌!我们只根据需要这样做,并且鉴于每个 domain
子包的范围与旧层方法相比相对较小,因此功能应该是高度内聚的,即具有彼此有很多共同点。
由于我们即将创建一个单独的功能(注释),继续将我们的应用程序分解为 images
和 是有意义的评论
。所以让我们这样做吧!
首先,让我们创建 images
和 comments
子包。有了这些,最明显的变化就是移动 Image
、ImageRepository
和 ImageService
进入 image
子包。很容易。
这给我们留下了以下内容:
LearningSpringBootApplication
HomeController
LearningSpringBootHealthIndicator
LearningSpringBootApplication
体现了整个应用程序,所以它应该保持在顶层。这不仅仅是一个语义陈述。该类包含我们的 SpringBootApplication
注释,它支持应用程序的自动配置行为,如组件扫描。组件扫描应从顶层开始并搜索所有子包。
HomeController
代表了一个有趣的概念。尽管它调用了 ImageService
,因为它服务于应用程序的顶层视图,所以我们也将它留在顶层。
至于LearningSpringBootHealthIndicator
,可以做一个类似的案例来保持它的根。既然我们是为了让事情在顶部保持轻松,我们为什么不创建 一个单独的模块来encompass 所有基于 Ops 的功能,并不特定于任何一个模块,ops
。
首先,我们需要在我们的构建文件中添加一个新的依赖项,这是通过以下代码完成的:
这将使我们能够访问包含 RabbitMQ 支持的 Spring AMQP。
在我们的应用程序中添加消息传递技术可能会让我们大声疾呼,好吧,编写一些与 RabbitMQ 对话的代码。但这并不是一个很好的流程。相反,我们应该从两个角度之一开始——编写单元测试或编写一些 UI。
任何一种方法都旨在找出我们试图解决的用例。在解决手头的问题之前,我们需要弄清楚我们的确切问题是什么。在这种情况下,让我们从 UI 角度开始。
为此,我们可以利用上一章中的 Spring Boot DevTools 和 launch 我们的应用程序在 Debug 模式下使用 LiveReload 功能已启用。这样,当我们 make 更改时,我们可以立即看到它们:
通过前面的屏幕截图,我们可以看到我们的应用程序在启用 LiveReload 服务器的情况下启动并运行(并预加载了一些示例数据)。
现在我们可以对 Thymeleaf 模板进行 编辑,并创建输入字段供人们撰写评论:
我们前面的模板中每一行被渲染的部分可以解释如下:
- There is a new column containing an HTML unordered list to display each comment
- The unordered list consists of an HTML line item for each comment via Thymeleaf's
th:each
construct - There is also a new column containing an HTML form to post a new comment
- The form contains an HTML text input for the comment itself
- The form also contains a hidden HTML element specifying the ID of the image that the comment will be associated with
为了支持这一点,我们需要更新 HomeController
如下:
我们更新了类定义如下:
我们需要查看评论。为此,我们需要一个可以读取评论的 Spring Data repository。在我们的社交媒体应用程序的这个阶段,阅读评论是这个存储库需要做的所有事情。
让我们使用这个新的存储库并在 GET /
的 Spring WebFlux 处理程序中使用它,如下所示:
最后一段代码包含对模型的 images
属性的轻微调整:
- The code takes the
Flux
returned from ourImageService.findAll()
method and flatMaps each entry from an Image into a call to find related comments. repository.findByImageId(image.getId()).collectList()
actually fetches allComment
objects related to a givenImage
, but turns it intoMono<List<Comment>>
. This waits for all of the entries to arrive and bundles them into a single object.- The collection of comments and it's related image are bundled together via
Mono.zipWith(Mono)
, creating a tuple-2 or a pair. (This is the way to gather multiple bits of data and pass them on to the next step of any Reactor flow. Reactor has additional tuple types all the way up toTuple8
.) - After flatMapping
Flux<Image>
intoFlux<Tuple2<Image,List<Comment>>>
, we then map each entry into a classic JavaMap
to service our Thymeleaf template. - Reactor's
Tuple2
has a strongly typedgetT1()
andgetT2()
, withT1
being theImage
andT2
being the list of comments, which is suitable for our needs since it's just a temporary construct used to assemble details for the web template. - The image's
id
andname
attributes are copied into the target map fromT1
. - The
comments
attribute of our map is populated with the completeList<Comment>
extracted fromT2
.
随着我们继续使用 Reactor 类型,希望这些类型的流程变得熟悉。放置这样的流程时,拥有一个提供代码完成功能的 IDE 是一项关键资产。我们使用这些类型的转换越多,它们就越容易。
Note
如果您注意到,鉴于我们使用 MongoDB 的反应式驱动程序,ImageService
是完全反应式的。检索评论的操作是也是反应式的。将响应式调用链接在一起,使用 Reactor 的运算符并将它们连接到 Thymeleaf 的响应式解决方案,确保尽可能高效地获取所有内容,并且仅在必要时获取。编写反应式应用程序取决于拥有一个完全反应式的堆栈。
round我们的阅读评论,我们需要定义CommentReaderRepository
如下:
上述代码可以描述如下:
- It's a declarative interface, similar to how we created
ImageRepository
earlier in this book. - It extends Spring Data Commons'
Repository
interface, which contains no operations. We are left to define them all. This lets us create a read-only repository. - It has a
findByImageId(String imageId)
method that returns aFlux
ofComment
objects.
这个 repository 为我们提供了关于评论的只读读数。这很方便,因为它可以让我们获取评论并且不会意外地让人们通过它来写。相反,我们打算在本章中进一步实现一些不同的东西。
我们的 CommentReaderRepository
需要一件事:一个 Comment
域对象:
前面的域对象包含以下内容:
- The
@Data
annotation tells Lombok to generate getters, setters,toString()
,equals()
, andhashCode()
methods - The
id
field is marked with Spring Data Commons'@Id
annotation so we know it's the key for mapping objects - The
imageId
field is meant to hold anImage.id
field, linking comments to images - The
comment
field is the place to store an actual comment
我们已经看到了对模板的更改,添加了一个 HTML 表单来写评论。我们把对应的控制器在comments子包中编码,如下:
代码可以解释如下:
- It's the first class we have put in the new
comments
subpackage. - The
@Controller
annotation marks this as another Spring controller. - It contains a
RabbitTemplate
initialized by constructor injection. ThisRabbitTemplate
is created automatically by Spring Boot when it spotsspring-amqp
on the classpath. - The
@PostMapping("/comments")
annotation registers this method to respond to the form submissions that we added earlier in the template withth:action="@{'/comments'}"
. - Spring will automatically convert the body of the POST into a
Comment
domain object. Additionally, since we are using WebFlux, deserializing the request body is wrapped in aMono
, hence that process will only occur once the framework subscribes to the flow. - The incoming
Mono<Comment>
is unpacked usingflatMap
and then turned into arabbitTemplate.convertAndSend()
operation, which itself is wrapped inMono.fromRunnable
. - The comment is published to RabbitMQ's
learning-spring-boot
exchange with a routing key ofcomments.new
. - We wait for this to complete with
then()
, and when done, return a Spring WebFlux redirect to send the webpage back to the home page.
暂停。关于 RabbitMQ 交换和路由密钥的要点可能听起来有点复杂。
我们需要把它拆开来更好地理解 AMQP 的基础知识。
如果您已经使用过 JMS,那么您就会知道它有队列和主题。 AMQP 也有 queues 但语义不同。
基于 JMS 的生产者发送的每条消息仅由该队列的一个客户端使用。基于 AMQP 的生产者不直接发布到队列,而是发布到 exchanges。当 queues 被声明时,它们必须绑定到一个交换器。多个队列可以绑定到同一个交换,模拟主题的概念。
JMS 具有消息选择器,允许消费者选择他们从队列或主题接收到的消息。 AMQP 具有 路由键,它们的行为基于 type 的交易所,如下所列。
直接交换路由消息基于一个固定的路由键,通常是队列的名称。例如,我们刚刚看到的最后一个代码提到了 learning-spring-boot
作为交换的名称和 comments.new
作为路由键。任何使用 comments.new
路由键将自己的队列绑定到该交换的消费者都将收到之前发布的每条消息的副本。
topic exchange 允许路由键具有通配符,例如 comments.*
。这种情况最适合 客户端,其中实际路由键在用户提供条件之前是未知的。例如,想象一个股票交易应用程序,其中用户必须提供他或她有兴趣监控的股票代码列表。
fanout exchange 盲目广播每条消息到 绑定到它的每个队列,无论路由键如何。
关于 AMQP 的语义,让我们通过分块查看 CommentService
(也在 comments
子包中)进一步探索:
该代码可以描述如下:
- The
@Service
annotation marks it as a Spring service to be registered with the application context on startup CommentWriterRepository
is a Spring Data repository used to write new comments and is initialized by the constructor injection
这给我们带来了这项服务的实质,如下所示:
最后一个小功能很重要,所以让我们把它拆开:
- The
@RabbitListener
annotation is the easiest way to register methods to consume messages. - The
@QueueBinding
annotation is the easiest way to declare the queue and the exchange it's bound to on-the-fly. In this case, it creates an anonymous queue for this method and binds to thelearning-spring-boot
exchange. - The routing key for this method is
comments.new
, meaning any message posted to thelearning-spring-boot
exchange with that exact routing key will cause this method to be invoked. - It's possible for the
@RabbitListener
methods to receive a Spring AMQPMessage
, a Spring MessagingMessage
, various message headers, as well as a plain old Java object (which is what we have here). - The method itself invokes our
CommentWriterRepository
to actually save the comment in the data store.
要使用 RabbitMQ,我们通常需要 @EnableRabbit
,但是由于 Spring Boot,它会在 spring-boot-starter-amqp
时自动激活code> 在类路径上。再一次,Boot 知道我们想要什么并照做了。
需要理解的重要一点是,@RabbitListener
可以动态创建操作所需的所有交换和队列。但是,它仅在 AmqpAdmin
的实例位于应用程序上下文中时才有效。没有它,所有的交换和队列都必须声明为单独的 Spring bean。但是Spring Boot的RabbitMQ自动配置政策提供了一种,因此没有汗水!
这种方法有一个小问题会导致它无法运行——对象序列化。如果我们声明了方法签名以向我们提供 Spring AMQP Message
对象,我们将拉下一个字节数组。但是,开箱即用,Spring AMQP 在序列化自定义域对象方面的功能有限。毫不费力,它可以处理简单的字符串和可序列化的对象。
但是对于自定义域对象,还有一个更优选的方案——Spring AMQP 消息转换器,如下图:
在 save(Comment newComment)
方法下面列出的前面的 bean 可以描述如下:
@Bean
registers this as a bean definition.- It creates
Jackson2JsonMessageConverter
, an implementation of Spring AMQP'sMessageConverter
, used to serialize and deserialize Spring AMQPMessage
objects. In this case, is uses Jackson to convert POJOs to/from JSON strings.
Spring Boot 的 RabbitMQ 自动配置策略将查找 Spring AMQP 的 MessageConverter
实例的任何实现,并将它们注册到我们之前使用的 RabbitTemplate
以及它在我们的代码中发现 @RabbitListener
时创建的 SimpleMessageListenerContainer
。
为了从头开始我们的应用程序,我们在 CommentService
的底部有以下代码:
最后的代码可以描述如下:
- The
@Bean
annotation will register this chunk of code automatically - By implementing Spring Boot's
CommandLineRunner
interface, the Java 8 lambda expression will run itself when all beans have been created - It receives a copy of
MongoOperations
, the blocking MongoDB object we can use to drop the entire collection based onComment
为了在我们的数据存储中保留评论,我们有以下 Spring Data 存储库:
前面的存储库并不难剖析,可以按如下方式完成:
- It's an interface, which means that we don't have to write any code. We just declare the semantics and Spring Data does the rest.
- By extending Spring Data Commons'
Repository
interface, it will be picked up as a repository. Being an empty interface, it comes with no predefined operations. - It contains a
save()
operation to store a new comment (and return it after it gets saved). If the ID value is null, Spring Data MongoDB will automatically generate a unique string value for us. - Spring Data requires a
findOne()
operation in order to perform saves because that's what it uses to fetch what we just saved in order to return it. - All of these method signatures use Reactor
Mono
types.
这个存储库专注于将数据写入 MongoDB,仅此而已。即使它有一个 findOne()
,它也不是为读取数据而构建的。这已保留在 images
子包中。
为了完成我们的 comments
子包中的内容,让我们看一下核心域对象:
这个先前的域对象包含以下内容:
- The
@Data
annotation tells Lombok to generate getters, setters,toString()
,equals()
, andhashCode()
methods - The
id
field is marked with Spring Data Common's@Id
annotation so we know it's the key for mapping objects - The
imageId
field is meant to hold anImage.id
field linking comments to images - The
comment
field is the place to store an actual comment
Note
等一等!这不是 com.greglturnquist.learningspringboot.images.Comment
中的 完全相同的代码吗?现在是。但重要的是要认识到不同的 slices 将来可能需要不同的属性。通过保留一个特定于切片的域对象,我们可以更改一个而不会有更改另一个的风险。事实上,我们有可能(剧透警告!),在本书的后面部分,将整个评论系统移动到一个单独的微服务中。通过将事物保持在良好划分的切片中,可以降低紧密耦合的风险。
另一个因素是 RabbitMQ 不是反应式的。调用 rabbitTemplate.convertAndSend()
是阻塞的。鉴于 AMQP 是一种发布/订阅技术,这听起来可能很尴尬。但是将消息发布到 RabbitMQ 代理的整个过程 阻碍了我们的线程,并且根据定义,是阻塞的。
因此,我们的代码将其包装在 Java Runnable
中,并通过 Reactor 的 Mono 将其转换为
。这使得只有当我们在正确的时间准备好时才可以调用这个阻塞任务。重要的是要知道 Mono-wrapped-Runnable 不像传统的 Java Mono
.fromRunnableRunnable
并且不会在单独的线程中启动。取而代之的是,Runnable
接口提供了一个方便的包装器,Reactor 可以精确控制何时在其调度程序中调用 run()
方法。
如果我们在 IDE 中刷新我们的代码并让它重新启动,我们现在可以开始创建注释。查看以下屏幕截图:
前面的屏幕截图显示了添加到第一张图像的几条评论和正在编写的第三条评论。酷,嗯?
但也许,您想知道为什么我们花费所有精力将阅读和写作 评论分开?毕竟,Spring Data 似乎可以很容易地定义一个可以同时处理两者的存储库。这甚至可能意味着我们不需要 RabbitMQ,可以让 HomeController
和 CommentController
直接使用存储库。
使用消息传递的原因是提供一种可靠的方式将工作卸载到另一个系统。一个增长到数千甚至数百万用户的真实系统将看到巨大的流量。想想看。是否还有其他社交媒体平台可以让人们不断发表评论,但一次只能查看少量评论?
我们应用程序的这一方面在设计时考虑了可扩展性。如果我们有 100 万用户,他们每天可能会写数千万条消息。将我们的控制器直接连接到 MongoDB 可能会导致它崩溃。但是,如果我们将所有写入推送到单独的服务,我们就可以进行适当的调整。
读取次数要少得多。
添加了对其他人发布的图片comment 的功能后,开始gathering 就好了 指标。
为此,我们可以引入类似于第 14 章 Spring Boot 应用程序的开发人员工具 中所示的指标,如下所示:
与我们在本章前面编写的代码相比,最后一段代码有以下几处变化:
- A
MeterRegistry
is injected through the constructor and captured as a field. - It's used to increment a
comments.produced
metric with every comment. - Each metric is also "tagged" with the related imageId.
- We have to tune the
Mono
wrapping ourrabbitTemplate.convertAndSend()
, and ensure that the comment is passed viathen()
. Then it must be unpacked viaflatMap
in the part of the flow that writes metrics.
Note
与 meterRegistry
对话的代码是否应该包含在 Mono.fromRunnable()
?也许。编写时代码块,但在这个化身中,指标存储在内存中,因此成本低。然而,成本可能会上升,这意味着它应该得到妥善管理。如果服务变为外部服务,则支持使用单独的 Mono
包装的可能性会迅速增加。
同样,如果我们将 MeterRegistry
注入 CommentService
,我们也可以在那里使用它:
这与我们添加到 CommentController
的内容一致。前面的代码可以解释如下:
- Using the injected
MeterRegistry
, we increment acomments.consumed
metric with every comment. - It's also tagged with the comment's related imageId.
- The metrics are handled after the save is completed inside the
subscribe
method. This method grants us the ability to execute some code once the flow is complete.
Note
Spring AMQP 还不支持 Reactive Streams。这就是为什么 rabbitTemplate.convertAndSend()
必须包装在 Mono.fromRunnable
中的原因。像这种 subscribe()
方法这样的阻塞调用应该是危险信号,但是在这种情况下,在 Spring AMQP 能够添加支持之前,这是必要的邪恶。如果没有它,就没有其他方法可以指示此 Reactor 流执行。
重新启动我们的应用程序并手动输入大量评论的想法听起来并不令人兴奋。那么为什么不写一个模拟器来为我们做呢!
让我们把这个模拟器分开:
- The
@Profile
annotation indicates that this only operates ifspring.profiles.active=simulator
is present when the app starts - The
@Component
annotation will allow this class to get picked up by Spring Boot automatically and activated - The class itself is located in the root package,
com.greglturnquist.learningspring
, given that it pulls bits from both subpackages - The
@EventListener
annotation signals Spring to pipe application events issued to the app context. In this case, the method is interested inApplicationReadyEvent
s, fired when the application is up and operational Flux.interval(Duration.ofMillis(1000))
causes a stream of lazy ticks to get fired every 1000 ms, lazily- By flatMapping over this
Flux
, each tick is transformed into all images using theImageRepository
- Each image is used to generate a new, related comment
- Using the injected
CommentController
, it simulates the newly minted comment being sent in from the web
如果我们用 spring.profiles.active=simulator
重新配置我们的运行器,我们可以看到它运行。 IntelliJ IDEA 提供了轻松设置 Spring 配置文件的方法:
您可以在上一个屏幕截图的底部看到突出显示的条目。
如果我们在听到机器的风扇进入高速档后开始工作,我们可以在 http://localhost:8080/application/metrics/comments.consumed
和 http://localhost:8080/application/metrics/comments.produced
,并期待看到计数。
在最后一张截图中,我们可以清楚地看到 counter.comments.produced
和 counter.comments.consumed
,它们恰好是相同,这意味着没有人丢失。
通过消息传递将许多小型服务链接在一起是一种非常常见的 模式。随着微服务的兴起,它越来越受欢迎。使用 RabbitTemplate
或其他一些传输模板(KafkaTemplate
等)一遍又一遍地编写相同的模式是我们应该考虑的另一个复杂程度不要背负。
Spring Cloud Stream (http://cloud.spring.io/ spring-cloud-stream/) 来救援!
Spring Cloud Stream 采用 inputs、outputs 和transformers 来自 Spring Integration 并使得将它们链接在一起变得超级容易。
为了改变我们的社交媒体平台来做到这一点,我们可以从我们的构建文件中删除 spring-boot-starter-amqp
并添加它:
前面的依赖项带来了以下内容:
spring-cloud-stream-binder-rabbit-core
spring-cloud-stream-codec
spring-cloud-stream
spring-cloud-stream-reactive
spring-boot-starter-amqp
spring-integration-amqp
春云?那是什么?
Spring Cloud 是通过各种库提供的 Spring Boot 的扩展,旨在解决不同的云原生模式。在这种情况下,Spring Cloud Stream 旨在通过消息传递简化服务的链接。
要使用任何 Spring Cloud 库,我们需要将以下块添加到 build.gradle
文件的底部:
前面的代码片段是 Spring 的依赖管理 gradle 插件的一部分,引入 Spring Cloud BOM(材料清单)。在这种情况下,它有一个变量,springCloudVersion
,我们需要选择它。
Spring Cloud 有发布列车,这意味着每个库都有一个版本,但所有版本都是协调的。通过选择一列火车,我们得到了一系列可供选择的工具(我们将贯穿本书的其余部分!)。
与 Spring Boot 2.0 相关的 Spring Cloud 发布火车是 Finchley
,所以让我们把它放在我们的 Boot 版本旁边的顶部:
Note
如果您对 Spring Cloud 的各种发布系列感到好奇,请查看其项目页面 http://projects.spring.io/spring-cloud/。
随着 Spring Cloud 的 BOM 和 Spring Cloud Stream 添加到我们的构建中,让我们回到使用 Spring Cloud Stream 的核心接口配置消息传递,如如下:
最后一段代码与我们在本章前面创建的 CommentController
非常相似,但有以下区别:
@EnableBinding(Source.class)
flags this app as a source for new events. Spring Cloud Stream uses this annotation to signal the creation of channels, which, in RabbitMQ, translates to exchanges and queues.- The constructor proceeds to set up a
FluxSink
, the mechanism to emit new messages into a downstreamFlux
. This sink is configured to ignore downstream backpressure events. It starts publishing right away, autoconnecting to its upstream source upon subscription. - The objects being emitted are
Message<Comment>
, which is Spring's abstraction for a POJO wrapped as a transportable message. This includes the ability to add headers and other information. - Inside
addComments
, if the sink has been established, it mapsnewComment
into aMessage<Comment>
using Spring Messaging APIs. Finally, it transmits the message into the sink. - When the message is successfully emitted to
Flux
, a redirect is issued. - To transmit
Flux
ofMessage<Comment>
objects, a separate method,emit
, is wired up with an@StreamEmitter
annotation. This method is fed aFluxSender
, which provides us with a Reactor-friendly means to transmit messages into a channel. It lets us hook up theFlux
tied to ourFluxSink
. - The
@Output(Source.OUTPUT)
annotation marks up which channel it gets piped to (visitingSource.OUTPUT
reveals the channel name as output).
这个控制器中包含了很多东西。为了更好地理解它,有一些基本概念需要实现。
首先,创建一个 Flux
然后添加到它不是常见的做法。范式是将其包裹在其他东西上。为了明确这一点,Flux
本身就是一个抽象类。你不能实例化它。相反,您必须使用它的各种静态辅助方法来制作一个。因此,当我们想要获取与用户点击网站相关的行为并将其链接到应用程序启动时创建的 Flux
时,我们需要类似 FluxSink
将这两件事联系在一起。
Spring Cloud Stream 专注于将具有源/接收器语义的消息流链接在一起。对于 Reactor,这意味着将消息的 Flux
调整到通道上,这是 Spring Integration 多年来策划的一个概念。鉴于通道的具体性质已被抽象出来,我们使用何种传输技术并不重要。由于 Spring Boot 的强大功能,这是由类路径上的依赖项定义的。尽管如此,我们将继续使用 RabbitMQ,因为它既简单又强大。
顺便说一句,当我们访问 时,我们将再次看到将接收器连接到 Flux
的概念第 17 章,WebSockets with Spring Boot。将一次性对象连接到已建立的流时,这是一种常见的 Reactor 模式。
要声明一个 Spring Cloud Stream 消费者,我们只需要更新我们的 CommentService
如下:
在 CommentService
的顶部,我们需要添加 @EnableBinding(CustomProcessor.class)
。如果这是唯一的 Spring Cloud Stream 组件,我们可以使用 @EnableBinding(Processor.class)
,但是,我们不能与 <代码类="literal">CommentController。所以我们需要编写一组自定义通道,CustomProcessor
如下图:
这个自定义处理器与 Spring Cloud Stream 的 Processor
非常相似:
- It's a declarative interface.
- It has two channel names,
INPUT
andOUTPUT
. TheINPUT
channel uses the same asProcessor
. To avoid colliding with theOUTPUT
channel ofSource
, we create a different channel name,emptyOutput
. (Why call itemptyOutput
? We'll see in a moment!) - The is a
SubscribableChannel
for inputs and aMessageChannel
for outputs.
这将我们的应用程序标记为事件的 Sink
和 Source
。还记得我们在使用 RabbitTemplate
时必须提前 subscribe
吗?
值得庆幸的是,Spring Cloud Stream 对 Reactor 友好。在处理 Reactive Streams 时,我们的代码不应该是处理的终止点。因此,接收 Comment
对象的传入 Flux
必须导致传出 Flux
框架可以调用,我们很快就会看到。
在 CommentService
中,我们需要更新我们的 save
方法,如下所示:
让我们拆开之前更新的 save
版本:
- The
@RabbitListener
annotation has been replaced with@StreamListener
, indicating that it's transport-agnostic. - The argument
newComments
is tied to the input channel via the@Input()
annotation. - Since we've marked it as
Flux
, we can immediately consume it with our MongoDB repository. - Since we have to hand a stream back to the framework, we have marked up the whole method with
@Output
. - From there, we can flatMap it to generate metrics and then transform it into a
Flux
ofMono<Void>
s withMono.empty()
. This ensures that no more processing is done by the framework.
此方法与所有 Spring @*Listener
注解具有相同的概念——调用带有可选域对象的方法。但是这一次,它从我们配置 Spring Cloud Stream 使用的任何底层技术接收它们。好处是它小巧且易于管理,并且我们的代码不再直接绑定到 RabbitMQ。
话虽如此,我们需要向 Spring Cloud Stream 表达我们的源和接收器需要通过同一个 RabbitMQ 交换进行通信。为此,我们需要在 application.yml
中提供设置:
最后一个应用程序配置包含以下详细信息:
spring.cloud.stream.bindings
is configured for both theinput
and theoutput
channel's destination to belearning-spring-boot
. When using RabbitMQ bindings, this is the name of the exchange and Spring Cloud Stream uses topic exchanges by default.- We take advantage of Spring Cloud Streams' support for consumer groups by also setting the
group
property. This ensures that even if there are multiple stream listeners to a given channel, only one listener will consume any one message. This type of guarantee is required in cloud-native environments when we can expect to run multiple instances.
Note
如本书前面所述,您可以使用 application.properties
或 application.yml
。如果您发现自己配置了许多具有相同前缀的设置,请使用 YAML 使其更易于阅读并避免重复。
顺便说一句,还记得本章前面必须定义一个 Jackson2JsonMessageConverter
bean 来处理序列化吗?不再需要。 Spring Cloud Stream 使用 Esoteric Software 的 Kryo 库进行序列化/反序列化(https://github.com/EsotericSoftware/kryo)。这意味着,我们可以放弃那个 bean 定义。谈论精简代码!
如果我们再次运行模拟器 (spring.profiles.active=simulator
) 并检查 http://localhost:8080/application/指标
,我们可以看到我们的自定义指标列表所有内容。
有了这个,我们设法改变了 comments 解决方案,但仍保留了相同的指标集。
但是,通过切换到 Spring Cloud Stream,我们收集了全新的指标队列,如下图所示:
这是一个涵盖输入和输出通道的子集(太多无法填满一本书)。
还记得我们在上一章是如何编写自定义健康检查的吗?为 RabbitMQ 及其绑定提供一个会很方便。你猜怎么着?已经完成了。一探究竟:
- The RabbitMQ broker is up and operational
- Our RabbitMQ binders are operational as well
有了这个,我们就有了一个运行良好的 comment 系统。
总结一下,真正查看 Spring Cloud Stream 是如何处理事情会很好。为此,我们可以像这样调用 application.yml
中的日志级别:
最后一段代码调高了 Spring Cloud Stream 及其底层技术 Spring Integration 的日志级别。读者可以通过设置 org.springframework.amqp=DEBUG
来更改 RabbitTemplate
日志级别并看看会发生什么.
随着这些级别的增加,如果我们运行我们的应用程序,我们可以看到一些这样的:
上一个屏幕截图显示了与处理通道设置以及设置 AMQP 交换和队列的 Spring Integration 相比,绑定中涉及的 Spring Cloud Stream 之间的明显区别。
很高兴观察到日志记录前缀 o.s.c.s
是 org.springframework.cloud.stream
或 Spring Cloud Stream 的缩写。
此屏幕截图很好地显示了 Comment 被传输到 output 通道和然后在 input 频道上收到。
另请注意,日志记录前缀 o.s.i
表示 Spring Integration,而 s.i.m
是 Spring Integration 的消息 API。