vlambda博客
学习文章列表

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

Chapter 15. AMQP Messaging with Spring Boot

我应该补充一点,我们从上到下都是@springboot / @SpringCloudOSS。

DaShaun Carter @dashaun

在上一章中,我们向社交媒体应用程序添加了一些工具,以加快开发人员的时间并提供基本的运营支持功能。

但没有什么是静止的。在各种社交媒体平台中,用户之间存在某种形式的消息传递。为什么不为我们创造一个呢?

在本章中,我们将学习以下主题:

  • Getting started with RabbitMQ, an AMQP broker
  • Creating a message-based module for our social media app
  • Adding customized metrics to track message flow
  • Creating dynamically routed messages
  • Taking a peek at Spring Cloud Stream and its RabbitMQ bindings

Getting started with RabbitMQ


RabbitMQ 是一个开源的 AMQP 代理。 高级消息队列协议 (AMQP) 是一个开放协议,包括通过网络发送的消息格式。这已经上升受欢迎程度比较 到 JMS 等其他消息传递解决方案。为什么?

JMS 是一种 API,而 AMQP 是一种协议。 JMS 定义了如何与代理对话,但没有定义其消息的格式。它仅限于 Java 应用程序。 AMQP 没有谈论如何与代理交谈,而是谈论如何将消息放在网络上以及如何将它们拉下。

为了说明这一点,想象两个不同的应用程序。如果它们都是 Java,它们可以通过 JMS 进行通信。但是,如果其中之一是 Ruby,JMS 就会被淘汰。

为了进一步展示 JMS 和 AMQP 之间的区别,讲 JMS 的代理实际上可以在底层使用 AMQP 来传输消息.

Note

事实上,我已经为 Pivotal Software 开发的 RabbitMQ JMS Client 做出了贡献,该客户端位于 <跨度>https://github.com/rabbitmq/rabbitmq-jms-client

在本章中,我们将本着最大选择的精神探索使用 RabbitMQ。

Installing RabbitMQ broker

为此,我们需要安装 RabbitMQ 代理。

