vlambda博客
学习文章列表

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

PART I

Getting Started with Microservice Development Using Spring Boot

在这一部分中,您将学习如何使用 Spring Boot 的一些最重要的特性来开发微服务。

本部分包括以下章节:

  • 第 1 章微服务简介
  • 第二章Spring Boot简介
  • 第 3 章创建一组协作微服务
  • 第 4 章使用 Docker 部署我们的微服务
  • 第 5 章使用 OpenAPI 添加 API 描述
  • 第 6 章添加持久性
  • 第 7 章开发响应式微服务

Developing Reactive Microservices

在本章中,我们将学习如何开发响应式微服务,即如何开发非阻塞同步 REST API 和异步事件驱动服务。我们还将了解如何在这两种选择之间进行选择。最后,我们将看到如何创建和运行反应式微服务环境的手动和自动化测试。

正如第 1 章微服务简介中所述,反应式系统的基础是它们是消息驱动的——它们使用异步通信。这使它们具有弹性,换句话说,具有可扩展性和弹性,这意味着它们可以容忍故障。弹性和弹性一起将使反应系统能够响应。

本章将涵盖以下主题:

  • 在非阻塞同步 API 和事件驱动异步服务之间进行选择
  • 开发非阻塞同步 REST API
  • 开发事件驱动的异步服务
  • 运行反应式微服务环境的手动测试
  • 运行反应式微服务环境的自动化测试

技术要求

有关如何安装本书中使用的工具以及如何访问本书源代码的说明,请参阅:

  • 第 21 章 用于 macOS
  • 第 22 章 适用于 Windows

本章代码示例均来自$BOOK_HOME/Chapter07中的源码。

如果您想查看本章中对源代码应用的更改,即查看如何使微服务响应式,您可以将其与 第 6 章的源代码进行比较 添加持久性。可以使用自己喜欢的diff工具,对比两个文件夹,即$BOOK_HOME/Chapter06$BOOK_HOME/Chapter07

在非阻塞同步 API 和事件驱动异步服务之间进行选择

在开发响应式微服务时,何时使用非阻塞同步 API 以及何时使用事件驱动异步服务并不总是很明显。一般来说,要使 微服务健壮和可扩展,使其尽可能自治很重要,例如,通过最小化其运行时依赖关系。这也是称为松散耦合。因此,事件的异步消息传递优于同步 API。这是因为微服务将仅依赖于在运行时对消息系统的访问,而不是依赖于对许多其他微服务的同步访问。

然而,在许多情况下,同步 API 可能是有利的。例如:

  • 对于最终用户正在等待响应的读取操作
  • 客户端平台更适合使用同步 API,例如移动应用程序或 SPA Web 应用程序
  • 客户端将从其他组织连接到服务的位置——可能难以就跨组织使用的通用消息传递系统达成一致

对于本书中的系统架构,我们将使用以下内容:

  • 产品组合微服务所暴露的创建、读取和删除服务将基于非阻塞同步 API。假设复合微服务在 Web 和移动平台上都有客户端,以及来自其他组织而不是操作系统环境的客户端。因此,同步 API 似乎是天作之合。
  • 核心微服务提供的读取服务也将被开发为非阻塞同步 API,因为有最终用户在等待他们的响应。
  • 核心微服务提供的创建和删除服务将被开发为事件驱动的异步服务,这意味着它们将监听每个微服务专用主题的创建和删除事件。
  • 复合微服务提供的用于创建和删除聚合产品信息的同步API将发布关于这些主题的创建和删除事件。如果发布操作成功,则返回 202(Accepted)响应,否则返回错误响应。 202 响应与正常的 200(OK)响应不同——它表示请求已被接受,但未完全处理。相反,处理将独立于 202 响应异步完成。

下图说明了这一点:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.1:微服务格局

首先,让我们学习如何开发非阻塞同步 REST API,然后,我们将看看如何开发事件驱动的异步服务。

开发非阻塞同步 REST API

在本节中,我们将学习如何开发读取 API 的非阻塞版本。复合服务将对三个核心服务并行进行响应式调用,即非阻塞调用。当复合服务收到来自所有核心服务的响应时,它将创建一个复合响应并将其发送回调用者。如下图所示:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.2:景观的 getCompositeProduct 部分

在本节中,我们将介绍以下内容:

  • 项目反应堆简介
  • 使用 Spring Data for MongoDB 的非阻塞持久性
  • 核心服务中的非阻塞 REST API,包括如何处理基于 JPA 的持久层的阻塞代码
  • 复合服务中的非阻塞 REST API

项目反应堆简介

正如我们 Chapter 2Spring WebFlux 部分中提到的,Spring Boot 简介Spring 5 中的响应式支持是基于 Project Reactor (https://projectreactor.io)。 Project Reactor 基于 Reactive Streams 规范 (http://www.reactive-streams.org),用于构建响应式应用程序的标准。 Project Reactor 是 基础的——它是 Spring WebFlux、Spring WebClient 和 Spring Data 赖以提供其反应性和非阻塞特性的东西。

编程模型基于处理数据流,Project Reactor中的核心数据类型是通量和单声道。 Flux 对象用于处理 0...n 元素的流和Mono 对象用于处理为空或最多返回一个元素的流。我们将在本章中看到许多使用它们的例子。作为一个简短的介绍,让我们看一下以下测试:

@Test
void testFlux() {

  List<Integer> list = Flux.just(1, 2, 3, 4)
    .filter(n -> n % 2 == 0)
    .map(n -> n * 2)
    .log()
    .collectList().block();

  assertThat(list).containsExactly(4, 8);
}

下面是对前面源码的解释:

  1. 我们使用整数 12 启动流, 3, 和 4 使用静态辅助方法Flux.just()
  2. 接下来,我们过滤 出奇数——我们只允许偶数通过流。在这个测试中,它们是 24
  3. 接下来,我们通过将流中的值乘以 map) -">2,所以它们变成 4 8
  4. 然后,我们记录map之后流过的数据 操作。
  5. 我们使用 collectList 方法将流中的所有项目收集到 列表,在流完成后发出。
  6. 到目前为止,我们只声明了流的处理。要真正处理流,我们需要有人订阅它。对 block 方法的最终调用将注册一个等待处理完成的订阅者。
  7. 结果列表保存在名为 list 的成员变量中。
  8. 我们现在可以通过使用 assertThat 方法来断言 list 流处理后包含预期的结果——整数 48

日志输出将如下所示:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.3:上述代码的日志输出

前面的日志输出,我们可以看出:

  1. 流的处理由订阅流并请求其内容的订阅者启动。
  2. 接下来,整数 48 通过log 操作。
  3. 处理结束时调用订阅者上的 onComplete 方法,通知它流已经结束。

完整的源代码见 util 中的 ReactorTests 测试类 项目。

通常,我们不会启动流的处理。相反,我们只定义如何处理它,而基础设施组件将负责启动处理。例如,Spring WebFlux 将作为对传入 HTTP 请求的响应来执行此操作。这个经验法则的一个例外是阻塞代码需要来自反应流的响应的情况。在这些情况下,阻塞代码可以调用 block() 方法">Flux 或 Mono 对象以阻塞方式获取响应。

使用 Spring Data for MongoDB 的非阻塞持久性

产品推荐服务反应式非常简单:

  • 将存储库的基类更改为 ReactiveCrudRepository
  • 更改自定义查找器方法以返回 MonoFlux< /代码>对象

ProductRepositoryRecommendationRepository 更改后如下所示:

public interface ProductRepository extends ReactiveCrudRepository <ProductEntity, String> {
    Mono<ProductEntity> findByProductId(int productId);
}

public interface RecommendationRepository extends ReactiveCrudRepository<RecommendationEntity, String> {
    Flux<RecommendationEntity> findByProductId(int productId);
}

review 服务的持久性代码没有应用任何更改;它将使用 JPA 存储库保持阻塞。关于如何处理持久层中的阻塞代码,请看下面的处理阻塞代码部分复习 服务。

对于完整的代码,请查看以下类:

  • product 项目中的 ProductRepository
  • recommendation 项目中的 RecommendationRepository

测试代码的变化

当涉及到测试持久层时,我们必须做出一些改变。由于我们的持久化方法现在返回 MonoFlux对象,测试方法必须等待响应在返回的反应对象中可用。测试方法可以使用对 block() 方法的显式调用-">Mono/Flux 对象等待响应可用,或者他们可以使用 StepVerifier 来自 Project Reactor 的帮助器类,用于声明可验证的异步事件序列。