在 macOS 上,如果我们使用 Homebrew (http://brew.sh/),就这么简单:

$ brew install rabbitmq==> Installing dependencies for rabbitmq: openssl, libpng, libtiff,
    wx...==> Pouring openssl-1.0.2j.el_capitan.bottle.tar.gz/usr/local/Cellar/openssl/1.0.2j: 1,695 files, 12M==> Pouring libpng-1.6.25.el_capitan.bottle.tar.gz/usr/local/Cellar/libpng/1.6.25: 25 files, 1.2M==> Pouring libtiff-4.0.6_2.el_capitan.bottle.tar.gz/usr/local/Cellar/libtiff/4.0.6_2: 261 files, 3.4M==> Pouring wxmac-3.0.2_3.el_capitan.bottle.tar.gz/usr/local/Cellar/wxmac/3.0.2_3: 809 files, 23.6M==> Pouring erlang-19.1.el_capitan.bottle.tar.gz/usr/local/Cellar/erlang/19.1: 7,297 files, 279.8M==> Installing rabbitmq/usr/local/Cellar/rabbitmq/3.6.4: 187 files, 5.8M, built in 6 
    seco...

在 Debian Linux 上,您可以使用以下命令:

$ sudo apt-get install rabbitmq-server

在任何 Red Hat Linux 系统上,都可以运行以下命令:

$ yum install erlang$ yum install rabbitmq-server-<version>.rpm

在包括 Cloud Foundry 在内的各种云解决方案上,RabbitMQ 可以作为服务找到(包括 Pivotal 的 RabbitMQ 用于 PCF,位于 https://network.pivotal.io/products/p-rabbitmq),我们将在 第 19 章使用 Spring Boot 将您的应用程序投入生产

有关下载和安装的更多详细信息,请访问 https://www.rabbitmq。 com/download.html

Launching the RabbitMQ broker

安装了 RabbitMQ 代理后,我们只需要启动它。有这两种方法可以做到这一点:

  • Starting it in our current shell
  • Having it start when the machine boots

要在我们当前的 shell 中启动,我们可以执行以下命令:

$ rabbitmq-serverRabbitMQ 3.6.4. Copyright (C) 2007-2016 Pivotal Software...##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/##  ############  Logs: /usr/local/var/log/rabbitmq/[email protected]######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl....##########Starting broker...completed with 10 plugins.

在带有 Homebrew 的 macOS 上,使用以下命令作为守护进程启动并在我们重新启动时重新启动:

$ brew services start rabbitmq==> Tapping homebrew/servicesCloning into '/usr/local/Homebrew/Library/Taps/homebrew/homebrew-services'...remote: Counting objects: 10, done.remote: Compressing objects: 100% (7/7), done.remote: Total 10 (delta 0), reused 6 (delta 0), pack-reused 0Unpacking objects: 100% (10/10), done.Checking connectivity... done.Tapped 0 formulae (36 files, 46K)==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)

如果您使用 Homebrew,则有一个功能可以管理各种 服务。键入 homebrew services 以查看可用的命令。例如,brew services list 将列出所有服务及其状态:

$ brew services list  Name     Status  User       Plist  activemq stopped  mongodb  started gturnquist 
  /Users/gturnquist/Library/LaunchAgents/hom...  mysql    stopped  neo4j    stopped  rabbitmq started gturnquist   
  /Users/gturnquist/Library/LaunchAgents/hom...  redis    stopped  tor      stopped

现在我们可以看到 RabbitMQ 已经加入了 MongoDB(我们在 Chapter 12 中安装了它,使用 Spring Boot 进行响应式数据访问)。

从本质上讲,这利用了 macOS X 的 launchctl 系统和 Homebrew 提供的守护进程控制文件。

对于 Windows,请查看 https://www.rabbitmq.com/安装-windows.html。它有指向 下载 代理的链接。安装后,它将使用各种默认值配置它并启动它。

要控制代理,请查看 sbin 文件夹中的 rabbitmqctl.bat 脚本(以管理员身份)。使用以下命令:

  • rabbitmqctl start
  • rabbitmqctl stop
  • rabbitmqctl status

Note

想以更直观的方式探索 RabbitMQ 代理?运行rabbitmq-plugins enable rabbitmq_managment,访问http://localhost:15672。 RabbitMQ 的默认用户名/密码是 guest/guest。我建议先查看 ExchangesQueues

随着 RabbitMQ 代理的启动和运行,我们现在可以将重点转移到我们的应用程序工作上。

Adding messaging as a new component to an existing application


到目前为止,我们为社交媒体平台构建了什么?我们有能力上传和删除图片。然而,任何社交媒体平台的一个关键部分是允许用户相互交互。这通常通过评论社交媒体内容或直接相互聊天来完成。

让我们从添加评论图像的功能开始。但在我们开始之前,让我们停下来讨论架构。

多年来,人们一直使用分层方法来拆分应用程序。从根本上说,我们不希望在一个包中包含所有类的大型应用程序,因为要跟上所有内容太难了。

到目前为止,我们的所有内容都位于 com.greglturnquist.learningspringboot 中。从历史上看,该模式一直将事物拆分为域层、服务层和控制器层,如下面的屏幕截图所示:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

在这种结构中,我们会将每个服务放入 services 子包中,并在需要时创建更多子子包。我们将所有域对象放在 domain 中,所有控制器将进入 controllers

这个想法是控制器调用服务和服务返回域对象。它防止了诸如服务调用控制器之类的纠缠,这在当时是有意义的。

但是随着微服务的兴起(我们将在 第 16 章中深入探讨,使用 Spring Boot 的微服务),当应用程序变得非常大时,这些基于层的方法会成为一个问题。当进行重构时,由于我们可能创建的不必要的耦合,在功能无关的同一软件包中找到的服务可能会变得棘手。

一种更苗条和精简的方法是使用垂直切片< /strong> 而不是水平的

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

使用前面屏幕截图中显示的结构,我们将 split 东西分成 images comments,更基于函数的性质。

我们会将与处理图像相关的所有内容放在 former 中,将所有与评论相关的内容放在后者中。如果需要,这些包中的任何一个都可以进一步拆分为子包,如下所示:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

担心这会导致domain/services/controllers爆炸我们的代码中的三重奏?不要恐慌!我们只根据需要这样做,并且鉴于每个 domain 子包的范围与旧层方法相比相对较小,因此功能应该是高度内聚的,即具有彼此有很多共同点。

由于我们即将创建一个单独的功能(注释),继续将我们的应用程序分解为 images 是有意义的评论。所以让我们这样做吧!

首先,让我们创建 imagescomments 子包。有了这些,最明显的变化就是移动 ImageImageRepositoryImageService 进入 image 子包。很容易。

这给我们留下了以下内容:

  • LearningSpringBootApplication
  • HomeController
  • LearningSpringBootHealthIndicator

LearningSpringBootApplication 体现了整个应用程序,所以它应该保持在顶层。这不仅仅是一个语义陈述。该类包含我们的 SpringBootApplication 注释,它支持应用程序的自动配置行为,如组件扫描。组件扫描应从顶层开始并搜索所有子包。

HomeController 代表了一个有趣的概念。尽管它调用了 ImageService,因为它服务于应用程序的顶层视图,所以我们也将它留在顶层。

至于LearningSpringBootHealthIndicator,可以做一个类似的案例来保持它的根。既然我们是为了让事情在顶部保持轻松,我们为什么不创建 一个单独的模块来encompass 所有基于 Ops 的功能,并不特定于任何一个模块,ops

鉴于所有这些决定,我们的新 结构 现在看起来像这样:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

Note

花这么多时间讨论包结构值得吗?在任何敏捷环境中,只要不花费两周的努力,就可以尝试一些东西。停止花 10 分钟思考可维护的结构是一项可以接受的投资,尤其是如果我们愿意稍后在需要时对其进行更改。

Creating a message producer/message consumer


将我们的应用程序重构为 make 评论空间,让我们开始吧!

首先,我们需要在我们的构建文件中添加一个新的依赖项,这是通过以下代码完成的:

    compile('org.springframework.boot:spring-boot-starter-amqp')

这将使我们能够访问包含 RabbitMQ 支持的 Spring AMQP。

在我们的应用程序中添加消息传递技术可能会让我们大声疾呼,好吧,编写一些与 RabbitMQ 对话的代码。但这并不是一个很好的流程。相反,我们应该从两个角度之一开始——编写单元测试或编写一些 UI。

任何一种方法都旨在找出我们试图解决的用例。在解决手头的问题之前,我们需要弄清楚我们的确切问题是什么。在这种情况下,让我们从 UI 角度开始。

为此,我们可以利用上一章中的 Spring Boot DevToolslaunch 我们的应用程序在 Debug 模式下使用 LiveReload 功能已启用。这样,当我们 make 更改时,我们可以立即看到它们:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

通过前面的屏幕截图,我们可以看到我们的应用程序在启用 LiveReload 服务器的情况下启动并运行(并预加载了一些示例数据)。

Displaying comments

现在我们可以对 Thymeleaf 模板进行 编辑,并创建输入字段供人们撰写评论:

    <td> 
        <ul> 
            <li th:each="comment : ${image.comments}" th:text="${comment.comment}"></li> 
        </ul> 
    </td> 
    <td> 
        <form th:method="post" th:action="@{'/comments'}"> 
            <input name="comment" value="" type="text" /> 
            <input name="imageId" th:value="${image.id}" type="hidden" /> 
            <input type="submit" /> 
        </form> 
    </td> 

我们前面的模板中每一行被渲染的部分可以解释如下:

  • There is a new column containing an HTML unordered list to display each comment
  • The unordered list consists of an HTML line item for each comment via Thymeleaf's th:each construct
  • There is also a new column containing an HTML form to post a new comment
  • The form contains an HTML text input for the comment itself
  • The form also contains a hidden HTML element specifying the ID of the image that the comment will be associated with

为了支持这一点,我们需要更新 HomeController 如下:

    private final ImageService imageService; 
    private final CommentReaderRepository repository; 
 
    public HomeController(ImageService imageService, 
     CommentReaderRepository repository) { 
       this.imageService = imageService; 
       this.repository = repository; 
    } 

我们更新了类定义如下:

  • A new repository field is created for CommentReaderRepository (which we'll define further ahead in the chapter)
  • This field is initialized by constructor injection

我们需要查看评论。为此,我们需要一个可以读取评论的 Spring Data repository。在我们的社交媒体应用程序的这个阶段,阅读评论是这个存储库需要做的所有事情。

让我们使用这个新的存储库并在 GET / 的 Spring WebFlux 处理程序中使用它,如下所示:

    @GetMapping("/") 
    public Mono<String> index(Model model) { 
      model.addAttribute("images", 
       imageService 
       .findAllImages() 
       .flatMap(image -> 
         Mono.just(image) 
           .zipWith(repository.findByImageId( 
             image.getId()).collectList())) 
       .map(imageAndComments -> new HashMap<String, Object>(){{ 
            put("id", imageAndComments.getT1().getId()); 
            put("name", imageAndComments.getT1().getName()); 
            put("comments", 
              imageAndComments.getT2()); 
        }})
      ); 
      model.addAttribute("extra", 
       "DevTools can also detect code changes too"); 
       return Mono.just("index"); 
    } 

最后一段代码包含对模型的 images 属性的轻微调整:

  • The code takes the Flux returned from our ImageService.findAll() method and flatMaps each entry from an Image into a call to find related comments.
  • repository.findByImageId(image.getId()).collectList() actually fetches all Comment objects related to a given Image, but turns it into Mono<List<Comment>>. This waits for all of the entries to arrive and bundles them into a single object.
  • The collection of comments and it's related image are bundled together via Mono.zipWith(Mono), creating a tuple-2 or a pair. (This is the way to gather multiple bits of data and pass them on to the next step of any Reactor flow. Reactor has additional tuple types all the way up to Tuple8.)
  • After flatMapping Flux<Image> into Flux<Tuple2<Image,List<Comment>>>, we then map each entry into a classic Java Map to service our Thymeleaf template.
  • Reactor's Tuple2 has a strongly typed getT1() and getT2(), with T1 being the Image and T2 being the list of comments, which is suitable for our needs since it's just a temporary construct used to assemble details for the web template.
  • The image's id and name attributes are copied into the target map from T1.
  • The comments attribute of our map is populated with the complete List<Comment> extracted from T2.

Note

由于 Thymeleaf 模板在键值语义上运行,因此无需定义新的域对象来捕获此构造。 Java Map 可以正常工作。

随着我们继续使用 Reactor 类型,希望这些类型的流程变得熟悉。放置这样的流程时,拥有一个提供代码完成功能的 IDE 是一项关键资产。我们使用这些类型的转换越多,它们就越容易。

Note

如果您注意到,鉴于我们使用 MongoDB 的反应式驱动程序,ImageService 是完全反应式的。检索评论的操作是也是反应式的。将响应式调用链接在一起,使用 Reactor 的运算符并将它们连接到 Thymeleaf 的响应式解决方案,确保尽可能高效地获取所有内容,并且仅在必要时获取。编写反应式应用程序取决于拥有一个完全反应式的堆栈。

round我们的阅读评论,我们需要定义CommentReaderRepository如下:

    public interface CommentReaderRepository 
     extends Repository<Comment, String> { 
 
       Flux<Comment> findByImageId(String imageId); 
    } 

上述代码可以描述如下:

  • It's a declarative interface, similar to how we created ImageRepository earlier in this book.
  • It extends Spring Data Commons' Repository interface, which contains no operations. We are left to define them all. This lets us create a read-only repository.
  • It has a findByImageId(String imageId) method that returns a Flux of Comment objects.

这个 repository 为我们提供了关于评论的只读读数。这很方便,因为它可以让我们获取评论并且不会意外地让人们通过它来写。相反,我们打算在本章中进一步实现一些不同的东西。

我们的 CommentReaderRepository 需要一件事:一个 Comment 域对象:

    package com.greglturnquist.learningspringboot.images; 
 
    import lombok.Data; 
 
    import org.springframework.data.annotation.Id; 
 
    @Data 
    public class Comment { 
 
      @Id private String id; 
      private String imageId; 
      private String comment; 
 
    } 

前面的域对象包含以下内容:

  • The @Data annotation tells Lombok to generate getters, setters, toString(), equals(), and hashCode() methods
  • The id field is marked with Spring Data Commons' @Id annotation so we know it's the key for mapping objects
  • The imageId field is meant to hold an Image.id field, linking comments to images
  • The comment field is the place to store an actual comment

Note

对于 CommentReaderRepositoryComment,会显示整个类,包括包。这表明它位于我们在本章前面定义的 images 子包中。此域对象提供与图像相关的评论信息。并且此信息是只读的,这意味着这不是更新评论的地方。

Producing comments

编写了显示评论的代码后,现在是时候制作 位来创建评论了。

我们已经看到了对模板的更改,添加了一个 HTML 表单来写评论。我们把对应的控制器在comments子包中编码,如下:

    @Controller 
    public class CommentController { 
 
      private final RabbitTemplate rabbitTemplate; 
 
      public CommentController(RabbitTemplate rabbitTemplate) { 
        this.rabbitTemplate = rabbitTemplate; 
      } 
 
      @PostMapping("/comments") 
      public Mono<String> addComment(Mono<Comment> newComment) { 
        return newComment.flatMap(comment -> 
         Mono.fromRunnable(() -> rabbitTemplate 
           .convertAndSend( 
             "learning-spring-boot", 
             "comments.new", 
             comment))) 
           .log("commentService-publish") 
           .then(Mono.just("redirect:/")); 
      } 
 
    } 

代码可以解释如下:

  • It's the first class we have put in the new comments subpackage.
  • The @Controller annotation marks this as another Spring controller.
  • It contains a RabbitTemplate initialized by constructor injection. This RabbitTemplate is created automatically by Spring Boot when it spots spring-amqp on the classpath.
  • The @PostMapping("/comments") annotation registers this method to respond to the form submissions that we added earlier in the template with th:action="@{'/comments'}".
  • Spring will automatically convert the body of the POST into a Comment domain object. Additionally, since we are using WebFlux, deserializing the request body is wrapped in a Mono, hence that process will only occur once the framework subscribes to the flow.
  • The incoming Mono<Comment> is unpacked using flatMap and then turned into a rabbitTemplate.convertAndSend() operation, which itself is wrapped in Mono.fromRunnable.
  • The comment is published to RabbitMQ's learning-spring-boot exchange with a routing key of comments.new.
  • We wait for this to complete with then(), and when done, return a Spring WebFlux redirect to send the webpage back to the home page.

暂停。关于 RabbitMQ 交换和路由密钥的要点可能听起来有点复杂。

Note

评论发布到 RabbitMQ 的 learning-spring-boot 交换,路由键为 comments.new

我们需要把它拆开来更好地理解 AMQP 的基础知识。

AMQP fundamentals

如果您已经使用过 JMS,那么您就会知道它有队列和主题。 AMQP 也有 queues 但语义不同。

基于 JMS 的生产者发送的每条消息仅由该队列的一个客户端使用。基于 AMQP 的生产者不直接发布到队列,而是发布到 exchanges。当 queues 被声明时,它们必须绑定到一个交换器。多个队列可以绑定到同一个交换,模拟主题的概念。

JMS 具有消息选择器,允许消费者选择他们从队列或主题接收到的消息。 AMQP 具有 路由键,它们的行为基于 type 的交易所,如下所列。

直接交换路由消息基于一个固定的路由键,通常是队列的名称。例如,我们刚刚看到的最后一个代码提到了 learning-spring-boot 作为交换的名称和 comments.new作为路由键。任何使用 comments.new 路由键将自己的队列绑定到该交换的消费者都将收到之前发布的每条消息的副本。

topic exchange 允许路由键具有通配符,例如 comments.*。这种情况最适合 客户端,其中实际路由键在用户提供条件之前是未知的。例如,想象一个股票交易应用程序,其中用户必须提供他或她有兴趣监控的股票代码列表。

fanout exchange 盲目广播每条消息 绑定到它的每个队列,无论路由键如何。

关于 AMQP 的语义,让我们通过分块查看 CommentService(也在 comments 子包中)进一步探索:

    @Service 
    public class CommentService { 
 
      private CommentWriterRepository repository; 
 
      public CommentService(CommentWriterRepository repository) { 
        this.repository = repository; 
      }
      ... more to come below...
    } 

该代码可以描述如下:

  • The @Service annotation marks it as a Spring service to be registered with the application context on startup
  • CommentWriterRepository is a Spring Data repository used to write new comments and is initialized by the constructor injection

这给我们带来了这项服务的实质,如下所示:

    @RabbitListener(bindings = @QueueBinding( 
      value = @Queue, 
      exchange = @Exchange(value = "learning-spring-boot"), 
      key = "comments.new" 
    )) 
    public void save(Comment newComment) { 
      repository 
       .save(newComment) 
       .log("commentService-save") 
       .subscribe(); 
    } 

最后一个小功能很重要,所以让我们把它拆开:

  • The @RabbitListener annotation is the easiest way to register methods to consume messages.
  • The @QueueBinding annotation is the easiest way to declare the queue and the exchange it's bound to on-the-fly. In this case, it creates an anonymous queue for this method and binds to the learning-spring-boot exchange.
  • The routing key for this method is comments.new, meaning any message posted to the learning-spring-boot exchange with that exact routing key will cause this method to be invoked.
  • It's possible for the @RabbitListener methods to receive a Spring AMQP Message, a Spring Messaging Message, various message headers, as well as a plain old Java object (which is what we have here).
  • The method itself invokes our CommentWriterRepository to actually save the comment in the data store.

要使用 RabbitMQ,我们通常需要 @EnableRabbit,但是由于 Spring Boot,它会在 spring-boot-starter-amqp 时自动激活code> 在类路径上。再一次,Boot 知道我们想要什么并照做了。

需要理解的重要一点是,@RabbitListener 可以动态创建操作所需的所有交换和队列。但是,它仅在 AmqpAdmin 的实例位于应用程序上下文中时才有效。没有它,所有的交换和队列都必须声明为单独的 Spring bean。但是Spring Boot的RabbitMQ自动配置政策提供了一种,因此没有汗水!

这种方法有一个小问题会导致它无法运行——对象序列化。如果我们声明了方法签名以向我们提供 Spring AMQP Message 对象,我们将拉下一个字节数组。但是,开箱即用,Spring AMQP 在序列化自定义域对象方面的功能有限。毫不费力,它可以处理简单的字符串和可序列化的对象。

但是对于自定义域对象,还有一个更优选的方案——Spring AMQP 消息转换器,如下图:

    @Bean 
    Jackson2JsonMessageConverter jackson2JsonMessageConverter() { 
      return new Jackson2JsonMessageConverter(); 
    } 

save(Comment newComment) 方法下面列出的前面的 bean 可以描述如下:

  • @Bean registers this as a bean definition.
  • It creates Jackson2JsonMessageConverter, an implementation of Spring AMQP's MessageConverter, used to serialize and deserialize Spring AMQP Message objects. In this case, is uses Jackson to convert POJOs to/from JSON strings.

Spring Boot 的 RabbitMQ 自动配置策略将查找 Spring AMQP 的 MessageConverter 实例的任何实现,并将它们注册到我们之前使用的 RabbitTemplate以及它在我们的代码中发现 @RabbitListener 时创建的 SimpleMessageListenerContainer

为了从头开始我们的应用程序,我们在 CommentService 的底部有以下代码:

    @Bean 
    CommandLineRunner setUp(MongoOperations operations) { 
      return args -> { 
        operations.dropCollection(Comment.class); 
      }; 
    } 

最后的代码可以描述如下:

  • The @Bean annotation will register this chunk of code automatically
  • By implementing Spring Boot's CommandLineRunner interface, the Java 8 lambda expression will run itself when all beans have been created
  • It receives a copy of MongoOperations, the blocking MongoDB object we can use to drop the entire collection based on Comment

Note

此代码便于开发,但应在生产中删除或包装在 @Profile("dev") 注释中,使其仅在 spring.profiles.active=dev 存在。

为了在我们的数据存储中保留评论,我们有以下 Spring Data 存储库:

    public interface CommentWriterRepository 
     extends Repository<Comment, String> { 
 
       Mono<Comment> save(Comment newComment); 
 
       // Needed to support save() 
       Mono<Comment> findById(String id); 
    } 

前面的存储库并不难剖析,可以按如下方式完成:

  • It's an interface, which means that we don't have to write any code. We just declare the semantics and Spring Data does the rest.
  • By extending Spring Data Commons' Repository interface, it will be picked up as a repository. Being an empty interface, it comes with no predefined operations.
  • It contains a save() operation to store a new comment (and return it after it gets saved). If the ID value is null, Spring Data MongoDB will automatically generate a unique string value for us.
  • Spring Data requires a findOne() operation in order to perform saves because that's what it uses to fetch what we just saved in order to return it.
  • All of these method signatures use Reactor Mono types.

这个存储库专注于将数据写入 MongoDB,仅此而已。即使它有一个 findOne(),它也不是为读取数据而构建的。这已保留在 images 子包中。

为了完成我们的 comments 子包中的内容,让我们看一下核心域对象:

    package com.greglturnquist.learningspringboot.comments; 
 
    import lombok.Data; 
 
    import org.springframework.data.annotation.Id; 
    import org.springframework.data.mongodb.core.mapping.Document; 
 
    @Data 
    @Document 
    public class Comment { 
 
      @Id private String id; 
      private String imageId; 
      private String comment; 
    } 

这个先前的域对象包含以下内容:

  • The @Data annotation tells Lombok to generate getters, setters, toString(), equals(), and hashCode() methods
  • The id field is marked with Spring Data Common's @Id annotation so we know it's the key for mapping objects
  • The imageId field is meant to hold an Image.id field linking comments to images
  • The comment field is the place to store an actual comment

Note

等一等!这不是 com.greglturnquist.learningspringboot.images.Comment 中的 完全相同的代码吗?现在是。但重要的是要认识到不同的 slices 将来可能需要不同的属性。通过保留一个特定于切片的域对象,我们可以更改一个而不会有更改另一个的风险。事实上,我们有可能(剧透警告!),在本书的后面部分,将整个评论系统移动到一个单独的微服务中。通过将事物保持在良好划分的切片中,可以降低紧密耦合的风险。

另一个因素是 RabbitMQ 不是反应式的。调用 rabbitTemplate.convertAndSend() 是阻塞的。鉴于 AMQP 是一种发布/订阅技术,这听起来可能很尴尬。但是将消息发布到 RabbitMQ 代理的整个过程 阻碍了我们的线程,并且根据定义,是阻塞的。

因此,我们的代码将其包装在 Java Runnable 中,并通过 Reactor 的 Mono 将其转换为 Mono .fromRunnable。这使得只有当我们在正确的时间准备好时才可以调用这个阻塞任务。重要的是要知道 Mono-wrapped-Runnable 不像传统的 Java Runnable 并且不会在单独的线程中启动。取而代之的是,Runnable 接口提供了一个方便的包装器,Reactor 可以精确控制何时在其调度程序中调用 run() 方法。

如果我们在 IDE 中刷新我们的代码并让它重新启动,我们现在可以开始创建注释。查看以下屏幕截图:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

前面的屏幕截图显示了添加到第一张图像的几条评论和正在编写的第三条评论。酷,嗯?

但也许,您想知道为什么我们花费所有精力将阅读和写作 评论分开?毕竟,Spring Data 似乎可以很容易地定义一个可以同时处理两者的存储库。这甚至可能意味着我们不需要 RabbitMQ,可以让 HomeControllerCommentController 直接使用存储库。

使用消息传递的原因是提供一种可靠的方式将工作卸载到另一个系统。一个增长到数千甚至数百万用户的真实系统将看到巨大的流量。想想看。是否还有其他社交媒体平台可以让人们不断发表评论,但一次只能查看少量评论?

我们应用程序的这一方面在设计时考虑了可扩展性。如果我们有 100 万用户,他们每天可能会写数千万条消息。将我们的控制器直接连接到 MongoDB 可能会导致它崩溃。但是,如果我们将所有写入推送到单独的服务,我们就可以进行适当的调整。

读取次数要少得多。

 

Adding customized metrics to track message flow


添加了对其他人发布的图片comment 的功能后,开始gathering 就好了 指标。

为此,我们可以引入类似于第 14 章 Spring Boot 应用程序的开发人员工具 中所示的指标,如下所示:

    @Controller 
    public class CommentController { 
 
      private final RabbitTemplate rabbitTemplate; 
 
      private final MeterRegistry meterRegistry; 
 
      public CommentController(RabbitTemplate rabbitTemplate, 
       MeterRegistry meterRegistry) { 
         this.rabbitTemplate = rabbitTemplate; 
         this.meterRegistry = meterRegistry; 
      } 
 
      @PostMapping("/comments") 
      public Mono<String> addComment(Mono<Comment> newComment) { 
        return newComment.flatMap(comment -> 
         Mono.fromRunnable(() -> 
          rabbitTemplate 
           .convertAndSend( 
             "learning-spring-boot", 
             "comments.new", 
           comment)) 
            .then(Mono.just(comment))) 
            .log("commentService-publish") 
            .flatMap(comment -> { 
              meterRegistry
                .counter("comments.produced", "imageId", comment.getImageId())
                .increment(); 
              return Mono.just("redirect:/"); 
            }); 
      } 
    } 

 

 

 

 

与我们在本章前面编写的代码相比,最后一段代码有以下几处变化:

  • A MeterRegistry is injected through the constructor and captured as a field.
  • It's used to increment a comments.produced metric with every comment.
  • Each metric is also "tagged" with the related imageId.
  • We have to tune the Mono wrapping our rabbitTemplate.convertAndSend(), and ensure that the comment is passed via then(). Then it must be unpacked via flatMap in the part of the flow that writes metrics.

Note

meterRegistry 对话的代码是否应该包含在 Mono.fromRunnable()?也许。编写时代码块,但在这个化身中,指标存储在内存中,因此成本低。然而,成本可能会上升,这意味着它应该得到妥善管理。如果服务变为外部服务,则支持使用单独的 Mono 包装的可能性会迅速增加。

同样,如果我们将 MeterRegistry 注入 CommentService,我们也可以在那里使用它:

    @RabbitListener(bindings = @QueueBinding( 
      value = @Queue, 
      exchange = @Exchange(value = "learning-spring-boot"), 
      key = "comments.new" 
    )) 
    public void save(Comment newComment) { 
      repository 
       .save(newComment) 
       .log("commentService-save") 
       .subscribe(comment -> { 
         meterRegistry
          .counter("comments.consumed", "imageId", comment.getImageId())
          .increment(); 
       }); 
    } 

 

 

这与我们添加到 CommentController 的内容一致。前面的代码可以解释如下:

  • Using the injected MeterRegistry, we increment a comments.consumed metric with every comment.
  • It's also tagged with the comment's related imageId.
  • The metrics are handled after the save is completed inside the subscribe method. This method grants us the ability to execute some code once the flow is complete.

Note

Spring AMQP 还不支持 Reactive Streams。这就是为什么 rabbitTemplate.convertAndSend() 必须包装在 Mono.fromRunnable 中的原因。像这种 subscribe() 方法这样的阻塞调用应该是危险信号,但是在这种情况下,在 Spring AMQP 能够添加支持之前,这是必要的邪恶。如果没有它,就没有其他方法可以指示此 Reactor 流执行。

重新启动我们的应用程序并手动输入大量评论的想法听起来并不令人兴奋。那么为什么不写一个模拟器来为我们做呢!

@Profile("simulator")
@Component
public class CommentSimulator {

  private final CommentController controller;
  private final ImageRepository repository;

  private final AtomicInteger counter;

  public CommentSimulator(CommentController controller,
              ImageRepository repository) {
    this.controller = controller;
    this.repository = repository;
    this.counter = new AtomicInteger(1);
  }

  @EventListener
  public void onApplicationReadyEvent(ApplicationReadyEvent event) {
    Flux
      .interval(Duration.ofMillis(1000))
      .flatMap(tick -> repository.findAll())
      .map(image -> {
        Comment comment = new Comment();
        comment.setImageId(image.getId());
        comment.setComment(
          "Comment #" + counter.getAndIncrement());
        return Mono.just(comment);
      })
      .flatMap(newComment ->
        Mono.defer(() ->
          controller.addComment(newComment)))
      .subscribe();
  }
}

让我们把这个模拟器分开:

  • The @Profile annotation indicates that this only operates if spring.profiles.active=simulator is present when the app starts
  • The @Component annotation will allow this class to get picked up by Spring Boot automatically and activated
  • The class itself is located in the root package, com.greglturnquist.learningspring, given that it pulls bits from both subpackages
  • The @EventListener annotation signals Spring to pipe application events issued to the app context. In this case, the method is interested in ApplicationReadyEvents, fired when the application is up and operational
  • Flux.interval(Duration.ofMillis(1000)) causes a stream of lazy ticks to get fired every 1000 ms, lazily
  • By flatMapping over this Flux, each tick is transformed into all images using the ImageRepository
  • Each image is used to generate a new, related comment
  • Using the injected CommentController, it simulates the newly minted comment being sent in from the web

如果我们用 spring.profiles.active=simulator 重新配置我们的运行器,我们可以看到它运行。 IntelliJ IDEA 提供了轻松设置 Spring 配置文件的方法:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

您可以在上一个屏幕截图的底部看到突出显示的条目。

如果我们在听到机器的风扇进入高速档后开始工作,我们可以在 http://localhost:8080/application/metrics/comments.consumed 和 http://localhost:8080/application/metrics/comments.produced,并期待看到计数。

在最后一张截图中,我们可以清楚地看到 counter.comments.producedcounter.comments.consumed,它们恰好是相同,这意味着没有人丢失。

我们还可以看到 equal 消息数量 它们(正如我们的模拟器所期望的那样)。

Peeking at Spring Cloud Stream (with RabbitMQ)


通过消息传递将许多小型服务链接在一起是一种非常常见的 模式。随着微服务的兴起,它越来越受欢迎。使用 RabbitTemplate 或其他一些传输模板(KafkaTemplate 等)一遍又一遍地编写相同的模式是我们应该考虑的另一个复杂程度不要背负。

Spring Cloud Stream (http://cloud.spring.io/ spring-cloud-stream/) 来救援!

Spring Cloud Stream 采用 inputsoutputstransformers 来自 Spring Integration 并使得将它们链接在一起变得超级容易。

为了改变我们的社交媒体平台来做到这一点,我们可以从我们的构建文件中删除 spring-boot-starter-amqp 并添加它:

    compile(
      'org.springframework.cloud:spring-cloud-starter-stream-rabbit') 
    compile(
      'org.springframework.cloud:spring-cloud-stream-reactive') 

前面的依赖项带来了以下内容:

  • spring-cloud-stream-binder-rabbit-core
  • spring-cloud-stream-codec
  • spring-cloud-stream
  • spring-cloud-stream-reactive
  • spring-boot-starter-amqp
  • spring-integration-amqp

Note

Spring Cloud Stream 有很多入门者。本质上,我们必须选择底层传输技术,但我们不必直接与传输技术交互。

Introduction to Spring Cloud

春云?那是什么?

Spring Cloud 是通过各种库提供的 Spring Boot 的扩展,旨在解决不同的云原生模式。在这种情况下,Spring Cloud Stream 旨在通过消息传递简化服务的链接。

要使用任何 Spring Cloud 库,我们需要将以下块添加到 build.gradle 文件的底部:

    dependencyManagement { 
      imports { 
        mavenBom "org.springframework.cloud:spring-cloud- 
         dependencies:${springCloudVersion}" 
      } 
    } 

前面的代码片段是 Spring 的依赖管理 gradle 插件的一部分,引入 Spring Cloud BOM材料清单)。在这种情况下,它有一个变量,springCloudVersion,我们需要选择它。

Spring Cloud 有发布列车,这意味着每个库都有一个版本,但所有版本都是协调的。通过选择一列火车,我们得到了一系列可供选择的工具(我们将贯穿本书的其余部分!)。

与 Spring Boot 2.0 相关的 Spring Cloud 发布火车是 Finchley,所以让我们把它放在我们的 Boot 版本旁边的顶部:

    buildscript { 
      ext { 
        springBootVersion = '2.0.0.M5' 
        springCloudVersion = 'Finchley.M3' 
      } 
      ... 
    }

Note

如果您对 Spring Cloud 的各种发布系列感到好奇,请查看其项目页面 http://projects.spring.io/spring-cloud/

随着 Spring Cloud 的 BOM 和 Spring Cloud Stream 添加到我们的构建中,让我们回到使用 Spring Cloud Stream 的核心接口配置消息传递,如如下:

    @Controller 
    @EnableBinding(Source.class) 
    public class CommentController { 
 
      private final CounterService counterService; 
      private FluxSink<Message<Comment>> commentSink; 
      private Flux<Message<Comment>> flux; 
 
      public CommentController(CounterService counterService) { 
        this.counterService = counterService; 
        this.flux = Flux.<Message<Comment>>create( 
          emitter -> this.commentSink = emitter, 
          FluxSink.OverflowStrategy.IGNORE) 
          .publish() 
          .autoConnect(); 
      } 
 
      @PostMapping("/comments") 
      public Mono<String> addComment(Mono<Comment> newComment) { 
        if (commentSink != null) { 
          return newComment 
           .map(comment -> commentSink.next(MessageBuilder 
           .withPayload(comment) 
           .build())) 
           .then(Mono.just("redirect:/")); 
        } else { 
            return Mono.just("redirect:/"); 
        } 
      } 
 
      @StreamEmitter 
      public void emit(@Output(Source.OUTPUT) FluxSender output) { 
        output.send(this.flux); 
      } 
 
    } 

最后一段代码与我们在本章前面创建的 CommentController 非常相似,但有以下区别:

  • @EnableBinding(Source.class) flags this app as a source for new events. Spring Cloud Stream uses this annotation to signal the creation of channels, which, in RabbitMQ, translates to exchanges and queues.
  • The constructor proceeds to set up a FluxSink, the mechanism to emit new messages into a downstream Flux. This sink is configured to ignore downstream backpressure events. It starts publishing right away, autoconnecting to its upstream source upon subscription.
  • The objects being emitted are Message<Comment>, which is Spring's abstraction for a POJO wrapped as a transportable message. This includes the ability to add headers and other information.
  • Inside addComments, if the sink has been established, it maps newComment into a Message<Comment> using Spring Messaging APIs. Finally, it transmits the message into the sink.
  • When the message is successfully emitted to Flux, a redirect is issued.
  • To transmit Flux of Message<Comment> objects, a separate method, emit, is wired up with an @StreamEmitter annotation. This method is fed a FluxSender, which provides us with a Reactor-friendly means to transmit messages into a channel. It lets us hook up the Flux tied to our FluxSink.
  • The @Output(Source.OUTPUT) annotation marks up which channel it gets piped to (visiting Source.OUTPUT reveals the channel name as output).

这个控制器中包含了很多东西。为了更好地理解它,有一些基本概念需要实现。

首先,创建一个 Flux 然后添加到它不是常见的做法。范式是将其包裹在其他东西上。为了明确这一点,Flux 本身就是一个抽象类。你不能实例化它。相反,您必须使用它的各种静态辅助方法来制作一个。因此,当我们想要获取与用户点击网站相关的行为并将其链接到应用程序启动时创建的 Flux 时,我们需要类似 FluxSink 将这两件事联系在一起。

Spring Cloud Stream 专注于将具有源/接收器语义的消息流链接在一起。对于 Reactor,这意味着将消息的 Flux 调整到通道上,这是 Spring Integration 多年来策划的一个概念。鉴于通道的具体性质已被抽象出来,我们使用何种传输技术并不重要。由于 Spring Boot 的强大功能,这是由类路径上的依赖项定义的。尽管如此,我们将继续使用 RabbitMQ,因为它既简单又强大。

顺便说一句,当我们访问 时,我们将再次看到将接收器连接到 Flux 的概念第 17 章WebSockets with Spring Boot。将一次性对象连接到已建立的流时,这是一种常见的 Reactor 模式。

要声明一个 Spring Cloud Stream 消费者,我们只需要更新我们的 CommentService 如下:

    @Service 
    @EnableBinding(CustomProcessor.class) 
    public class CommentService { 

CommentService 的顶部,我们需要添加 @EnableBinding(CustomProcessor.class)。如果这是唯一的 Spring Cloud Stream 组件,我们可以使用 @EnableBinding(Processor.class),但是,我们不能与 <代码类="literal">CommentController。所以我们需要编写一组自定义通道,CustomProcessor 如下图:

public interface CustomProcessor {

   String INPUT = "input";
   String OUTPUT = "emptyOutput";

@Input(CustomProcessor.INPUT)
   SubscribableChannel input();

@Output(CustomProcessor.OUTPUT)
   MessageChannel output();

}

这个自定义处理器与 Spring Cloud Stream 的 Processor 非常相似:

  • It's a declarative interface.
  • It has two channel names, INPUT and OUTPUT. The INPUT channel uses the same as Processor. To avoid colliding with the OUTPUT channel of Source, we create a different channel name, emptyOutput. (Why call it emptyOutput? We'll see in a moment!)
  • The is a SubscribableChannel for inputs and a MessageChannel for outputs.

这将我们的应用程序标记为事件的 SinkSource。还记得我们在使用 RabbitTemplate 时必须提前 subscribe 吗?

值得庆幸的是,Spring Cloud Stream 对 Reactor 友好。在处理 Reactive Streams 时,我们的代码不应该是处理的终止点。因此,接收 Comment 对象的传入 Flux 必须导致传出 Flux 框架可以调用,我们很快就会看到。

CommentService 中,我们需要更新我们的 save 方法,如下所示:

    @StreamListener 
    @Output(CustomProcessor.OUTPUT) 
    public Flux<Void> save(@Input(CustomProcessor.INPUT) 
     Flux<Comment> newComments) { 
       return repository 
        .saveAll(newComments) 
        .flatMap(comment -> { 
          meterRegistry
            .counter("comments.consumed", "imageId", comment.getImageId())
            .increment(); 
          return Mono.empty(); 
        }); 
    } 

让我们拆开之前更新的 save 版本:

  • The @RabbitListener annotation has been replaced with @StreamListener, indicating that it's transport-agnostic.
  • The argument newComments is tied to the input channel via the @Input() annotation.
  • Since we've marked it as Flux, we can immediately consume it with our MongoDB repository.
  • Since we have to hand a stream back to the framework, we have marked up the whole method with @Output.
  • From there, we can flatMap it to generate metrics and then transform it into a Flux of Mono<Void> s with Mono.empty(). This ensures that no more processing is done by the framework.

此方法与所有 Spring @*Listener 注解具有相同的概念——​调用带有可选域对象的方法。但是这一次,它从我们配置 Spring Cloud Stream 使用的任何底层技术接收它们。好处是它小巧且易于管理,并且我们的代码不再直接绑定到 RabbitMQ。

话虽如此,我们需要向 Spring Cloud Stream 表达我们的源和接收器需要通过同一个 RabbitMQ 交换进行通信。为此,我们需要在 application.yml 中提供设置:

    spring: 
      cloud: 
        stream: 
          bindings: 
            input: 
              destination: learning-spring-boot-comments 
              group: learning-spring-boot 
            output: 
              destination: learning-spring-boot-comments 
              group: learning-spring-boot 

最后一个应用程序配置包含以下详细信息:

  • spring.cloud.stream.bindings is configured for both the input and the output channel's destination to be learning-spring-boot. When using RabbitMQ bindings, this is the name of the exchange and Spring Cloud Stream uses topic exchanges by default.
  • We take advantage of Spring Cloud Streams' support for consumer groups by also setting the group property. This ensures that even if there are multiple stream listeners to a given channel, only one listener will consume any one message. This type of guarantee is required in cloud-native environments when we can expect to run multiple instances.

Note

如本书前面所述,您可以使用 application.propertiesapplication.yml。如果您发现自己配置了许多具有相同前缀的设置,请使用 YAML 使其更易于阅读并避免重复。

顺便说一句,还记得本章前面必须定义一个 Jackson2JsonMessageConverter bean 来处理序列化吗?不再需要。 Spring Cloud Stream 使用 Esoteric Software 的 Kryo 库进行序列化/反序列化(https://github.com/EsotericSoftware/kryo)。这意味着,我们可以放弃那个 bean 定义。谈论精简代码!

如果我们再次运行模拟器 (spring.profiles.active=simulator) 并检查 http://localhost:8080/application/指标,我们可以看到我们的自定义指标列表所有内容。

有了这个,我们设法改变了 comments 解决方案,但仍保留了相同的指标集。

但是,通过切换到 Spring Cloud Stream,我们收集了全新的指标队列,如下图所示:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

这是一个涵盖输入和输出通道的子集(太多无法填满一本书)。

还记得我们在上一章是如何编写自定义健康检查的吗?为 RabbitMQ 及其绑定提供一个会很方便。你猜怎么着?已经完成了。一探究竟:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

在最后一张截图中,我们可以看到以下内容:

  • The RabbitMQ broker is up and operational
  • Our RabbitMQ binders are operational as well

有了这个,我们就有了一个运行良好的 comment 系统。

Logging with Spring Cloud Stream

总结一下,真正查看 Spring Cloud Stream 是如何处理事情会很好。为此,我们可以像这样调用 application.yml 中的日志级别:

    logging: 
       level: 
        org: 
          springframework: 
            cloud: DEBUG 
            integration: DEBUG 

最后一段代码调高了 Spring Cloud Stream 及其底层技术 Spring Integration 的日志级别。读者可以通过设置 org.springframework.amqp=DEBUG 来更改 RabbitTemplate 日志级别并看看会发生什么.

随着这些级别的增加,如果我们运行我们的应用程序,我们可以看到一些这样的:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

上一个屏幕截图显示了与处理通道设置以及设置 AMQP 交换和队列的 Spring Integration 相比,绑定中涉及的 Spring Cloud Stream 之间的明显区别。

很高兴观察到日志记录前缀 o.s.c.sorg.springframework.cloud.stream 或 Spring Cloud Stream 的缩写。

如果我们在网页上添加新评论,我们可以看到结果,如下所示:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》使用Spring Boot的AMQP消息传递

此屏幕截图很好地显示了 Comment 被传输到 output 通道和然后在 input 频道上收到。

另请注意,日志记录前缀 o.s.i 表示 Spring Integration,而 s.i.m 是 Spring Integration 的消息 API。

Summary


在本章中,我们创建了一个基于消息的解决方案,供用户评论图像。我们首先使用 Spring AMQP 和 RabbitTemplate 将写入分派到单独的 slice。然后我们用带有 RabbitMQ 绑定的 Spring Cloud Stream 替换它。这让我们可以通过消息传递解决评论情况,但我们的代码不会绑定到特定的传输技术。

在下一章中,我们将把我们快速增长的单体应用分解成更小的微服务,并使用 Spring Cloud 来简化这些分布式组件之间的集成。