让我们看看如何更改以下测试代码以适用于存储库的响应式版本:

ProductEntity foundEntity = repository.findById(newEntity.getId()).get();
assertEqualsProduct(newEntity, foundEntity);

我们可以在 Monoblock() 方法> repository.findById() 方法返回的对象,并保持命令式编程风格,如下所示:

ProductEntity foundEntity = repository.findById(newEntity.getId()).block();
assertEqualsProduct(newEntity, foundEntity);

或者,我们 可以使用 StepVerifier 类来设置一系列处理步骤,执行存储库查找操作并验证结果。该序列通过对 verifyComplete() 方法的最终调用进行初始化,如下所示:

StepVerifier.create(repository.findById(newEntity.getId()))
  .expectNextMatches(foundEntity -> areProductEqual(newEntity, foundEntity))
  .verifyComplete();

有关使用 StepVerifier 类的测试示例,请参阅 PersistenceTests< /code> product 项目中的测试类。

有关使用 block() 方法的相应测试示例,请参阅 recommendation 项目中的>PersistenceTests 测试类。

核心服务中的非阻塞 REST API

有了 非阻塞 持久层,是时候制作 核心服务中的 API 也是非阻塞的。我们需要进行以下更改:

  • 更改 API,使其仅返回反应数据类型
  • 更改服务实现,使其不包含任何阻塞代码
  • 更改我们的测试,以便他们可以测试反应式服务
  • 处理阻塞代码——将仍然需要阻塞的代码与非阻塞代码隔离开来

API 的变化

使核心服务的 API 具有响应性,我们需要更新它们的方法,以便它们返回 MonoFlux 对象。

例如 product 服务中的 getProduct()现在返回 Mono 而不是 Product目的:

Mono<Product> getProduct(@PathVariable int productId);

完整源代码请看core接口">api 项目:

  • ProductService
  • RecommendationService
  • ReviewService

服务实现的变化

对于 productrecommendation 项目,使用响应式持久层,我们可以使用 Project Reactor 中的 fluent API。例如,getProduct() 方法的实现类似于以下代码:

public Mono<Product> getProduct(int productId) {

    if (productId < 1) {
      throw new InvalidInputException("Invalid productId: " + productId);
    }

    return repository.findByProductId(productId)
        .switchIfEmpty(Mono.error(new NotFoundException("No product found
         for productId: " + productId)))
        .log(LOG.getName(), FINE)
        .map(e -> mapper.entityToApi(e))
        .map(e -> setServiceAddress(e));
}

让我们检查一下代码的作用:

  1. 该方法将返回一个 Mono 对象;仅在此处声明处理。该处理由 Web 框架 Spring WebFlux 触发,一旦收到对该服务的请求,就会订阅 Mono 对象!
  2. 将使用 productId 从底层数据库中使用 检索产品持久存储库中的 findByProductId() 方法。
  3. 如果没有为给定的 productId 找到产品,则 NotFoundException 将被抛出。
  4. log 方法将产生日志输出。
  5. 将调用 mapper.entityToApi() 方法将返回的实体从持久层转换为 API 模型对象。
  6. 最终的 map 方法 将使用辅助方法 setServiceAddress(),在 serviceAddress中设置处理请求的微服务的DNS名称和IP地址 模型对象的字段。

成功处理的一些示例日志输出如下:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.4:处理成功时的日志输出

以下是失败处理的示例日志输出(抛出 NotFoundException):

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.5:处理失败时的日志输出

有关完整的源代码,请参见以下类:

  • product 项目中的 ProductServiceImpl
  • recommendation 项目中的 RecommendationServiceImpl

测试代码的变化

服务实现的测试代码已按照与我们之前描述的持久层测试相同的方式进行了更改。为了处理响应式返回类型的异步行为,MonoFlux ,测试混合使用调用 block() 方法和使用 StepVerifier 辅助类。

有关完整的源代码,请参阅以下测试类:

  • product 项目中的 ProductServiceApplicationTests
  • recommendation 项目中的 RecommendationServiceApplicationTests

处理阻塞代码

review 服务的 案例中,该服务使用 JPA 以关系访问其数据数据库,我们不支持非阻塞编程模型。相反,我们可以使用 Scheduler 运行阻塞代码,它能够在专用线程池中的线程上运行阻塞代码,具有线程数有限。为阻塞代码使用线程池可以避免耗尽微服务中的可用线程,并避免影响微服务中的并发非阻塞处理(如果有的话)。

让我们看看如何通过以下步骤进行设置:

  1. 首先,我们在主类ReviewServiceApplication中配置一个调度器bean及其线程池,如下:
    
    @Autowired
    public ReviewServiceApplication(
      @Value("${app.threadPoolSize:10}") Integer threadPoolSize,
      @Value("${app.taskQueueSize:100}") Integer taskQueueSize
    ) {
      this.threadPoolSize = threadPoolSize;
      this.taskQueueSize = taskQueueSize;
    }
    
    @Bean
    public Scheduler jdbcScheduler() {
      return Schedulers.newBoundedElastic(threadPoolSize,
        taskQueueSize, "jdbc-pool");
    }
    								

    从的代码中我们可以看到调度器bean被命名为jdbcScheduler ,我们可以使用下面的属性来配置它的线程:

    • app.threadPoolSize,指定池中的最大线程数;默认为 10
    • app.taskQueueSize,指定允许放入队列等待的最大任务数对于可用线程;默认为 100
  2. 接下来,我们将名为jdbcScheduler的调度器注入到review 服务实现类,如下图:
    
    @RestController
    public class ReviewServiceImpl implements ReviewService {
    
      private final Scheduler jdbcScheduler;
    
      @Autowired
      public ReviewServiceImpl(
        @Qualifier("jdbcScheduler")
        Scheduler jdbcScheduler, ...) {
        this.jdbcScheduler = jdbcScheduler;
      }
    								
  3. 最后,我们在 getReviews() 方法的响应式实现中使用调度器的线程池,如下所示:
    @Override
    public Flux<Review> getReviews(int productId) {
    
      if (productId < 1) {
        throw new InvalidInputException("Invalid productId: " + 
          productId);
      }
    
      LOG.info("Will get reviews for product with id={}", 
        productId);
      return Mono.fromCallable(() -> internalGetReviews(productId))
        .flatMapMany(Flux::fromIterable)
        .log(LOG.getName(), FINE)
        .subscribeOn(jdbcScheduler);
    }
    
    private List<Review> internalGetReviews(int productId) {
    
      List<ReviewEntity> entityList = repository.
        findByProductId(productId);
      List<Review> list = mapper.entityListToApiList(entityList);
      list.forEach(e -> e.setServiceAddress(serviceUtil.
        getServiceAddress()));
    
      LOG.debug("Response size: {}", list.size());
    
      return list;
    }
    								

    _id 这里,缓冲代码在 internalGetReviews() 方法中,并被包裹在一个 Mono 对象使用 Mono.fromCallable() 方法。 getReviews() 使用方法 subscribeOn() jdbcScheduler 的线程池中的线程中运行阻塞代码的方法。

当我们稍后在本章中运行测试时,我们可以查看 review 服务的日志输出并查看 SQL 语句在其中运行的证明调度程序专用池中的线程。我们将能够看到这样的日志输出:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.6:审查服务的日志输出

从前面的日志输出中,我们可以看到以下内容:

  • 第一个日志输出来自 LOG.info() 调用>getReviews() 方法,它在一个名为 ctor-http-nio-4 的 HTTP 线程上执行,使用的线程通过 WebFlux。
  • 第二个日志输出中,我们可以看到 Spring Data JPA 生成的 SQL 语句,在后台使用 Hibernate。 SQL语句对应方法调用repository.findByProductId()。它在一个名为 jdbc-pool-1 的线程上执行,这意味着它是在用于阻塞代码的专用线程池中的一个线程中执行的,如预期的!

完整的源代码见 ReviewServiceApplicationReviewServiceImpl review 项目中的类。

有了处理阻塞代码的逻辑,我们就完成了在核心服务中实现非阻塞 REST API。让我们继续看看如何使组合服务中的 REST API 成为非阻塞的。

组合服务中的非阻塞 REST API

为了使我们在复合服务中的REST API无阻塞,我们需要执行以下操作:

  • 更改 API,使其操作仅返回反应数据类型
  • 更改服务实现,使其以非阻塞方式并行调用核心服务的 API
  • 更改集成层,使其使用非阻塞 HTTP 客户端
  • 更改我们的测试,以便他们可以测试响应式服务

API 的变化

为了使 复合服务的 API 具有反应性,我们需要应用与之前描述的核心服务 API 相同类型的更改。这意味着 getProduct() 方法的返回类型,ProductAggregate ,需要替换成Mono createProduct()deleteProduct() 方法需要更新为返回 Mono 而不是 void ,否则我们无法将任何错误响应传播回 API 的调用者。

完整源码见api<中的ProductCompositeService接口/代码>项目。

服务实现的变化

为了能够并行调用这三个API,服务实现使用静态zip() Mono 类上的 code> 方法。 zip 方法能够处理多个并行响应式请求,并在它们全部完成后将它们压缩在一起。代码如下所示:

@Override
public Mono<ProductAggregate> getProduct(int productId) {
  return Mono.zip(

    values -> createProductAggregate(
      (Product) values[0],
      (List<Recommendation>) values[1],
      (List<Review>) values[2],
      serviceUtil.getServiceAddress()),

    integration.getProduct(productId),
    integration.getRecommendations(productId).collectList(),
    integration.getReviews(productId).collectList())

    .doOnError(ex ->
      LOG.warn("getCompositeProduct failed: {}",
      ex.toString()))
    .log(LOG.getName(), FINE);
}

让我们仔细看看:

  • zip 方法的第一个参数是一个 lambda 函数,它将接收数组中的响应,名为 。该数组将包含一个产品、一个推荐列表和一个评论列表。来自三个 API 调用的响应的实际聚合由与以前相同的辅助方法 createProductAggregate() 处理,没有任何更改。
  • lambda 函数后面的参数是 zip 方法将并行调用的请求列表,一个 Mono 每个请求的对象。在我们的例子中,我们发送了三个由集成类中的方法创建的 Mono 对象,每个请求发送到每个核心微服务。

完整源代码见product-中的ProductCompositeServiceImpl类复合项目。

有关 createProductdeleteProduct API 操作的信息在 product-composite 服务中实现,请参阅 在复合服务中发布事件 部分稍后的。

集成层的变化

ProductCompositeIntegration 集成类中,我们替换了阻塞的 HTTP 客户端,RestTemplate,自带非阻塞HTTP客户端WebClient与春天 5。

要创建 WebClient 实例,builder pattern 是 使用过。如果需要自定义,例如设置通用标题或过滤器,可以使用构建器完成。有关可用的配置选项,请参阅 https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-client-builder

WebClient 使用如下:

  1. 在构造函数中,WebClient 是自动注入的。我们在没有任何配置的情况下构建 WebClient 实例:
    公共类 ProductCompositeIntegration 实现 ProductService, RecommendationService, ReviewService { 私有最终 WebClient webClient; @自动连线 公共 ProductCompositeIntegration( WebClient.Builder webClient, ... ) { this.webClient = webClient.build(); } 
  2. 接下来,我们使用 webClient 实例发出非阻塞请求以调用 产品服务:
    @Override public Mono<产品> ; getProduct(int productId) { 字符串 url = productServiceUrl + "/product/" + productId; 返回 webClient.get().uri(url).retrieve() .bodyToMono(Product.class) .log(LOG.getName(),精细) .onErrorMap(WebClientResponseException.class, 前->处理异常(例如) ); } 

如果对 product 服务的 API 调用失败并返回 HTTP 错误响应,则整个 API 请求将失败。 WebClient 中的 onErrorMap() 方法将调用我们的handleException(ex) 方法,将HTTP层抛出的HTTP异常映射到我们自己的异常,例如一个NotFoundExceptionInvalidInputException

但是,如果调用 product 服务成功,但调用 recommendationreview API 失败,我们不想让整个请求失败。相反,我们希望将尽可能多的信息返回给调用者。因此,我们不会在这些情况下传播异常,而是返回一个空的推荐或评论列表。为了抑制错误,我们将调用 onErrorResume(error -> empty())。为此,代码如下所示:

@Override
public Flux<Recommendation> getRecommendations(int productId) {

  String url = recommendationServiceUrl + "/recommendation?
  productId=" + productId;

  // Return an empty result if something goes wrong to make it
  // possible for the composite service to return partial responses
  return webClient.get().uri(url).retrieve()
    .bodyToFlux(Recommendation.class)
    .log(LOG.getName(), FINE)
    .onErrorResume(error -> empty());
}

GlobalControllerExceptionHandler 类,来自 util 项目,将如前所述,捕获异常并将它们转换为正确的 HTTP 错误响应,然后发送回复合 API 的调用者。通过这种方式,我们可以确定来自底层 API 调用的特定 HTTP 错误响应是否会导致 HTTP 错误响应或只是部分空响应。

完整的源代码见product-中的ProductCompositeIntegration类复合项目。

测试代码的变化

测试类中唯一需要的更改是更新 Mockito 的设置及其对集成类的模拟。 mock 需要返回 MonoFlux 对象。 setup() 方法使用辅助方法 Mono.just() Flux.fromIterable(),如下代码所示:

class ProductCompositeServiceApplicationTests {
    @BeforeEach
    void setUp() {
        when(compositeIntegration.getProduct(PRODUCT_ID_OK)).
            thenReturn(Mono.just(new Product(PRODUCT_ID_OK, "name", 1,
             "mock-address")));
        when(compositeIntegration.getRecommendations(PRODUCT_ID_OK)).
            thenReturn(Flux.fromIterable(singletonList(new
             Recommendation(PRODUCT_ID_OK, 1, "author", 1, "content",
             "mock address"))));
        when(compositeIntegration.getReviews(PRODUCT_ID_OK)).
            thenReturn(Flux.fromIterable(singletonList(new
             Review(PRODUCT_ID_OK, 1, "author", "subject", "content",
             "mock address"))));

完整源代码见product中的ProductCompositeServiceApplicationTests测试类-复合项目。

这样就完成了我们的非阻塞同步 REST API 的实现。现在是时候开发我们的事件驱动异步服务了。

开发事件驱动的异步服务

在本节中,我们将学习如何开发创建和删除服务的事件驱动和异步版本。复合服务将在每个核心服务主题上发布创建和删除事件,然后将 OK 响应返回给调用者,而无需等待核心服务中发生处理。如下图所示:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.7:景观的 createCompositeProduct 和 deleteCompositeProduct 部分

我们将涵盖以下主题:

  • 处理消息传递的挑战
  • 定义主题和事件
  • Gradle 构建文件中的更改
  • 消费核心服务中的事件
  • 在复合服务中发布事件

处理消息传递的挑战

为了实现事件驱动的创建和删除服务,我们将使用Spring Cloud Stream。在第2章Spring Boot简介中,我们已经看到了发布和消费是多么容易关于使用 Spring Cloud Stream 的主题的消息。编程模型基于功能范式,其中实现功能接口之一的功能 Supplier, Function,或者Consumer在包java.util.function 可以链接在一起以执行分离的基于事件的处理。要从外部触发这种基于功能的处理,可以使用辅助类 StreamBridge 从非功能代码。

例如,要将 HTTP 请求的正文发布到主题,我们只需编写以下内容:

@Autowired
private StreamBridge streamBridge;

@PostMapping
void sampleCreateAPI(@RequestBody String body) {
  streamBridge.send("topic", body);
}

帮助类 StreamBridge使用触发处理。它将发布有关主题的消息。可以通过实现功能接口 java.util.function.Consumer 来定义从主题中消费事件(不创建新事件)的函数:

@Bean
public Consumer<String> mySubscriber() {
   return s -> System.out.println("ML RECEIVED: " + s);
}

为了将各种功能联系在一起,我们使用配置。我们将在下面的添加用于发布事件的配置添加用于消费事件的配置部分中看到此类配置的示例。

该编程模型可以独立于所使用的消息传递系统使用,例如 RabbitMQ 或 Apache Kafka!

尽管发送异步消息优于同步 API 调用,但它也有其自身的挑战。我们将看到如何使用 Spring Cloud Stream 来处理其中的一些。将涵盖 Spring Cloud Stream 中的以下功能:

  • 消费群体
  • 重试和死信队列
  • 保证订单和分区

我们将在以下部分中研究每一个。

消费群体

这里的问题是,如果我们扩大消息消费者的实例数量,例如,如果我们启动两个实例product 微服务的两个实例,product微服务将使用相同的消息,如下图所示:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.8:产品 #1 和 #2 使用相同的消息

这可能导致一条消息被处理两次,从而可能导致数据库中出现重复或其他不希望的不一致。因此,我们只希望每个消费者有一个实例来处理每条消息。这可以通过引入消费者群体来解决,如下图所示:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.9:消费者组

在 Spring Cloud Stream 中,可以在消费者端配置一个消费者组。例如,对于 product 微服务,它将如下所示:

spring.cloud.stream:
  bindings.messageProcessor-in-0:
    destination: products
    group: productsGroup

从这个配置中,我们可以了解到以下内容:

  • 默认情况下,Spring Cloud Stream 应用用于将配置绑定到函数的命名约定。对于发送到函数的消息,绑定名称为 -in-
    • functionName 为函数名,messageProcessor 在前面的例子中。
    • index 设置为 0,除非函数需要多个输入或输出参数。我们不会使用多参数函数,因此 index 将始终设置为 0 在我们的示例中。
    • 对于传出消息,绑定名称约定为 -out- .
  • destination 属性指定将使用消息的主题的名称,产品 在这种情况下。
  • group 属性指定要添加 product< 实例的消费者组/code> 微服务,在本例中为 productsGroup。这意味着发送到 products 主题的消息将仅由 Spring Cloud Stream 传递到 产品 微服务。

重试和死信队列

如果 消费者未能处理消息,则可能会为失败的消费者重新排队,直到成功处理。如果消息的内容无效,也就是称为中毒消息,该消息会阻止消费者处理其他消息直到它被手动删除。如果失败是由于临时问题,例如由于临时网络错误而无法访问数据库,则在重试多次后处理可能会成功。

必须可以指定重试次数,直到将消息移动到另一个存储以进行故障分析和纠正。失败的消息通常被移动到称为死信队列的专用队列。为了避免在临时故障(例如网络错误)期间使基础设施过载,必须可以配置重试执行的频率,最好是每次重试之间的时间长度增加。

在Spring Cloud Stream中,这个可以在消费者端配置,例如,对于product 微服务,如下图:

spring.cloud.stream.bindings.messageProcessor-in-0.consumer:
  maxAttempts: 3
  backOffInitialInterval: 500
  backOffMaxInterval: 1000
  backOffMultiplier: 2.0

spring.cloud.stream.rabbit.bindings.messageProcessor-in-0.consumer:
  autoBindDlq: true
  republishToDlq: true

spring.cloud.stream.kafka.bindings.messageProcessor-in-0.consumer:
  enableDlq: true

在前面的示例中,我们指定 Spring Cloud Stream 应该在将消息放入死信队列之前执行 3 重试。第一次重试将在 500 ms 之后尝试,另外两次在 1000 毫秒。

启用死信队列是绑定特定的;因此,我们有一种用于 RabbitMQ 的配置和一种用于 Kafka 的配置。

保证顺序和分区

如果业务逻辑要求消息按照发送的顺序被消费和处理,我们不能为每个消费者使用多个实例来提高处理性能;例如,我们不能使用消费者组。在某些情况下,这可能会导致处理传入消息时出现不可接受的延迟。

我们可以使用 partitions 来确保 消息按照发送的顺序传递,但不会损失性能和可扩展性。

在大多数情况下,只有影响相同业务实体的消息才需要严格的消息处理顺序。例如,在许多情况下,影响产品 ID 1 的产品的消息可以独立于影响产品 ID < 的产品的消息进行处理代码类="Code-In-Text--PACKT-">2。这意味着仅需要为具有相同产品 ID 的消息保证订单。

对此的解决方案是可以为每条消息指定一个 key,消息传递系统可以使用它来保证在具有相同密钥的消息之间保持顺序。这可以通过在主题中引入子主题(也称为 partitions)来解决。 消息系统根据其密钥将消息放置在特定分区中。

具有相同键的消息总是放在同一个分区中。消息系统只需要保证同一分区中消息的传递顺序即可。为了确保消息的顺序,我们在消费者组中为每个分区配置一个消费者实例。通过增加分区的数量,我们可以允许消费者增加它的实例数量。这提高了它的消息处理性能,而不会丢失交付顺序。如下图所示:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.10:为消息指定键

如上图所示,所有 Key 设置为 的消息123 总是转到分区 Products-1,而带有 Key设置为456去分区产品-2

在 Spring Cloud Stream 中,这需要在发布者和消费者端进行配置。在发布者端,必须指定分区的键和数量。例如,对于 product-composite 服务,我们有以下内容:

spring.cloud.stream.bindings.products-out-0.producer:
  partition-key-expression: headers['partitionKey']
  partition-count: 2

此配置意味着密钥将从名称为 partitionKey 的消息头中获取,并且将使用两个分区。

每个消费者可以指定它想从哪个分区消费消息。例如,对于 product 微服务,我们有以下内容:

spring.cloud.stream.bindings.messageProcessor-in-0:
  destination: products
  group:productsGroup
  consumer:
    partitioned: true
    instance-index: 0

这个配置告诉Spring Cloud Stream这个消费者只会消费来自分区号0的消息,也就是第一个分区。

定义主题和事件

正如我们已经在 Chapter 2Spring Cloud Stream 部分中提到的, Spring Boot 简介,Spring Cloud Stream 基于发布和订阅模式,发布者向主题和订阅者发布消息订阅他们有兴趣从中接收消息的主题。

我们将为每种类型的实体使用一个主题:产品建议评论 .

消息系统处理通常由标题和正文组成的消息。 事件是描述已发生事件的消息。对于事件,消息体可用于描述事件的类型、事件数据和事件发生时间的时间戳。

在本书的范围内,事件由以下内容定义:

  • 事件的类型,例如创建或删除事件
  • 标识数据的key,例如产品ID
  • 一个data元素,即事件中的实际数据
  • 时间戳,描述事件发生的时间

我们将使用的事件类如下所示:

public class Event<K, T> {

    public enum Type {CREATE, DELETE}

    private Event.Type eventType;
    private K key;
    private T data;
    private ZonedDateTime eventCreatedAt;

    public Event() {
        this.eventType = null;
        this.key = null;
        this.data = null;
        this.eventCreatedAt = null;
    }

    public Event(Type eventType, K key, T data) {
        this.eventType = eventType;
        this.key = key;
        this.data = data;
        this.eventCreatedAt = now();
    }

    public Type getEventType() {
        return eventType;
    }

    public K getKey() {
        return key;
    }

    public T getData() {
        return data;
    }

    public ZonedDateTime getEventCreatedAt() {
        return eventCreatedAt;
    }
}

下面详细解释一下前面的源码代码:

  • Event 类是一个 通用类 参数化其 keydata 的类型字段,KT
  • 事件类型声明为具有允许值的枚举器,即 CREATE删除
  • 该类定义了两个构造函数,一个为空的,一个可用于初始化类型、键和值成员
  • 最后,该类为其成员变量定义 getter 方法

完整源代码见api<中的Event类/代码>项目。

Gradle 构建文件中的更改

要引入 Spring Cloud Stream 及其用于 RabbitMQ 和 Kafka 的绑定器,我们需要添加两个启动器依赖项 称为 spring-cloud-starter-stream-rabbitspring-cloud-starter-流卡夫卡。我们还需要在 product-composite 项目,spring -cloud-stream::test-binder,引入测试支持。以下代码显示了这一点:

dependencies {
  implementation 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'
  implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
  testImplementation 'org.springframework.cloud:spring-cloud-stream::test-binder'
}

要指定我们要使用的 Spring Cloud 版本,我们首先为版本声明一个变量:

ext {
    springCloudVersion = "2020.0.3"
}

接下来,我们使用该变量为指定的 Spring Cloud 版本设置依赖管理,如下所示:

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-
        dependencies:${springCloudVersion}"
    }
}

对于完整源代码,请参见每个build.gradle构建文件<一个 id="_idIndexMarker478"> 微服务项目。

将所需的依赖项添加到 Gradle 构建文件中后,我们可以开始学习如何在核心服务中使用事件。

消费核心服务中的事件

为了能够使用核心服务中的事件,我们需要请执行下列操作:

  • 声明使用在核心服务主题上发布的事件的消息处理器
  • 更改我们的服务实现以使用反应式持久层
  • 添加消费事件所需的配置
  • 更改我们的测试,以便他们可以测试事件的异步处理

消费事件的源代码在所有三个核心服务中的结构都是相同的,所以我们将只介绍 product 的源代码服务。

声明消息处理器

创建和删除实体 的 REST API 已替换为 消息处理器 在每个核心微服务中使用每个实体主题的创建和删除事件。为了能够消费已经发布到某个主题的消息,我们需要声明一个 Spring Bean 来实现功能接口 java.util.function.Consumer< /代码>。

product 服务的消息处理器声明为:

@Configuration
public class MessageProcessorConfig {

  private final ProductService productService;

  @Autowired
  public MessageProcessorConfig(ProductService productService)
  {
    this.productService = productService;
  }

  @Bean
  public Consumer<Event<Integer,Product>> messageProcessor() {
    ...

前面的代码,我们可以看出:

  • 该类使用 @Configuration 进行注释,告诉 Spring 在该类中查找 Spring bean。
  • 我们在构造函数中注入 ProductService 接口的实现。 productService bean 包含执行产品实体的实际创建和删除的业务逻辑。
  • 我们将消息处理器声明为实现功能接口 Consumer 的 Spring bean,接受一个事件作为 事件<整数,产品>

Consumer 函数的实现如下所示:

return event -> {

  switch (event.getEventType()) {

    case CREATE:
      Product product = event.getData();
      productService.createProduct(product).block();
      break;

    case DELETE:
      int productId = event.getKey();
      productService.deleteProduct(productId).block();
      break;

    default:
      String errorMessage = "Incorrect event type: " +
        event.getEventType() +
        ", expected a CREATE or DELETE event";
      throw new EventProcessingException(errorMessage);
  }

};

前面的实现执行以下操作:

  • 它将 Event 类型的事件作为输入参数
  • 使用 switch 语句,根据事件类型创建或删除产品实体
  • 它使用注入的 productService bean 来执行实际的创建和删除操作
  • 如果事件类型既不是create也不是delete,会抛出异常

为了确保我们可以将 productService bean 抛出的异常传播回消息系统,我们调用 block() 方法在我们从 productService bean 返回的响应上。这确保消息处理器等待 productService bean 完成其在底层数据库中的创建或删除。如果不调用 block() 方法,我们将无法传播异常并且消息传递系统将无法重新排队失败尝试或可能将消息移动到死信队列;相反,该消息将被静默丢弃。

从性能和可扩展性的角度来看,调用 block() 方法通常被认为是一种不好的做法。但在这种情况下,我们将只并行处理一些传入消息,每个分区一个,如上所述。这意味着我们只会有几个线程同时阻塞,这不会对性能或可伸缩性产生负面影响。

如需完整的源代码,请参阅 product< 中的 MessageProcessorConfig 类/code>、recommendationreview 项目。

服务实现的变化

product Code-In-Text--PACKT-">recommendation 服务已被重写以使用 MongoDB 的非阻塞响应式持久层。例如,创建产品实体如下:

@Override
public Mono<Product> createProduct(Product body) {

  if (body.getProductId() < 1) {
    throw new InvalidInputException("Invalid productId: " +
      body.getProductId());
  }

  ProductEntity entity = mapper.apiToEntity(body);
  Mono<Product> newEntity = repository.save(entity)
    .log(LOG.getName(), FINE)
    .onErrorMap(
      DuplicateKeyException.class,
      ex -> new InvalidInputException
        ("Duplicate key, Product Id: " + body.getProductId()))
    .map(e -> mapper.entityToApi(e));
  return newEntity;
}

从前面的代码可以看出,onErrorMap() 方法用于映射DuplicateKeyException 对我们自己的 InvalidInputException 异常的持久性异常。

对于使用 JPA 的阻塞持久层的 review 服务,创建和删除方法已按照与处理阻塞代码部分。

有关完整的源代码,请参见以下类:

  • product 项目中的 ProductServiceImpl
  • recommendation 项目中的 RecommendationServiceImpl
  • review 项目中的 ReviewServiceImpl

添加消费事件的配置

我们还需要为消息传递系统设置配置,以便能够使用事件。为此,我们需要完成以下步骤:

  1. 我们声明 RabbitMQ 是默认的消息系统,默认的内容类型是 JSON:
    spring.cloud.stream: defaultBinder: 兔子 default.contentType:应用程序/json 
  2. 接下来,我们将消息处理器的输入绑定到特定的主题名称,如下所示:
    spring.cloud.stream: bindings.messageProcessor-in-0: 目的地:产品 
  3. 最后,我们声明 Kafka 和 RabbitMQ 的连接信息:
    spring.cloud.stream.kafka.binder:
      brokers: 127.0.0.1
      defaultBrokerPort: 9092
    
    spring.rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: guest
    
    ---
    spring.config.activate.on-profile: docker
    
    spring.rabbitmq.host: rabbitmq
    spring.cloud.stream.kafka.binder.brokers: kafka
    								

在默认的 Spring 配置文件中,我们指定在没有 Docker 的情况下在 localhost 上使用 IP 地址 127.0.0.1。在 docker Spring 配置文件中,我们指定在 Docker 中运行和使用 Docker Compose 时将使用的主机名,即 rabbitmqkafka

添加到此配置中,消费者配置还指定消费者组、重试处理、死信队列和分区,因为它们在前面的处理消息传递挑战部分中有所描述。

完整源代码见application.yml配置文件="Code-In-Text--PACKT-">产品、推荐审查项目。

测试代码的变化

由于核心服务现在接收用于创建和删除其实体的事件,因此需要更新测试以便它们发送事件而不是调用 REST API,就像它们在前几章中所做的那样.为了能够从测试类调用消息处理器,我们将消息处理器 bean 注入到成员变量中:

@SpringBootTest
class ProductServiceApplicationTests {

  @Autowired
  @Qualifier("messageProcessor")
  private Consumer<Event<Integer, Product>> messageProcessor;

从上面的代码可以看出,我们不仅注入了任何Consumer函数,而且使用了@Qualifier 注解,指定我们要注入名为 Consumer 函数“文本中的代码--PACKT-”>messageProcessor。

为了向消息处理器发送创建和删除事件,我们添加了两个帮助方法,sendCreateProductEventsendDeleteProductEvent,在测试类中:

  private void sendCreateProductEvent(int productId) {
    Product product = new Product(productId, "Name " + productId, productId, "SA");
    Event<Integer, Product> event = new Event(CREATE, productId, product);
    messageProcessor.accept(event);
  }

  private void sendDeleteProductEvent(int productId) {
    Event<Integer, Product> event = new Event(DELETE, productId, null);
    messageProcessor.accept(event);
  }

注意我们在Consumer accept()方法code> 函数接口声明来调用消息处理器。这意味着我们在测试中缩短了消息传递系统并直接调用消息处理器。

用于创建和删除实体的测试已更新为使用这些辅助方法。

有关完整的源代码,请参阅以下测试类:

  • product 项目中的 ProductServiceApplicationTests
  • recommendation 项目中的 RecommendationServiceApplicationTests
  • review 项目中的 ReviewServiceApplicationTests

我们已经看到了在核心微服务中消费事件需要什么。现在让我们看看如何在复合微服务中发布事件。

在复合服务中发布事件

复合服务收到创建和删除复合产品的HTTP请求时,会发布相应的事件到核心服务的主题。为了能够在复合服务中发布事件,我们需要执行以下步骤:

  1. 在集成层中发布事件
  2. 添加发布事件的配置
  3. 更改测试,以便他们可以测试事件的发布

请注意,复合服务实现类不需要更改——它由集成层负责!

在集成层中发布事件

要在集成层发布事件,我们需要:

  1. 根据HTTP请求中的body创建Event对象
  2. 创建一个 Message 对象,其中使用了 Event 对象Event 对象中的有效负载和键字段用作标头中的分区键
  3. 使用帮助类 StreamBridge 发布所需主题的事件

发送创建产品事件的代码如下所示:

  @Override
  public Mono<Product> createProduct(Product body) {
    return Mono.fromCallable(() -> {

      sendMessage("products-out-0",
        new Event(CREATE, body.getProductId(), body));
      return body;
    }).subscribeOn(publishEventScheduler);
  }

  private void sendMessage(String bindingName, Event event) {
    Message message = MessageBuilder.withPayload(event)
      .setHeader("partitionKey", event.getKey())
      .build();
    streamBridge.send(bindingName, message);
  }

前面的代码中,我们可以看到:

  • 集成层实现ProductService createProduct()方法code> 接口,使用辅助方法 sendMessage()。辅助方法采用输出绑定的名称和事件对象。绑定名称 products-out-0 将绑定到 product 服务在下面的配置中。
  • 由于 sendMessage() 使用阻塞代码,当调用 streamBridge ,它在专用调度程序 publishEventScheduler 提供的线程上执行。这与在 review 微服务中处理阻塞 JPA 代码的方法相同。有关详细信息,请参阅处理阻塞代码部分。
  • 辅助方法 sendMessage() 创建一个 Message 对象并设置 payloadpartitionKey 标头如上所述。最后,它使用 streamBridge 对象将事件发送到消息系统,消息系统会将其发布到配置中定义的主题上。

完整的源代码见product-中的ProductCompositeIntegration类复合项目。

添加发布事件的配置

我们还需要为消息系统设置配置,以便能够发布事件;这与我们为消费者所做的类似。将 RabbitMQ 声明为默认消息系统,将 JSON 声明为默认内容类型,将连接信息声明为 Kafka 和 RabbitMQ 与消费者相同。

要声明用于输出绑定名称的主题,我们有以下配置:

spring.cloud.stream:
  bindings:
    products-out-0:
      destination: products
    recommendations-out-0:
      destination: recommendations
    reviews-out-0:
      destination: reviews

使用分区时,我们还需要指定分区键和将使用的分区数量:

spring.cloud.stream.bindings.products-out-0.producer:
  partition-key-expression: headers['partitionKey']
  partition-count: 2

在前面的配置中,我们可以看到:

  • 该配置适用于绑定名称products-out-0
  • 使用的分区键将取自消息头 partitionKey
  • 将使用两个分区

完整源码见application.yml配置文件>产品复合项目。

测试代码的变化

测试异步 事件驱动的微服务本质上是困难的。测试通常需要以某种方式在异步后台处理上同步,以便能够验证结果。 Spring Cloud Stream 以测试绑定器的形式提供支持,可用于验证已发送哪些消息,而无需在测试期间使用任何消息传递系统!

请参阅前面的 Changes in the Gradle build files 部分,了解如何在 product-composite 中包含测试支持 项目。

测试 支持包括一个 OutputDestination 帮助器类,可用于获取已发送的消息在测试期间发送。添加了一个新的测试类 MessagingTests 来运行测试以验证是否发送了预期的消息。让我们来看看测试类中最重要的部分:

  1. 为了能够在测试类中注入 OutputDestination bean,我们还需要从类 TestChannelBinderConfiguration。这是通过以下代码完成的:
    @SpringBootTest @Import({TestChannelBinderConfiguration.class}) 类消息测试{ @自动连线 私有 OutputDestination 目标; 
  2. 接下来,我们声明了几个帮助方法来读取消息并能够清除主题。代码如下所示:
    private void purgeMessages(String bindingName) {
      getMessages(bindingName);
    }
    
    private List<String> getMessages(String bindingName){
      List<String> messages = new ArrayList<>();
      boolean anyMoreMessages = true;
    
      while (anyMoreMessages) {
        Message<byte[]> message = 
          getMessage(bindingName);
    
        if (message == null) {
          anyMoreMessages = false;
    
        } else {
          messages.add(new String(message.getPayload()));
        }
      }
      return messages;
    }
    
    private Message<byte[]> getMessage(String bindingName){
      try {
        return target.receive(0, bindingName);
      } catch (NullPointerException npe) {
        LOG.error("getMessage() received a NPE with binding = {}", bindingName);
        return null;
      }
    }
    								

    前面的代码中,我们可以看到:

    • getMessage() 方法使用 OutputDestination bean,命名为 target
    • getMessages() 方法使用 getMessage() 方法返回一个主题中的所有消息
    • purgeMes​​sages() 方法使用 getMessages() 方法从所有当前消息中清除主题
  3. 每个测试都从使用带有 setup() 方法清除测试中涉及的所有主题开始-PACKT-">@BeforeEach:
    @BeforeEach
      void setUp() {
        purgeMessages("products");
        purgeMessages("recommendations");
        purgeMessages("reviews");
      }
    								
  4. 实际测试可以使用 getMessages() 方法验证主题中的消息。例如,请参阅以下有关创建复合产品的测试:
    @Test
    void createCompositeProduct1() {
    
      ProductAggregate composite = new ProductAggregate(1, "name", 1, null, null, null);
      postAndVerifyProduct(composite, ACCEPTED);
    
      final List<String> productMessages = getMessages("products");
      final List<String> recommendationMessages = getMessages("recommendations");
      final List<String> reviewMessages = getMessages("reviews");
    
      // Assert one expected new product event queued up
      assertEquals(1, productMessages.size());
    
      Event<Integer, Product> expectedEvent =
        new Event(CREATE, composite.getProductId(), new Product(composite.getProductId(), composite.getName(), composite.getWeight(), null));
      assertThat(productMessages.get(0), is(sameEventExceptCreatedAt(expectedEvent)));
    
      // Assert no recommendation and review events
      assertEquals(0, recommendationMessages.size());
      assertEquals(0, reviewMessages.size());
    }
    								

    前面的代码中,我们可以看到一个测试示例:

    1. 首先发出 HTTP POST 请求,请求创建复合产品。
    2. 接下来,获取三个主题的所有消息,每个底层核心服务一个。
    3. 对于这些测试,创建事件的具体时间戳无关紧要。为了能够将实际事件与预期事件进行比较,忽略字段 eventCreatedAt 中的差异,一个名为 IsSameEvent 可以使用。 sameEventExceptCreatedAt() 方法是IsSameEvent 类比较 Event 对象,如果所有字段都相等,则将它们视为相等,除了 eventCreatedAt 字段。
    4. 最后,它验证可以找到预期的事件,并且没有其他事件。

完整源代码,请参见测试类 MessagingTestsproduct-composite 项目中的 -In-Text--PACKT-">IsSameEvent。

运行反应式微服务环境的手动测试

现在,我们拥有完全反应式微服务,包括非阻塞同步 REST API 和事件驱动的异步服务。让我们试试看!

我们将学习如何使用 RabbitMQ 和 Kafka 作为消息代理运行测试。由于 RabbitMQ 可以在有分区和没有分区的情况下使用,我们将测试这两种情况。将使用三种不同的配置,每一种都在单独的 Docker Compose 文件中定义:

  • 在不使用分区的情况下使用 RabbitMQ
  • 使用 RabbitMQ,每个主题有两个分区
  • 使用 Kafka,每个主题有两个分区

但是,在测试这三个配置之前,我们需要添加两个功能才能测试异步处理:

  • 使用 RabbitMQ 时保存事件以供以后检查
  • 可用于监控微服务环境状态的健康 API

保存事件

在对事件驱动的异步服务运行一些测试之后,看看实际发送了哪些事件可能会很有趣。将 Spring Cloud Stream 与 Kafka 一起使用时,即使在消费者处理完事件之后,事件也会保留在主题中。但是,当使用带有 RabbitMQ 的 Spring Cloud Stream 时,事件在成功处理后会被删除。

为了能够查看每个主题上发布了哪些事件,Spring Cloud Stream 被配置为将发布的事件保存在单独的消费者组中,auditGroup , 每个主题。对于 products 主题,配置如下所示:

spring.cloud.stream:
  bindings:
    products-out-0:
      destination: products
      producer:
        required-groups: auditGroup

使用 RabbitMQ 时,这将导致创建额外的队列来存储事件以供以后检查。

完整源码见application.yml配置文件="Code-In-Text--PACKT-">产品复合 项目。

添加健康 API

测试结合使用同步 API 和异步消息传递的微服务系统环境具有挑战性。例如,我们如何知道新启动的微服务环境以及它们的数据库和消息传递系统何时准备好处理请求和消息?

为了更容易知道所有微服务何时准备就绪,我们在微服务中添加了健康 API。健康 API 基于对 Spring Boot 模块 附带的 health endpoints 的支持执行器。默认情况下,如果微服务本身和所有依赖项 Spring引导知道可用。 Spring Boot 知道的依赖关系包括,例如,数据库和消息传递系统。如果微服务本身或其任何依赖项不可用,则健康端点会回答 DOWN(并返回 500 作为 HTTP 返回状态)。

我们还可以扩展健康端点以覆盖 Spring Boot 不知道的依赖项。我们将使用此功能扩展到产品组合的 health 端点,因此它还包括三个核心服务的健康。这意味着产品复合 health 端点将只响应 UP< /code> 自身和三个核心微服务是否健康。这可以由 test-em-all.bash 脚本手动或自动使用,以找出所有微服务及其依赖项何时启动和运行。

ProductCompositeIntegration 类中,我们添加了检查三个核心微服务健康状况的辅助方法,如下:

public Mono<Health> getProductHealth() {
    return getHealth(productServiceUrl);
}

public Mono<Health> getRecommendationHealth() {
    return getHealth(recommendationServiceUrl);
}

public Mono<Health> getReviewHealth() {
    return getHealth(reviewServiceUrl);
}

private Mono<Health> getHealth(String url) {
    url += "/actuator/health";
    LOG.debug("Will call the Health API on URL: {}", url);
    return webClient.get().uri(url).retrieve().bodyToMono(String.class)
        .map(s -> new Health.Builder().up().build())
        .onErrorResume(ex -> Mono.just(new
         Health.Builder().down(ex).build()))
        .log(LOG.getName(), FINE);
}

这个代码类似于我们之前调用核心服务读取API的代码。请注意,健康端点默认设置为 /actuator/health

完整的源代码见product-中的ProductCompositeIntegration类复合项目。

在主应用程序类 ProductCompositeServiceApplication 中,我们使用这些辅助方法来注册使用 Spring Actuator 类的复合健康检查 CompositeReactiveHealthContributor:

@Autowired
ProductCompositeIntegration integration;

@Bean
ReactiveHealthContributor coreServices() {

  final Map<String, ReactiveHealthIndicator> registry = new LinkedHashMap<>();

  registry.put("product", () -> integration.getProductHealth());
  registry.put("recommendation", () -> integration.getRecommendationHealth());
  registry.put("review", () -> integration.getReviewHealth());

  return CompositeReactiveHealthContributor.fromMap(registry);
}

完整源代码见ProductCompositeServiceApplication类-In-Text--PACKT-">产品复合 项目。

最后,在所有四个微服务的 application.yml 配置文件中,我们配置 Spring Boot Actuator 使其执行以下操作:

  • 显示有关健康状态的详细信息,不仅包括 UP DOWN,还有关于它的依赖的信息
  • 通过 HTTP 公开其所有端点

这两个设置的配置如下所示:

management.endpoint.health.show-details: "ALWAYS"
management.endpoints.web.exposure.include: "*"

完整源代码示例见application.yml配置文件PACKT-">产品复合 项目。

警告:这些配置设置在开发过程中很有帮助,但在生产系统的执行器端点中泄露太多信息可能是一个安全问题。因此,计划在生产中尽量减少执行器端点暴露的信息!

这可以通过替换 "*" 来完成,例如, health,info 在上面的 management.endpoints.web.exposure.include 属性的设置中。

有关 Spring Boot Actuator 公开的端点的详细信息,请参阅 https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-endpoints.html .

可以通过以下命令手动使用健康端点(先不要尝试,等到我们启动了下面的微服务环境!):

curl localhost:8080/actuator/health -s | jq .

这将导致包含以下内容的响应:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.11:健康端点响应

前面的输出中,我们可以看到复合服务报告它是健康的,即它的状态是 向上 。在响应结束时,我们可以看到所有三个核心微服务也都报告为健康。

有了健康 API,我们就可以测试我们的反应式微服务了。

在不使用分区的情况下使用 RabbitMQ

在本节中,我们将与 RabbitMQ 一起测试 反应式微服务,但不使用分区。

默认的 docker-compose.yml Docker Compose 文件用于此配置。文件中添加了以下更改:

  • RabbitMQ 已添加,如下所示:
    rabbitmq:
        image: rabbitmq:3.8.11-management
        mem_limit: 512m
        ports:
          - 5672:5672
          - 15672:15672
        healthcheck:
          test: ["CMD", "rabbitmqctl", "status"]
          interval: 5s
          timeout: 2s
          retries: 60
    								
    RabbitMQ的 声明可以:

    • 我们使用 RabbitMQ v3.8.11 的 Docker 镜像,包括管理插件和 Admin Web UI
    • 我们公开了用于连接 RabbitMQ 和 Admin Web UI 的标准端口,567215672
    • 我们添加了一个健康检查,以便 Docker 可以发现 RabbitMQ 何时准备好接受连接
  • 现在,微服务在 RabbitMQ 服务上声明了一个依赖项。这意味着在 RabbitMQ 服务被报告为健康之前,Docker 不会启动微服务容器:
    depends_on:
      rabbitmq:
        condition: service_healthy
    								

要运行手动测试,请执行以下步骤:

  1. 使用以下命令构建并启动系统环境:
    cd $BOOK_HOME/Chapter07 ./gradlew build && docker-compose build &&码头工人组成 -d 
  2. Now, we have to wait for the microservice landscape to be up and running. Try running the following command a few times:
    curl -s localhost:8080/actuator/health | jq -r .status 

    当它返回 UP 时,我们就可以运行我们的测试了!

  3. First, create a composite product with the following commands:
    body='{"productId":1,"name":"product name C","weight":300, "recommendations":[ {"recommendationId":1,"author":"author 1","rate":1,"content":"content 1"}, {"recommendationId":2,"author":"author 2","rate":2,"content":"content 2"}, {"recommendationId":3,"author":"author 3","rate":3,"content":"content 3"} ], "reviews":[ {"reviewId":1,"author":"author 1","subject":"subject 1","content":"content 1"}, {"reviewId":2,"author":"author 2","subject":"subject 2","content":"content 2"}, {"reviewId":3,"author":"author 3","subject":"subject 3","content":"content 3"} ]}' curl -X POST localhost:8080/product-composite -H "Content-Type: application/json" --data "$body" 

    使用 Spring Cloud Stream 和 RabbitMQ 时,它将为每个主题创建一个 RabbitMQ 交换和一组队列,具体取决于我们的配置。让我们看看 Spring Cloud Stream 为我们创建了哪些队列!

  4. Open the following URL in a web browser: http://localhost:15672/#/queues. Log in with the default username/password guest/guest. You should see the following queues:
    读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

    图 7.12:队列列表

    对于每个主题,我们可以看到 auditGroup 的一个队列,由对应的核心微服务使用,一个死信队列。我们还可以看到 auditGroup 队列包含消息,正如预期的那样!

  5. Click on the products.auditGroup queue and scroll down to the Get messages section, expand it, and click on the button named Get Message(s) to see the message in the queue:
    读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

    图 7.13:查看队列中的消息

    之前的屏幕截图中,请注意 Payload 以及标题 partitionKey,我们 将在下一节中使用它来尝试使用分区的 RabbitMQ。

  6. 接下来,尝试使用以下代码获取产品组合:
    curl -s localhost:8080/product-composite/1 | jq 
  7. 最后,使用以下命令将其删除:
    curl -X DELETE localhost:8080/product-composite/1 
  8. 尝试再次获取已删除的产品。它应该导致 404 - "NotFound" 响应!
  9. 如果您再次查看 RabbitMQ 审计队列,您应该能够找到包含删除事件的新消息。
  10. 通过使用以下命令关闭微服务环境来结束测试:
    docker-compose down 

完成了我们使用没有分区的RabbitMQ的测试。现在,让我们继续并使用分区测试 RabbitMQ。

使用带有分区的 RabbitMQ

现在,让我们试试 Spring Cloud Stream 中的分区支持!

我们为使用 RabbitMQ 准备了一个单独的 Docker Compose 文件,每个主题有两个分区:docker-compose-partitions.yml。它还将为每个核心微服务启动两个实例,每个分区一个。例如,第二个 product 实例配置如下:

  product-p1:
    build: microservices/product-service
    mem_limit: 512m
    environment:
      - SPRING_PROFILES_ACTIVE=docker,streaming_partitioned,streaming_instance_1
    depends_on:
      mongodb:
        condition: service_healthy
      rabbitmq:
        condition: service_healthy

下面是对上述配置的解释:

  • 我们使用与第一个 product 实例相同的源代码和 Dockerfile,但配置不同。
  • 为了让所有微服务实例都知道它们将使用分区,我们将 Spring 配置文件 streaming_partitioned 添加到它们的环境变量 SPRING_PROFILES_ACTIVE
  • 我们使用不同的 Spring 配置文件将两个 product 实例分配给不同的分区。 Spring 配置文件 streaming_instance_0 由第一个产品实例和 streaming_instance_1< /code> 由第二个实例 product-p1 使用。
  • 第二个 product 实例只会处理异步事件;它不会响应 API 调用。由于它具有不同的名称,product-p1(也用作其 DNS 名称),它不会响应对以开头的 URL 的调用http://product:8080

使用以下命令启动微服务环境:

export COMPOSE_FILE=docker-compose-partitions.yml
docker-compose build && docker-compose up -d

中创建一个 复合产品,方法与上一节中的测试相同,但也使用产品 ID 创建一个复合产品设置为 2。如果您查看 Spring Cloud Stream 设置的队列,您将看到每个分区一个队列,并且产品审计队列现在每个包含一条消息;产品 ID 1 的事件被放置在一个分区中,产品 ID 的事件 2 被放置在另一个分区中。如果您在 Web 浏览器中返回 http://localhost:15672/#/queues,您应该会看到如下内容:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.14:队列列表

要使用分区结束使用 RabbitMQ 的测试,请使用以下命令关闭微服务环境:

docker-compose down
unset COMPOSE_FILE

我们现在 完成了使用 RabbitMQ 的测试,包括分区和不分区。我们将尝试的最终测试配置是与 Kafka 一起测试微服务。

使用 Kafka,每个主题有两个分区

现在,我们将尝试Spring Cloud Stream的一个非常酷的特性:将消息系统从RabbitMQ更改为Apache Kafka!

这可以通过从 spring.cloud.stream.defaultBinder 属性的值来完成--PACKT-">rabbit 到 kafka。这由 docker-compose-kafka.yml Docker Compose 文件处理,该文件也已将 RabbitMQ 替换为 Kafka 和 ZooKeeper。 Kafka 和 ZooKeeper 的配置如下:

kafka:
  image: wurstmeister/kafka:2.12-2.5.0
  mem_limit: 512m
  ports:
    - "9092:9092"
  environment:
    - KAFKA_ADVERTISED_HOST_NAME=kafka
    - KAFKA_ADVERTISED_PORT=9092
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
  depends_on:
    - zookeeper

zookeeper:
  image: wurstmeister/zookeeper:3.4.6
  mem_limit: 512m
  ports:
    - "2181:2181"
  environment:
    - KAFKA_ADVERTISED_HOST_NAME=zookeeper

Kafka 还配置为每个主题使用两个分区,并且像以前一样,我们为每个核心微服务启动两个实例,每个分区一个。有关详细信息,请参阅 Docker Compose 文件 docker-compose-kafka.yml

使用以下命令启动微服务环境:

export COMPOSE_FILE=docker-compose-kafka.yml
docker-compose build && docker-compose up -d

重复 上一节中的 测试:创建两个产品,一个产品 ID 设置为 1 和一个产品 ID 设置为 2

不幸的是,Kafka 没有提供任何可用于检查主题、分区和其中放置的消息的图形工具。相反,我们可以在 Kafka Docker 容器中运行 CLI 命令。

要查看主题列表,请运行以下命令:

docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper --list

期望输出如下所示:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.15:查看主题列表

这是我们在前面的输出中看到的:

  • error 为前缀的主题是死信队列对应的主题。
  • 您不会像 RabbitMQ 那样找到任何 auditGroup。由于事件被 Kafka 保留在主题中,即使在消费者处理之后,也不需要额外的 auditGroup

要查看特定主题中的分区,例如 products 主题,请运行以下命令:

docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper --topic products

期望输出如下所示:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.16:在 products 主题中查看分区

要查看 特定主题(例如 products 主题)中的所有消息,请运行以下命令:

docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic products --from-beginning --timeout-ms 1000

期望输出如下所示:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.17:查看 products 主题中的所有消息

要查看特定分区中的所有消息,例如 1 -">products 主题,运行以下命令:

docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic products --from-beginning --timeout-ms 1000 --partition 1

期望输出如下所示:

读书笔记《第一部分 使用 Spring Boot 开始微服务开发》第7章开发响应式微服务

图 7.18:在 products 主题中查看分区 1 中的所有消息

输出将以超时异常结束,因为我们通过为命令指定 1000 毫秒的超时来停止命令。

使用以下命令关闭微服务环境:

docker-compose down
unset COMPOSE_FILE

现在,我们已经了解了Spring Cloud Stream 可以如何用于将消息代理从 RabbitMQ 切换到 Kafka,而无需任何更改源代码。它只需要对 Docker Compose 文件进行一些更改。

让我们继续本章的最后一部分,学习如何自动运行这些测试!

运行反应式微服务环境的自动化测试

为了能够自动而不是手动运行反应式微服务环境的测试,自动化的test-em- all.bash 测试脚本已得到增强。最重要的变化如下:

  • 该脚本使用新的 health 端点来了解微服务环境何时可以运行,如下所示:
    waitForService curl http://$HOST:$PORT/actuator/health 
  • 该脚本有一个新的 waitForMessageProcessing() 函数,在设置好测试数据后调用。其目的只是等待异步创建服务完成测试数据的创建。

要使用测试脚本自动运行 RabbitMQ 和 Kafka 测试,请执行以下步骤:

  1. 使用默认的 Docker Compose 文件运行测试 ,即使用不带分区的 RabbitMQ,使用以下命令:
    取消设置 COMPOSE_FILE ./test-em-all.bash 开始停止 
  2. 使用带有以下命令的 Docker Compose docker-compose-partitions.yml 文件运行 RabbitMQ 的测试,每个主题有两个分区:
    export COMPOSE_FILE=docker-compose-partitions.yml ./test-em-all.bash 开始停止 取消设置 COMPOSE_FILE 
  3. 最后,使用带有以下命令的 Docker Compose docker-compose-kafka.yml 文件使用 Kafka 和每个主题两个分区运行测试:
    export COMPOSE_FILE=docker-compose-kafka.yml ./test-em-all.bash 开始停止 取消设置 COMPOSE_FILE 

在本节中,我们学习了如何使用 test-em-all.bash 测试脚本来自动运行响应式微服务环境的测试,已配置为使用 RabbitMQ 或 Kafka 作为其消息代理。

概括

在本章中,我们了解了如何开发反应式微服务!

使用 Spring WebFlux 和 Spring WebClient,我们可以开发非阻塞同步 API,可以处理传入的 HTTP 请求和发送传出的 HTTP 请求,而不会阻塞任何线程。使用 Spring Data 对 MongoDB 的响应式支持,我们还可以以非阻塞方式访问 MongoDB 数据库,即在等待数据库响应时不会阻塞任何线程。 Spring WebFlux、Spring WebClient 和 Spring Data 依赖 Project Reactor 来提供它们的响应式和非阻塞特性。当我们必须使用阻塞代码时,例如使用 Spring Data for JPA 时,我们可以通过将阻塞代码的处理调度到专用线程池中来封装阻塞代码的处理。

我们还看到了如何使用 Spring Data Stream 开发事件驱动的异步服务,这些服务可以在 RabbitMQ 和 Kafka 上作为消息传递系统运行,而无需对代码进行任何更改。通过做一些配置,我们可以使用 Spring Cloud Stream 中的特性,如消费者组、重试、死信队列和分区来处理异步消息传递的各种挑战。

我们还学习了如何手动和自动测试由反应式微服务组成的系统环境。

这是关于如何在 Spring Boot 和 Spring Framework 中使用基本功能的最后一章。

接下来是对 Spring Cloud 的介绍,以及如何使用它来使我们的服务可用于生产、可扩展、健壮、可配置、安全和有弹性!

问题

  1. 为什么知道如何开发反应式微服务很重要?
  2. 您如何在非阻塞同步 API 和事件/消息驱动的异步服务之间进行选择?
  3. 是什么让事件与消息不同?
  4. 列举消息驱动的异步服务的一些挑战。我们如何处理它们?
  5. Why is the following test not failing?
    @Test void testStepVerifier() { StepVerifier.create(Flux.just(1, 2, 3, 4) .filter(n -> n % 2 == 0) .map(n -> n * 2) .log()) .expectNext(4, 8, 12); } 

    首先,确保测试失败。接下来,更正测试以使其成功。

  6. 使用 JUnit 使用响应式代码编写测试有哪些挑战,我们如何处理它们?