读书笔记《hands-on-reactive-programming-in-spring-5》反应式数据库访问
第 7 章反应式数据库访问
上一章介绍了 Spring 框架家族的一个新成员——Spring WebFlux。这一添加将反应式编程带到了应用程序前端,并支持对各种 HTTP 请求进行非阻塞处理。
在本章中,我们将学习如何使用 Spring Data 模块以响应式方式访问数据。这种能力对于创建一个完全反应式和响应式的应用程序至关重要,该应用程序可以最有效地利用所有可用的计算资源,提供最大的业务价值,同时还需要最低的运营成本 。
即使我们选择的数据库不提供反应式或异步驱动程序,仍然可以使用专用线程池围绕它构建反应式应用程序——本章将介绍如何做到这一点。但是,在响应式应用程序中总是不鼓励阻塞 I/O。
在本章中,我们将介绍以下主题:
- Patterns of data storage and data processing in the modern world
- Pros and cons of synchronous data access
- How Spring Data allows reactive data access and how to use this in reactive applications
- What reactive connectors are available at the moment
- How to adapt blocking I/O to the reactive programming model
现代世界中的数据处理模式
尽管单体软件systems仍然存在,运行并支持我们的许多日常活动,但大多数新系统的设计目的是——或者至少在某个时候过渡到微服务。微服务现在可能是现代应用程序最主要的架构风格,尤其是 云原生应用程序。在大多数情况下,这种方法允许软件产品的快速开发周期。同时,它还为更具成本效益的底层基础设施(服务器、网络、备份等)提供了机会,尤其是在依赖 AWS、Google Cloud Platform 或 Pivotal Cloud Foundry 等云提供商时。
笔记
有关云原生应用程序的更多信息,请参阅 云原生计算基金会(CNCF) https://cncf.io/about/charter. 第 10 章介绍了在响应式编程上下文中云原生应用程序的更多优缺点, < span class="emphasis">最后,释放它!
现在,我们将在微服务、可能的策略、实现方法的上下文中概述数据存储的基础知识,以及与数据持久性相关的一些建议。
领域驱动设计
领域驱动设计 (DDD) 作者:Eric Evans (Addison-Wesley, 2004) 应该在每个软件工程师的书架上占据一席之地。这是 因为它定义并形式化 成功的微服务架构的重要理论基础。 DDD 建立了一个通用词汇表(即上下文、领域、模型和普遍存在的语言),并制定了一组维护模型完整性的原则。 DDD 最重要的后果之一是,根据 DDD 定义的单个有界上下文通常映射到单独的微服务中,如下所示:
图 7.1 由实施领域驱动设计和领域驱动设计提炼
由于 DDD 非常关注业务核心领域, 尤其是表达、创建和检索领域模型的工件,因此以下对象通常会在本章中被引用——entity, 值对象、聚合、存储库。
笔记
要了解有关 DDD 概念的更多信息,请阅读以下文章:http://dddcommunity.org/resources /ddd_terms。
在使用 DDD 实现应用程序期间,如果服务中存在这样的层,则应将上述对象映射到应用程序持久层。这样的领域模型为逻辑和物理数据模型奠定了基础。
微服务时代的数据存储
微服务架构引入的主要与持久性相关的变化可能是强烈鼓励 不 在服务之间共享数据存储。这意味着每个逻辑服务都拥有并管理其数据库(如果它需要数据库),理想情况下,没有其他服务可以以不同于服务 API 调用的方式访问数据。
解释这种分离的所有原因超出了本书的范围,但最重要的原因如下:
- The ability to evolve different services separately, without tight coupling on a database schema
- The potential for more precise resource management
- The chance of horizontal scalability
- The possibility to use the best fit persistence implementation
考虑下图:
图 7.2 每个服务的独立数据库
在物理级别,存储分离可以通过几种不同的方式实现。最简单的方法是为所有服务拥有一个数据库服务器和一个数据库,但使用单独的模式(每个微服务的模式)来划分它们。这样的配置可以很容易地实现,需要最少的服务器资源,并且在生产中不需要太多的管理,所以它在应用程序开发的第一阶段非常有吸引力。可以使用数据库访问控制规则强制执行数据分离。这样的做法很容易实现,但同时也很容易被破解。由于数据存储在同一个数据库中,开发人员有时很想编写一个查询来检索或更新属于多个服务的数据。通过仅损害一项服务来损害整个系统的安全性也更容易。下图显示了上述系统的设计:
图 7.3 每个服务的架构
服务可能共享单个数据库服务器,但具有具有不同访问凭据的不同数据库。这种方法改进了数据分离,因为编写能够访问外部数据的单个查询要困难得多。但是,这也使备份例程变得有点复杂。下图描述了这一点:
图 7.4 每个服务的数据库
每个服务都可能有它的数据库服务器。这种方法需要更多的管理,但为细粒度调整数据库服务器以满足具体服务的需求提供了一个良好的起点。此外,在这种情况下,可以仅 垂直和水平 扩展 需要这种可扩展性的数据库。下图显示了这一点:
图 7.5 每个服务的数据库服务器
在实现一个软件系统时,所有前面提到的技术可以 很容易 根据实际需求 系统。以下架构显示了这样一个系统的设计:
图 7.6 持久性策略的混合组合
此外,可以有不同的数据库服务器实例,我们现在可以一起使用不同的数据库引擎(SQL 和 NoSQL)来获得最佳结果。这种方法被称为多语言持久性。
多语言持久性
2006 年, Neal Ford 提出了术语多语言编程 .这表达了这样的想法,即软件系统可以用不同的语言混合编写,以便从最好的语言中获得最显着的提升——适合业务或技术环境的语言。之后,引入了许多新的编程语言,目的是在特定领域做到最好,或者擅长多个领域。
与此同时,在数据持久性领域发生了另一种类似的思维转变。这导致人们质疑如果不同的应用程序部分根据业务或技术要求使用不同的持久性技术会发生什么。例如,为分布式 Web 应用程序存储 HTTP 会话和在社交网络上存储朋友图需要不同的操作特性,因此需要不同的数据库。如今,一个系统同时拥有两种或多种不同的数据库技术是相当标准的。
从历史上看,大多数关系数据库管理系统 (RDBMSes)是基于相同的 ACID 原则构建,并提供非常相似的 SQL 方言用于与存储进行通信。一般而言,RDBMS 适用于广泛的应用程序,但对于许多常见用例(例如,存储图、内存存储和分布式存储),它们很少具有最佳性能和操作特性。相比之下,NoSQL 数据库出现 最近有更广泛的基本原则,即使大多数 NoSQL 数据库不能有效地用作通用数据存储,也可以为常见用例提供更好的功能。下图描述了这一点:
图 7.7 每个服务都使用最适合其需求的持久化技术
此外,多语言持久性是以复杂性为代价的。每一种新的存储机制都会引入新的 API 和需要学习的范式、需要开发或采用的新客户端库,以及需要在开发和生产中解决的一组新标准问题。此外,对 NoSQL 数据库的错误使用可能意味着对服务的完全重新设计。使用 right 持久性技术(SQL 或 NoSQL)应该会让这更舒服,但挑战不会消失。
Spring Framework 有单独的子项目专门用于数据持久性,称为 Spring Data (http://projects .spring.io/spring-data)。本章的其余部分描述了 Spring Data 中可用的不同设计方法和数据库连接器,特别是针对响应式编程模式以及它们如何改变应用程序访问存储在多语言持久层中的数据的方式。
数据库即服务
在一个设计合理的microservice架构中,所有的服务都是无状态的,所有的状态都存储在知道如何管理的特殊服务中数据持久化。在云环境中,无状态服务可实现高效的可扩展性和高可用性。然而,管理和扩展 database 服务器要困难得多,尤其是那些不是为云设计的服务器。大多数 云提供商通过使用他们的数据库即服务(DBaaS ) 解决方案。此类存储解决方案可能是普通数据库(MySQL、PostgreSQL 和 Redis)的定制版本,也可能是从头开始设计为仅在云中运行(AWS Redshift、Google BigTable 和 Microsoft CosmosDB)。
通常,云存储或数据库使用的算法按以下方式工作:
- 客户端发出访问数据库或文件存储的请求 (通过管理页面或 API)。
- 云提供商授予对 API 或服务器资源的访问权限,这些资源可用于数据持久性。同时,客户端不知道甚至不关心提供的 API 是如何实现的。
- 客户端使用存储 API 或提供访问凭据的数据库驱动程序。
- 云提供商根据客户端订阅计划、存储的数据大小、查询频率、并发连接或其他特征向客户端收费。
通常,这种方法允许客户(在我们的例子中是软件开发人员)和云提供商专注于他们的主要目标。云提供商实施了存储和处理客户数据的最有效方式,最大限度地减少了底层基础设施的支出。同时,客户端专注于应用程序的主要业务目标,不会花时间配置数据库服务器、复制或备份。这种关注点分离对客户来说并不总是最有利可图的,甚至可能根本不可能。然而,当相关时,它有时允许我们只用少数工程师构建成功且广泛使用的应用程序。
笔记
在其他用例中,Foursquare 的应用程序每月有超过 5000 万人使用 主要是使用 AWS 技术堆栈构建的,即用于云托管的 Amazon EC2、用于存储图像和其他数据的 Amazon S3 以及 Amazon Redshift 作为数据库。
一些最知名的云原生数据存储和数据库服务如下:
- AWS S3 provides key-value storage through web service interfaces (REST API or AWS SDK). This is designed for storing files, images, backups, or any other information that could be represented as a bucket of bytes.
- AWS DynamoDB is a fully managed proprietary NoSQL database that provides synchronous replication across multiple data centers.
- AWS Redshift is a data warehouse built on top of technology for parallel processing (MPP). This enables analytics workloads on big data.
- Heroku PostgreSQL as a Service is a PostgreSQL database that is fully managed by the Heroku cloud provider, allowing shared and exclusive database servers for applications deployed to the Heroku cluster.
- Google Cloud SQL is a fully-managed PostgreSQL and MySQL database provided by Google.
- Google BigTable is compressed, high performance, and proprietary data storage designed to handle massive workloads at a consistent low latency and high throughput.
- Azure Cosmos DB is Microsoft's proprietary globally distributed multi-model database with a few different APIs, including MongoDB driver-level protocol support.
跨微服务共享数据
在实际业务系统中,为了处理客户端请求,通常需要查询两个或多个服务拥有的数据。例如,客户可能希望查看他们的所有订单以及与他们的订单相对应的付款状态。在微服务架构之前,这可以通过单个连接查询来实现,但现在这违反了最佳实践。为了处理多服务请求,需要实现一个适配器服务来查询订单和支付服务,应用所有需要的转换,并将聚合结果返回给客户端。此外,很明显,如果两个服务通信很多或高度依赖于彼此,它们可能是合并为一个服务的正确候选者(如果这种服务合并不会损害域驱动设计)。下图描述了这一点:
图 7.8 从 Order 服务和 Payment 服务聚合数据的适配器服务
阅读策略非常简单,但要同时实施和更新需要几个服务的策略要困难得多。假设客户想要下订单,但只能 验证适当的库存余额和付款信息。每个服务都有自己的数据库,因此在一个工作流中,一个业务事务涉及两个或多个微服务和数据库。 解决这样的问题有几种方法,但最流行的两种方法是 分布式事务和事件驱动架构。
分布式事务
分布式事务是一个事务,它更新两个或多个联网计算机系统上的数据。换句话说,几个服务一致同意某个动作是否发生。在实践中,大多数数据库系统使用强、严格和两阶段锁定来确保全局可串行化。
服务经常使用分布式事务以原子方式更新数据存储。它们 经常用于单体应用程序中 以确保对不同数据存储的可靠操作。这有助于从故障中正确恢复 同时。但是,现在不鼓励在多个微服务之间使用分布式事务。这有几个原因,但最重要的原因如下:
- A service that allows distributed transactions requires an API that supports two-phase commit and is not very trivial to implement
- Microservices that are involved in a distributed transaction are tightly coupled, which is always discouraged in a microservices architecture
- Distributed transactions do not scale, limiting system bandwidth and consequently reducing the scalability of the system
事件驱动架构
在微服务环境中实现分布式 business 事务的最佳方式是通过事件驱动架构,我们已经探索了一个到目前为止,这本书有几次。
如果需要更改系统状态,则第一个服务会更改其自己数据库中的数据,并且同一内部事务将事件发布到消息代理。因此,即使涉及事务,它们也不会跨越服务的边界。第二个服务注册对所需事件类型的订阅,接收事件并相应地更改其存储,以及 可能 发送事件。服务不阻塞在一起,互不依赖;系统中存在的唯一耦合是它们交换的消息。相比之下,对于分布式事务,事件驱动架构允许系统继续进行,即使在第一个服务动作时第二个服务没有运行。这一特性非常重要,因为它直接影响系统的弹性。分布式事务要求所有相关组件(微服务)都必须可用,并且必须在事务的整个持续时间内正确运行才能使其继续进行。一个系统拥有的微服务越多,或者微服务在分布式事务中的参与范围越广,系统就越难以进步。
与以前一样,当两个服务以闲聊的方式进行大量通信时,可能会考虑将它们合并。此外,可以使用事件来实现对少数服务进行多次更新的适配器服务。
最终一致性
让我们回过头来分析分布式事务在软件系统中的作用。很明显,我们使用分布式事务来确定系统状态,或者换句话说,我们消除了系统中某些状态可能不一致的不确定性。然而,这种消除不确定性的要求非常严格。 实施领域驱动设计的作者 Vaughn Vernon 提出将不确定性嵌入到领域模型中。据他说,如果很难保护系统免受不一致状态的影响,并且无论我们如何努力对抗它仍然会出现不一致,那么接受不确定性并将其嵌入到常规业务工作流程中可能是有益的。
例如,我们的系统仍然可以通过引入一个名为 verifying payment info 的新状态来创建没有经过验证的付款信息的订单。这个新事件将一个不确定的情况(支付信息可能有效或无效)转换为一个单独的业务步骤,该步骤可能被占用有限的时间(直到付款信息得到验证)。使用这种方法,我们不需要我们的系统始终保持一致。相反,需要确保系统对每个业务交易的状态有一致的看法。这种未来的一致性称为最终一致性。 以下 图描述了这一点:
图 7.9 Order 和 Payment 服务都更新私有数据库并通过消息传递工作流状态
通常,最终的一致性保证足以构建一个成功推进其操作的健全系统。此外,任何分布式系统都必须处理最终一致性,以便可用(处理用户请求)和分区容错(在组件之间的网络中断中幸存)。
SAGA 模式
分布式 事务最流行的模式之一,尤其是在微服务 世界,被称为SAGA模式。这是 1987 年引入的,用于管理数据库中的长期事务。
saga 由一些小事务组成,每个事务都是其微服务的本地事务。在这里,外部请求启动 saga,然后它启动第一个小事务,成功完成后触发第二个事务,依此类推。如果一个交易在中间没有成功,它会触发对之前交易的补偿动作。实现该模式的主要方法有两种——基于事件的编排和 通过协调器进行编排服务。
事件溯源
为了处理通过微服务应用程序的事件,微服务可以使用 事件采购模式。事件溯源将业务实体的状态保存为一系列状态变化的事件。例如,银行账户可以表示为初始金额和一系列存款/取款操作。有了这些信息,我们不仅可以通过重播更新事件来计算当前帐户状态,还可以提供实体更改的可靠审计日志,并允许查询确定实体在过去任何时间点的状态。通常,实现事件溯源的服务提供允许其他服务订阅实体更新的 API。
为了优化计算当前状态所需的时间,应用程序可以定期构建和保存快照。为了减少存储大小,可以删除所选快照之前的事件。在这种情况下,很明显 更新事件的整个 历史的一部分丢失了。
银行账户 111-11 事件日志:
日期 |
动作 |
金额 |
2018-06-04 22:00:01 |
创造 |
$0 |
2018-06-05 00:05:00 |
订金 |
50 美元 |
2018-06-05 09:30:00 |
提取 |
10 美元 |
2018-06-05 14:00:30 |
订金 |
20 美元 |
2018-06-06 15:00:30 |
订金 |
115 美元 |
2018-06-07 10:10:00 |
提取 |
40 美元 |
当前余额:135 美元
尽管它很简单,但由于其不熟悉且略微陌生的编程方法以及学习曲线,事件溯源并不经常使用。此外,由于不断的状态重新计算,事件溯源不允许有效的查询,尤其是在查询很复杂的时候。在这种情况下,命令查询职责分离可能会有所帮助。
命令查询职责分离
Command Query Responsibility Segregation(CQRS)常用同时 事件溯源。 CQRS 由两部分组成:
- The writing part receives state-update commands and stores them in an underlying event store, but does not return an entity state.
- The reading part does not change the state and returns it for a requested query type. State representations for distinct queries are stored in views, which are recalculated asynchronously after update events are received as commands.
图 7.10 订单服务的 CQRS 实现。写入部分存储更新命令,读取部分异步计算预期查询的视图
CQRS 模式允许软件系统以流式方式处理大量数据,同时,它可以快速响应有关当前系统状态的各种查询。
无冲突的复制数据类型
我们的 application 越大,它必须处理的数据就越多,即使对于具有明确定义范围的单个微服务也是如此。正如我们之前提到的,事务不能很好地扩展,并且随着应用程序的增长,即使在一个微服务的边界内也很难保持全局状态的一致性。因此,出于性能和系统可扩展性的考虑,我们可能允许不同的服务实例同时更新数据,而无需全局锁或事务一致性。这种方法称为乐观复制并允许数据副本并行发展< /span> 可能存在的不一致应<span class="strong">解决 稍后。在这种情况下,副本之间的一致性会在副本合并时重新建立。此时,必须解决冲突,但这通常意味着必须恢复某些更改,从用户的角度来看,这可能是不可接受的。但是,也有一些具有数学属性的数据结构可以确保 merge 过程总是成功 。此类数据结构称为无冲突复制数据类型 (CRDT< /跨度>)。
CRDT 描述的数据类型可以跨多个计算单元复制,无需任何协调即可同时更新,然后合并以获得一致的状态。这个概念由 Marc Shapiro、Nuno Preguica、Marek Zawirski 和 Carlos Baquero 在 2011 年进行了描述。在撰写本文时,CRDT 有一些数据类型,例如 Grow-only Counter、Grow-only Set、Two -Phase Set、Last-Write-Wins-Element Set 以及其他一些只能 涵盖 典型业务工作流子集的集合。然而,CRDT 仍然被证明对于协作文本编辑、在线聊天和在线赌博非常有用。 SoundCloud 音频分发平台使用 CRDT,Phoenix Web 框架使用 CRDT 实现实时多节点信息共享,而微软的 Cosmos DB 使用 CRDT 写入多主数据。 Redis database 还以 无冲突复制数据库(CRDB)。
作为数据存储的消息系统
基于事件溯源的想法,我们可以得出结论一个 消息具有消息持久存储的代理可能会减少对单个微服务的专用数据库的需求。实际上,如果所有实体更新事件(包括实体快照)都在消息代理中存储了足够长的时间并且可以随时重新读取,则系统的整个状态可能仅由该事件定义。在启动期间,每个服务可能会读取最近的事件历史记录(直到最后一个快照)并重新计算实体在内存中的状态。因此,服务可能仅通过处理新的更新命令和读取查询以及不时生成实体快照并将其发送到代理来运行。
Apache Kafka 是一种流行的分布式消息代理,具有可靠的持久层,可以用作系统中的主要数据存储,也可能是唯一的数据存储。
正如我们所看到的,现在多语言持久性和基于消息代理的事件驱动架构经常被串联使用,以在一个高度易变、可扩展、不断变化的软件系统中实现可靠的复杂工作流。 本章的其余部分是重点介绍 Spring Framework 提供的持久化机制,而 第 8 章, Scaling借助 Cloud Streams, 揭示了 Spring 生态系统中可以使用哪些技术来实现基于事件驱动架构的高效应用程序。
数据检索的同步模型
要了解响应式持久性的所有好处和缺陷,首先我们必须回顾一下应用程序如何在前反应时代。我们还必须了解客户端和数据库在发出和处理查询时如何通信,这种通信的哪些部分可以异步完成,以及哪些部分可以从应用反应式编程模式中受益。由于数据库持久性由几层抽象组成,我们将遍历所有这些层,描述它们并尝试响应式装备。
用于数据库访问的有线协议
有多种类型的数据库称为嵌入式数据库。此类数据库在应用程序进程本身内部运行,不需要通过网络进行任何通信。对于嵌入式数据库,没有硬性要求要有有线协议,即使有些已经或可能作为嵌入式模式或作为单独的服务运行。在本章的后面,我们将在几个示例中使用 H2 嵌入式数据库。
但是,大多数软件使用在不同服务器(或不同容器)上的不同进程中运行的数据库。应用程序使用一个名为 数据库驱动程序的专用客户端库来通信 与外部数据库。此外,有线协议定义了数据库驱动程序和数据库本身的通信方式。它定义了客户端和数据库之间发送的消息顺序的格式。在大多数情况下,有线协议与语言无关,因此 Java 应用程序可以查询用 C++ 编写的数据库。
由于有线协议通常设计为在 TCP/IP 上工作,因此无需阻塞有线协议。与同步 HTTP 通信一样,协议本身并没有阻塞,它是客户端在等待结果的同时决定阻塞。此外,TCP 是一种异步协议,通过滑动窗口实现的流量控制来支持背压。但是,滑动窗口 approach 已针对通过网络发送字节块进行了调整,可能无法反映应用程序背压的需求以最好的方式。例如,当从数据库接收行时,更自然地以行数请求数据处理的下一部分,而不是依赖于定义网络缓冲区大小的系统设置。当然,有线协议可能有意使用另一种机制,甚至是多种机制的组合来实现背压,但必须记住,TCP 机制也一直在幕后工作。
也可以使用更高级的协议作为数据库有线协议的基础。例如,我们可能会使用 HTTP2、WebSockets、gRPC 或 RSocket。 第 8 章, 使用 Cloud Streams 扩展,对 RSocket 和 gRPC 协议进行了简要比较。
除了背压问题之外,还有不同的方法可以在客户端和数据库之间传递大数据集。例如,客户端插入数万行数据,或者分析查询结果包含数百万行。为了简单起见,让我们只考虑后一个用例。一般来说,有几种方法可以传达这样的结果集:
- Calculate the entire result set on the database side, putting the data into a container and sending the container entirely as soon as the query finishes. This approach does not imply any logical backpressure and requires huge buffers on the database side (and also potentially on the client side). Furthermore, the client receives its first results only after the whole query is executed. Such an approach is easy to implement. Furthermore, the query execution process does not last too long and may cause less contention with updated queries happening at the same time.
- Send result set in chunks as the client requests them. The query may be executed entirely, and results may be stored in a buffer. Alternatively, the database may execute the query only to the point where it fills one or a few requested chunks and continues execution only after it has communicated the client's demand. This way of operating may require fewer memory buffers, returns the first rows when the query is still running, and makes it possible to propagate logical backpressure or query cancelation.
- Send results as a stream as soon as such results are obtained during the query execution. On top of that, the client may also inform the database about the demand and propagate logical backpressure that may, in turn, impact the query execution process. Such an approach requires almost no additional buffers, and the client receives the first row of the result as soon as it is possible. However, this way of communicating may under-utilize the network and CPU due to a very chatty manner of communication and frequent system calls.
下图显示了分块结果流的交互流:
图 7.11 使用块迭代查询结果
一般来说,不同的数据库 在其有线协议中实现 一种或多种方法 。例如,MySQL 知道如何将数据作为一个整体或作为一个流逐行发送。同时,PostgreSQL数据库有一个概念称为portal,它允许客户端传播准备接收的尽可能多的数据行。上图描述了 Java 应用程序如何使用这种方法。
在这个级别上,一个设计良好的数据库有线协议可能已经具备反应所需的所有特性。同时,即使是最直接的协议也可能被包含一个反应式驱动程序,该驱动程序可能使用 TCP 控制流进行背压传播。
数据库驱动程序
数据库驱动程序是一个库,它使数据库有线协议适应语言结构,例如方法调用、回调或潜在的反应流。在关系数据库的情况下,驱动程序通常实现语言级别的 API,例如用于 Python 的 DB-API 或用于 Java 的 JDBC。
以同步 阻塞方式编写的软件使用相同的数据访问方法也就不足为奇了。此外,通常通过驱动程序与外部数据库的通信与与外部 HTTP 服务的通信没有什么不同。例如,Apache Phoenix JDBC 驱动程序基于 Apache Calcite 框架的 Avatica 组件,并使用 JSON 或 HTTP 上的协议缓冲区。因此,理论上,我们也可以将响应式设计应用于数据库通信协议并获得非常相似的好处,就像 Spring WebFlux 模块中的响应式 WebClient
。下图显示,从网络通信的角度来看,HTTP 请求和数据库查询非常相似:
图7.12 阻塞HTTP请求和数据库请求的类似阻塞IO行为
通常,数据库驱动程序的阻塞特性是由上层 API 决定的,而不是由有线协议决定的。因此,实现具有适当语言级 API 的反应式数据库驱动程序应该不难。本章稍后将介绍此类 API 的候选者。同时,NoSQL 数据库驱动程序没有既定的语言级 API 可以实现,因此可以自由地异步或响应式地实现自己的 API。例如,MongoDB、Cassandra 和 Couchbase 决定采用这条路线,它们现在提供异步或响应式驱动程序。
JDBC
1997 年首次发布 Java 数据库连接 (JDBC) 此后一直定义应用程序如何与数据库通信(主要关系), 为数据访问提供统一的 API一个Java平台。最新的 API 修订版 4.3 于 2017 年发布,并包含在 Java SE 9 中。
JDBC 允许多个数据库客户端 drivers 存在并被同一个应用程序使用。 JDBC Driver Manager 负责正确注册、加载和使用所需的驱动程序实现。加载驱动程序时,客户端可以使用适当的访问凭据创建连接。 JDBC 连接使得初始化和执行语句成为可能,例如 SQL 的 SELECT
,CREATE
,插入
、更新
和DELETE
。更新数据库状态作为执行结果的语句返回一些受影响的行,查询语句返回java.sql.ResultSet
,它是result.ResultSet
是很久以前设计的,有一个奇怪的 API。例如,枚举一行中的列的索引从1
开始,而不是从0
。
ResultSet
接口设计用于向后迭代,甚至是随机访问,但这种级别的兼容性要求驱动程序在允许任何处理之前加载所有行。为简单起见,我们假设 ResultSet
类似于结果行的简单迭代器。这种假设允许底层实现对分块的结果集进行操作,并按需从数据库中加载批次。任何底层异步实现都必须包装到 JDBC 级别的同步阻塞调用中。
在性能方面,JDBC 允许对非选择查询进行批处理。这应该允许在更少的网络请求中与数据库进行通信。但是,由于 JDBC 被设计为同步和阻塞的,因此在处理大型数据集时这无济于事。
尽管 JDBC 被设计为业务逻辑级 API,但它操作 使用表、行和列,而不是 span class="emphasis">entities 和 aggregates 作为领域驱动设计的建议。因此,如今,JDBC 被认为太低级而无法直接使用。为此,Spring 生态系统具有 Spring Data JDBC 和 Spring Data JPA 模块。还有很多完善的库封装了 JDBC 并提供了更令人愉快的 API 来使用。这种库的一个例子是Jdbi
。它不仅提出了流畅的 API,而且与 Spring 生态系统有很好的集成。
连接管理
现代应用程序很少直接创建 JDBC 连接。 连接池更常见。这背后的原因很简单——建立一个新的connection成本很高。因此,以允许重用的方式管理 connections 缓存是明智的。创建连接的成本可能来自两个方面。首先,连接发起过程可能需要客户端认证和授权,这会占用宝贵的时间。其次,一个新的连接可能会花费数据库一大笔钱。例如,每当建立新连接时,PostgreSQL 都会创建一个全新的进程(而不是线程!),这在功能强大的 Linux 机器上可能需要数百毫秒。在撰写本文时,Java 平台最常用的连接池是 Apache Commons DBCP2、C3P0、Tomcat JDBC 和 HikariCP。 HikariCP 被认为是 Java 世界中最快的连接池。
请注意,即使连接池广泛用于 JDBC 连接,它也不是数据库通信的固有部分。例如,Oracle 数据库驱动程序允许连接多路复用,这允许我们通过单个网络连接 漏斗 多个逻辑连接。当然,这种支持不仅由驱动程序启用,而且由有线协议和数据库实现本身启用。
使关系数据库访问反应
由于 JDBC 是 Java 世界中用于数据访问的 primary 语言级 API(至少对于关系数据源而言),它塑造建立在它之上的所有抽象级别的行为。之前,我们展示了不建议在反应式应用程序中使用阻塞 API,因为它们限制了应用程序的可扩展性。因此,对于我们来说,拥有一个适当的语言级数据库访问 API 以用于反应式应用程序至关重要。不幸的是,没有简单的解决方案可以为此目的稍微调整 JDBC。目前,有两个很有前途的 API 草案可能适合这一领域,我们将在本章后面讨论它们。下图描述了制作反应式 JDBC API 所需的内容:
图 7.13 当前的 JDBC 栈和潜在的响应式替换
弹簧 JDBC
为了简化环绕原始JDBC的麻烦,Spring提供 Spring JDBC 模块,它很老但描述得很好。该模块提供了几个版本的JdbcTemplate
类,可帮助执行查询并将关系行映射到实体中。它还处理资源的创建和释放,有助于省略常见错误,例如忘记关闭准备好的语句或连接。JdbcTemplate
还捕获 JDBC 异常并将它们转换为通用org.springframework.dao
异常。
假设我们在 SQL 数据库中有书籍集合,实体由以下 Java 类表示:
使用JdbcTemplate
和一个通用的BeanPropertyRowMapper
, 我们可以通过以下方式创建一个Spring存储库:
或者,我们可以提供我们自己的映射器类来指导 Spring 如何将一个 ResultSet
转换为域实体:
让我们使用BookMapper
类来实现BookJdbcRepository.findAll()
方法:
JdbcTemplate
的另一项改进是由 NamedParameterJdbcTemplate
类实现的。这增加了对使用人类可读名称而不是 ?
占位符传递 JDBC 参数的支持。因此,准备好的 SQL 查询及其对应的 Java 代码可能如下所示:
这是一个经典的准备好的 SQL 语句:
这似乎只是一个很小的改进,但命名参数比索引提供了更好的代码可读性,尤其是当查询需要六个参数时。
总之,Spring JDBC 模块由高级抽象使用的实用程序、帮助类和工具组成。更高级别的 API 不限制 Spring JDBC 模块,因此它可以相对容易地吸收所需的响应式支持,因为底层 API 也支持它。
弹簧数据 JDBC
Spring Data JDBC 是 Spring Data 系列中一个相当新的模块。它旨在简化基于 JDBC 的存储库的实现。 Spring Data 存储库,包括基于 JDBC 的存储库,由 < em>领域驱动设计 作者:Eric Evans。这意味着我们建议每个聚合根都有一个存储库。 Spring Data JDBC 具有用于简单聚合的 CRUD 操作,支持 @Query
注解和实体生命周期事件。
笔记
注意——Spring Data JDBC and Spring JDBC 是不同的模块!
要使用 Spring Data JDBC,我们必须修改 Book
实体并应用org.springframework.data.annotation.Id
注解到 id
字段。存储库要求实体具有唯一标识符,因此重构以在存储库中使用的Book
实体如下所示:
现在让我们定义BookRepository
接口,从CrudRepository<Book, Integer>
派生它:
- 通过扩展
CrudRepository
,我们的图书存储库获得了十几个基本CRUD操作的方法,例如save(...)
,saveAll(...)
,findById(...)
,deleteAll()< /代码>。
- 它通过提供在
@Query
注释中定义的自定义 SQL 来注册一个自定义方法来查找具有最长标题的书籍。然而,与 Spring JDBC相比,我们看不到任何ResultSet
转换。JdbcTemplate
不是必需的,我们唯一需要编写的就是一个接口。 Spring Framework 生成实现,处理许多陷阱。作为findByLongestTitle
方法(2.1)
的结果,存储库返回一个列表 container,所以客户端只有在整个查询结果到达时才解除阻塞。
- 或者,存储库可能会返回一个
Stream
书籍,因此当客户端调用findByShortestTitle
方法(3.1)
,根据底层实现,API 可能允许在数据库仍在执行查询时处理第一个元素。当然,这只是底层实现和数据库本身支持这种操作模式的情况。 - 通过
findBookByTitleAsync
方法(4.1)
,存储库利用了 Spring Framework 的异步支持。 方法返回CompletableFuture
所以客户端的线程不会被阻塞等待结果。不幸的是,由于 JDBC 的阻塞方式,一个底层线程仍然必须被锁定。 - 此外,还可以像在
findBooksByIdBetweenAsync
CompletableFuture和Stream
>方法(5.1)
。这样,在第一行到达之前,客户端的线程不应该被阻塞,然后结果集可能会被分块遍历。不幸的是,对于执行的第一部分,必须阻塞底层线程,并且稍后在检索下一个数据块时会阻塞客户端线程。这样的行为是我们可以通过 JDBC 实现的最好的,并且没有响应式支持。
为了通知 Spring 需要使用 Spring Data JDBC 生成BookRepository
实现,我们必须向 Spring Boot 应用程序添加下一个依赖项:
还需要在应用程序配置中添加 @EnableJdbcRepositories
注解。在底层,Spring Data JDBC使用Spring JDBC和< code class="literal">NamedParameterJdbcTemplate,之前讨论过。
Spring Data JDBC是一个非常小的模块,它提供了一种为小型微服务构建简单持久层的便捷方法。但是,它被有意设计为简单,而不是针对 ORM 方面,例如缓存、实体的延迟加载和复杂的实体关系。出于这些目的,Java 生态系统有一个单独的规范,称为 Java Persistence API(JPA< /strong>)。
使 Spring Data JDBC 反应式
Spring Data JDBC 是一个更大的项目 Spring Data Relational 的一部分。 Spring Data JDBC 需要 JDBC,这是一个完全阻塞的 API,不适合完全反应式堆栈。在撰写本文时,Spring Data 团队开发了 R2DBC 规范,该规范允许驱动程序提供与数据库的完全反应式和非阻塞集成。这些努力可能会在 Spring Data R2DBC 模块中采用,该模块将成为 Spring Data Relational 项目的一部分。下图显示了 Spring Data Relational 的潜在反应堆栈:
图 7.14 当前的 Spring Data JDBC 栈和潜在的响应式替换
JPA
JPA第一次出现是在2006年(最新版本2.2,2013年发布,有时也叫JPA2),以及它旨在描述 Java 应用程序中的关系数据管理。如今,JPA 是定义应用程序如何组织持久层的标准。它由 API 本身和 Java Persistence Query Language (JPQL)。 JPQL 是一种类似于 SQL 的平台独立语言,它通过 存储库而不是数据库查询JPA 实体对象。
与用于数据库访问的标准 的JDBC 相比,JPA 是一个标准 用于 对象关系映射( ORM),它允许将代码中的对象映射到数据库中的表。 ORM 通常在后台使用 JDBC 和动态生成的 SQL 查询,但这种机制大多对应用程序开发人员隐藏。 JPA 不仅允许映射实体,还允许映射实体关系,以便轻松加载关联对象。
最常用的 JPA 实现是 Hibernate (http://hibernate.org) 和 EclipseLink (<类="ulink" href="http://www.eclipse.org/eclipselink" target="_blank">http://www.eclipse.org/eclipselink)。这两者都实现了 JPA 2.2 并且可以互换。除了实现 JPA 标准外,这两个项目还提出了一组额外的特性,这些特性未在规范中定义,但在某些情况下可能很方便。例如,EclipseLink 使得处理数据库更改事件和描述实体到多个数据库中的表的映射成为可能。另一方面,Hibernate 提出了对时间戳和自然 ID 的更好支持。这两个库都支持多租户。但是,我们应该明白,当使用 exclusive 功能时,这些库不再是可互换的。
使用 JPA 实现而不是纯 JDBC 的另一个原因是由于 Hibernate 和 EclipseLink 提供的缓存特性。这两个库都允许我们最小化在一级会话缓存中缓存实体的实际数据库请求数量,甚至在二级外部缓存中。仅此功能可能会对应用程序性能产生显着影响。
春季数据 JPA
Spring Data JPA 同样允许我们像 Spring Data JDBC 一样构建存储库,但在内部 它 使用更强大的 JPA ——基于实施。 Spring Data JPA 对 Hibernate 和 EclipseLink 都有 excellent 支持。 Spring Data JPA 动态生成基于方法名称约定的 JPA 查询,提供 Generic DAO 模式的实现并添加对 Querydsl 库的支持 (http://www.querydsl.com),它支持优雅的、类型安全的、基于 Java 的查询。
现在,让我们创建一个简单的应用程序来演示 Spring Data JPA 的基础知识。以下依赖项获取 Spring Boot 应用程序所需的所有模块:
Spring Boot 足够聪明,可以推断出正在使用 Spring Data JPA,因此甚至不需要添加 @EnableJpaRepositories
注释(但如果我们愿意,我们可以这样做)。 Book
实体应如下所示:
标有javax.persistence.Entity
注解的Book
实体允许setting JPQL 查询中使用的实体名称。 javax.persistence.Table
注解定义了表的坐标,也可以定义约束和索引。需要注意的是,我们必须使用 javax.persistence.Id 而不是
注解。org.springframework.data.annotation.Id
注解
现在,让我们定义一个 CRUD 存储库,其中包含一个使用命名约定生成查询的自定义方法,以及另一个使用 JPQL 查询的方法:
类路径中的 JDBC 驱动程序、一个 Spring Boot 依赖项、Book
实体类和BookJpaRepository
接口足以提供一个基于下一个技术堆栈的基本但非常通用的持久层——Spring Data JPA、JPA、JPQL、Hibernate 和 JDBC。
春季数据 NoSQL
Spring Data JPA 和 Spring Data JDBC 都是 优秀 解决方案,用于连接至少提供 JDBC 驱动程序的关系型数据库,但是大多数 NoSQL 数据库 不这样做。在这种情况下,Spring Data 项目有几个单独的模块,分别针对流行的 NoSQL 数据库。 Spring 团队积极为 MongoDB、Redis、Apache Cassandra、Apache Solr、Gemfire、Geode 和 LDAP 开发模块。同时,社区为以下数据库和存储开发模块——Aerospike、ArangoDB、Couchbase、Azure Cosmos DB、DynamoDB、Elasticsearch、Neo4j、Google Cloud Spanner、Hazelcast 和 Vault。
笔记
值得一提的是,EclipseLink 和 Hibernate 都支持 NoSQL 数据库。 EclipseLink 支持 MongoDB、Oracle NoSQL、Cassandra、Google BigTable 和 Couch DB。以下文章描述了 EclipseLink 中的 NoSQL 支持:https://wiki.eclipse .org/EclipseLink/Examples/JPA/NoSQL。此外,Hibernate 有一个名为 Hibernate OGM 的子项目 (http://hibernate.org/ogm),针对 NoSQL 支持,即 Infinispan、MongoDB、Neo4j 等。然而,由于 JPA 本质上是一种关系 API,因此与专门的 Spring Data 模块相比,此类解决方案缺乏与 NoSQL 相关的特性。此外,当应用于 NoSQL 数据存储时,JPA 及其关系假设可能会导致应用程序设计朝着错误的方向发展。
使用 MongoDB 的代码几乎 与 Spring Data JDBC 示例中的代码相同。要使用 MongoDB 存储库,我们必须添加以下依赖项:
假设我们必须实现一个在线图书目录。解决方案应该基于 MongoDB 和 Spring Framework。为此,我们可以使用以下 Java 类定义Book
实体:
在这里,我们使用 @Document
annotation(1) 而不是 JPA
来自@Entity
org.springframework.data.mongodb.core.mapping
包。此注释特定于 MongoDB,并且可以引用正确的数据库集合。此外,为了定义实体的内部 ID,我们使用 MongoDB 特定类型,org.bson.types.ObjectId
(3)
, 结合 Spring Data annotation
org.springframework.data.annotation.Id
(2)
。我们的实体以及我们的数据库 document 将包含一个title
字段,其中也将被 MongoDB 索引。为此,我们使用@Indexed
annotation(4)
装饰该字段。此注释提供了一些有关索引详细信息的配置选项。此外,一本书可能有一个或多个作者,我们通过将 authors
字段的类型声明为 List<String> 来表示这一点。
(5)
。 authors
字段也被索引。请注意,这里我们没有创建对具有多对多关系的单独author
表的引用,因为它很可能是用关系数据库实现的,而是我们嵌入作者姓名作为book
实体的子文档。最后,我们定义publishingYear
字段。实体和数据库中的字段名称不同。 @Field
注解允许对这种情况进行自定义映射 (6)
。
在数据库中,这样的book
实体将由以下 JSON 文档表示:
正如我们所见,MongoDB 使用专门设计的数据类型来表示文档的 ID (1)
。< /span>在这种情况下, publishingYear
映射到pubYear
字段(2 )
和作者
由数组(3)
表示。此外,Spring Data MongoDB 添加了 support_class
字段,该字段描述了用于对象-文档映射的 Java 类。
对于 MongoDB,存储库接口应该扩展org.springframework.data.mongodb.repository.MongoRepository
接口(1)
,它反过来扩展了CrudRepository
, 我们已经在前面的例子中使用过:
当然,MongoDB 存储库支持基于命名约定的查询生成,因此findByAuthorsOrderByPublishingYearDesc< /code>method 按作者搜索书籍并返回按出版年份排序的结果,从最近的出版物开始。此外,
org.springframework.data.mongodb.repository.Query
注解允许我们编写特定于 MongoDB 的查询。例如,前面的查询(3)
巧妙地搜索不止一位作者的书籍。
应用程序的其余部分应该与使用 Spring Data JDBC 或 Spring Data JPA 的情况相同。
尽管我们已经谈到了使用 Spring 进行数据持久化的主要方法,但我们几乎没有触及这个领域的皮毛。我们完全省略了事务管理、数据库初始化和迁移(Liquibase、Flyway),这些都是实体映射、缓存和性能调优的最佳实践。所有这些领域都可以写不止一本书,但我们必须继续前进并研究如何以反应方式进行持久性。
使用 Spring Framework 实现对 NoSQL 数据库的响应式支持需要整个底层基础架构提供响应式或异步 API。一般来说,NoSQL 数据库出现的时间相对较晚,并且发展迅速,因此没有很多基础设施受到同步阻塞 API 的严重限制。因此,与使用 JDBC 驱动程序的关系数据库相比,使用 NoSQL 数据库实现反应式持久性应该更容易。到目前为止,Spring Data 有一些响应式数据连接器,MongoDB 就是其中之一。这将在后面的 使用 Spring Data 进行响应式数据访问 部分进行介绍。
同步模型的局限性
在研究 Spring Framework 甚至 Java 的 persistence 选项时,我们查看了 JDBC、JPA、Hibernate、EclipseLink 、Spring Data JDBC 和 Spring Data JDBC,所有这些 API 和库本质上都是同步和阻塞的。尽管它们几乎总是用于从涉及网络调用的外部服务中检索数据,但它们不允许非阻塞交互。因此,所有前面提到的 API 和库都与响应式范例发生冲突。向数据库发出查询的 Java 线程注定要被阻塞,直到第一条数据到达或发生超时,从响应式应用程序的资源管理的角度来看,这是非常浪费的。如第 6 章中所述, WebFlux 异步非阻塞通信,这种方法极大地限制了应用程序的吞吐量,并且需要更多的服务器资源,因此也需要更多的资金。
不管是 HTTP 请求还是数据库请求,以阻塞的方式发出 IO 请求都是很浪费的。此外,基于 JDBC 的通信通常使用整个连接池来并行执行查询。相比之下,广泛使用的 HTTP2 协议允许使用同一个 TCP 连接同时发送和接收多个资源。这种方法减少了占用的 TCP 套接字的数量,并允许客户端和服务器(在我们的例子中,这是数据库)更大的并发性。考虑下图:
图 7.15 普通数据库通信与允许多路复用的通信协议(如 HTTP2)之间的比较
当然,连接池的存在是为了在打开新连接时节省时间。也可以在 JDBC 下实现一个通信层,以利用 HTTP2 中的多路复用,但是,JDBC 级别之前的代码必须是同步和阻塞的。
同样,在处理占用少量批次的大型查询结果时,与数据库游标(一种允许对查询结果记录进行迭代的控制结构)的通信看起来像上图的左侧。 第 3 章,反应式流 - 新流的标准 ,从 Reactive Streams 的角度详细分析了通信选项之间的差异,但同样的论点也适用于网络交互。
即使数据库提供了 asynchronous 能够高效通信并利用连接多路复用的非阻塞驱动程序,我们也无法获得在使用 JDBC、JPA 或 Spring Data JPA 时充分发挥它的潜力。因此,要构建一个完全响应式的应用程序,我们必须放弃同步技术并使用响应式类型制作 API。
总结本节,传统和完善的 JDBC 和 JPA 实现可能成为现代反应式应用程序的瓶颈。 JDBC 和 JPA 最有可能在运行时使用过多的线程和过多的内存,同时需要积极的缓存来限制冗长的同步请求和阻塞 IO。
不是同步模型不好;它只是不太适合反应式应用程序,很可能成为一个限制因素。但是,这些模型可能会成功共存。同步和反应式方法都有其优点和缺点。例如,到目前为止,反应式持久性方法无法提出任何在特性方面甚至接近 JPA 的 ORM 解决方案。
同步模型的优点
尽管 同步 数据访问并不是在实现持久层时消耗服务器资源的最有效方式,但它仍然是一种高度有价值的方法,主要用于构建阻塞 Web 应用程序时。 JDBC 可能是用于访问数据的最流行和最通用的 API,但它几乎完全隐藏了应用程序和数据库之间客户端-服务器通信的复杂性。 Spring Data JDBC 和 Spring Data JPA 为数据持久性提供了更高级的工具,隐藏了查询转换和事务管理的巨大复杂性。所有这些都经过了实战考验,大大简化了现代应用程序的开发方式。
同步数据访问简单,易于调试,易于测试。通过监控线程池也很容易跟踪资源使用情况。同步方法提供了大量不需要任何背压支持的工具(例如 JPA 和 Spring Data 连接器),并且在使用迭代器和同步流时仍然可能很有效。此外,大多数现代数据库在内部使用阻塞模型,因此使用阻塞驱动程序进行交互是很自然的。这种同步方法对本地和分布式事务有很好的支持。在用 C 或 C++ 编写的本机驱动程序上实现包装器也很容易。
同步 数据访问的唯一缺点在于执行的阻塞方式,这与使用反应范式(Netty、Reactor、WebFlux)。
在简要回顾一下同步数据访问技术之后,我们可能会转向响应式数据持久性的探索,看看 Spring Data 的响应式连接器如何在不损害 Spring Data 存储库的多功能性的情况下实现高性能的承诺。
使用 Spring Data 进行响应式数据访问
因此,要构建一个完全 reactive 应用程序,我们需要一个不使用实体集合操作的存储库,而是一个能够使用实体的反应流进行操作。反应式存储库应该能够通过使用 Entity
本身以及使用反应式Publisher
WebClient
类似的方式操作数据存储库。事实上, Spring Data Commons 模块提供了与这样一个合约的
ReactiveCrudRepository
接口。
现在,让我们讨论一下使用响应式数据访问层而不是通常的阻塞层时我们可以期待的好处。 第 3 章,反应式流 - 新流的标准 ,比较了数据检索的同步和反应模型,因此,通过采用理想的反应数据访问层,我们的应用程序可以获得以下所有好处:
- Effective thread management, since no thread is required to ever block on IO operations. This usually means that fewer threads are created, there's less overhead on thread scheduling, there's less memory footprint allocated for the
Thread
object's stack, and consequently, it's able to handle a massive amount of concurrent connections. - Smaller latency to the first results of a query. These may become available even before the query finishes. It may be convenient for search engines and interactive UI components that target low latency operation.
- Lower memory footprint. This is useful as less data is required to be buffered when processing a query for outgoing or incoming traffic. Also, the client may unsubscribe from a reactive stream and reduce the amount of data sent over the network as soon as it has enough data to fulfill its needs.
- Backpressure propagation informs the client about a database's ability to consume new data. Also, it permits informing the database server about the client's ability to process query results. In this case, more urgent work may be done instead.
- One more benefit may come from the fact that reactive clients are not thread bound, so sending a query and different data processing operations may happen in different threads. In turn, underlying queries and connection objects have to be tolerant to such modes of operation. As no threads hold exclusive rights over query objects and no client code is ever blocked, it is possible to share a single wire connection to the database and forget about connection pooling. If a database supports a smart connection mode, query results may be transported via a single physical network connection and routed to correct reactive subscribers.
- Last but not least, smooth integration of the persistence layer with a fluent reactive code of the reactive application is backed by the Reactive Streams specification.
reactive 数据库访问堆栈越多,好处 一个应用程序可能有。但是,通过应用异步驱动程序甚至是封装到适当反应式适配器中的阻塞驱动程序,可以获得前面提到的一些好处。应用程序可能会失去传播背压的能力,但它仍然可以使用更少的内存并进行适当的线程管理。现在,是时候在 Spring Boot 应用程序中使用响应式代码了。
要在 Spring Boot 应用程序中启用响应式持久性,我们必须使用具有响应式连接器的数据库之一。在撰写本文时,Spring Data 项目为 MongoDB、Cassandra、Redis 和 Couchbase 提供了响应式连接。这个列表似乎是有限的,但目前反应性持久性仍然是一个新事物,正在被广泛接受。此外,限制 Spring 团队响应式支持更多数据库的主要约束因素是缺乏数据库的响应式和异步驱动程序。现在,让我们研究一下反应式 CRUD 存储库在 MongoDB 示例中是如何工作的。
使用 MongoDB 反应式存储库
要使用 MongoDB 而不是 的同步数据访问,我们必须向我们的 Gradle 项目添加以下依赖项:
假设我们想要重构上一节中的简单 MongoDB 应用程序,使其具有响应性。在这种情况下,我们可以将Book
实体保持原样,而不做任何修改。与 MongoDB 对象-文档映射关联的所有注释对于同步和反应式 MongoDB 模块都是相同的。但是,在存储库中,我们 现在 必须用反应类型替换普通类型:
因此,我们的存储库 现在 扩展了ReactiveMongoRepository
接口(1)
;而不是MongoRepository
。反过来,ReactiveMongoRepository
扩展了ReactiveCrudRepository
接口,这是所有响应式连接器的通用接口。
笔记
虽然没有 RxJava2MongoRepository
,但我们仍然可以通过从 RxJava2CrudRepository
扩展来使用所有带有 RxJava2 的响应式 Spring Data 存储库。 Spring Data 处理对 RxJava2 的 Project Reactor 类型的采用,反之亦然,以提供原生的 RxJava 2 体验。
ReactiveCrudRepository
接口是同步 Spring Data 中 CrudRepository
接口的反应式等效接口。 Reactive Spring Data Repositories 使用相同的注解并支持大多数同步提供的特性。因此,响应式 Mongo 存储库支持通过方法名称约定(3)
进行查询,@Query
使用手写 MongoDB 注释查询(4)
,以及@Meta
注解以及一些额外的查询调整能力( 2)
。它还支持 constructs 用于运行Query by Example
(QBE) 请求。然而,与同步
MongoRepository
相比,ReactiveMongoRepository
扩展了ReactiveSortingRepository
接口,它提供了请求特定结果顺序的能力,但不提供分页支持。数据分页的问题在 分页支持 部分中讨论。
像往常一样,我们可以在我们的应用程序中注入一个 ReactiveSpringDataMongoBookRepository
类型的bean,然后Spring Data会提供所需的bean。 以下 代码展示了如何使用响应式存储库将几本书插入MongoDB:
让我们了解一下前面的代码在这里做了什么:
- 使用
BookSpringDataMongoRxRepository
接口注入一个bean。 - 使用必须插入数据库的
Book
准备反应流。 - 使用使用
Publisher
的saveAll
方法保存实体。像往常一样,在实际订阅者订阅之前不会进行保存。ReactiveCrudRepository
还有使用saveAll
方法覆盖"literal">可迭代 接口。这两种方法有不同的语义,但我们稍后会讨论这个主题。
saveAll
方法返回一个带有保存实体的Flux<Book>
,但由于我们对那个级别的细节不感兴趣,所以then
方法,我们转换流的方式只有onComplete
或onError< /code>事件被传播。
- 当反应流完成并保存所有书籍时,我们会报告相应的日志消息。
- 与往常一样,对于反应式流,应该有一个订阅者。在这里,为了简单起见,我们在没有任何处理程序的情况下进行订阅。但是,在实际应用程序中,应该有真正的订阅者,例如来自处理响应的 WebFlux 交换的订阅。
现在,让我们使用 Reactive Streams 查询 MongoDB。要打印流经反应流的查询结果,我们可以使用以下方便的辅助方法:
- 这是一种将人类可读的书籍列表打印为带有所需消息前缀的单个日志消息的方法。
- 对于流中的每一本书,它调用其
toString
方法并传播其字符串表示。 Flux.reduce
方法用于将所有书籍表示收集到一条消息中。请注意,如果书籍数量很大,这种方法可能不起作用,因为每本新书都会增加存储缓冲区的大小,并可能导致高内存消耗。为了存储中间结果,我们使用StringBuilder
class(3.1)
。请记住StringBuilder
不是线程安全的,onNext
方法可能会调用不同的线程,但是 Reactive Streams Specification 保证发生之前的关系。因此,即使不同的线程推送不同的实体,使用StringBuilder
将它们连接在一起也是安全的,因为内存屏障保证了StringBuilder
对象,而它在一个反应流中更新。在点(3.2)
一本书表示附加到单个缓冲区。
- 因为
reduce
方法只有在处理完所有传入的onNextonNext
事件>事件,我们可以安全地记录所有书籍的最终消息。 - 要开始处理,我们必须
订阅
。为了简单起见,我们假设这里不可能有错误。但是,在生产代码中,应该有一些处理错误的逻辑。
现在,让我们阅读并报告数据库中的所有书籍:
以下代码使用方法命名约定搜索 Andy Weir 的所有书籍:
此外,前面的代码使用 Mono
类型传递搜索条件,并且仅在该Mono
类型时才开始实际的数据库查询产生一个onNext
事件。因此,响应式 repository 成为响应式流的自然组成部分,其中传入和传出流是响应式的。
组合存储库操作
现在,让我们实现一个稍微复杂的 业务用例。我们可能想更新一本书的出版年份,而我们只有书名。因此,首先,我们必须找到所需的图书实例,然后更新出版年份,并将图书保存到数据库中。为了使我们的用例更加复杂,我们假设 title 和 year 值都是异步检索的,有一些延迟,并由 Mono
类型传递。另外,我们想知道我们更新的请求是否成功。到目前为止,我们并不要求更新是原子的,并假设总是不超过一本书具有相同的标题。因此,根据这些要求,我们可以设计以下业务方法 API:
前面的代码执行以下操作:
updatedBookYearByTitle
方法返回一个更新的书籍实体(或者没有,如果没有找到书籍)。- 标题值通过
Mono
类型引用。 - 新的出版年份值通过
Mono
类型引用。
我们现在可以创建一个测试场景来检查我们的实现是如何updatedBookYearByTitle
作品:
前面的代码执行以下操作:
- 跟踪运行时间,存储测试的开始时间。
- 以一秒的模拟延迟解析标题,并在值准备好后立即记录它
(2.1)
。 - 以两秒的模拟延迟解析新的发布年份值,并在值准备好后立即记录它
(2.1)
。 - 调用我们的业务方法,并在更新通知到达时记录它,如果有的话
(4.1)
。要检查是否存在onNext
事件(意味着实际的图书更新),Mono.hasElement
方法返回Mono
被调用。
- 当流完成时,代码会记录更新是否成功并报告总执行时间。
- 与往常一样,必须有人订阅才能启动反应式工作流程。
从前面的代码中,我们可以得出结论 workflow 的运行速度不能超过两秒,因为这是解决问题所需的时间出版年份。但是,它可能会运行更长时间。让我们进行实现的第一次迭代:
使用这种方法,在方法的开头,我们使用提供的对 title
(1)
。一旦找到Book
实体(2)
,我们就订阅新的出版年份值。然后,一旦新的出版年份值到达,我们就更新Book
实体(4)
并调用< code class="literal">save存储库的方法。此代码产生以下输出:
所以,这本书更新了,但是从日志中我们可以看出,我们是在收到书名后才订阅新的出版年份,所以该方法总共花费了三秒多的时间来计算结果。我们可以做得更好。为了启动并发检索过程,我们必须在工作流开始时订阅这两个流。以下代码描述了如何使用 zip
方法执行此操作:
在这里,我们zip
两个值并同时订阅它们(1)
。一旦两个值都准备好了,我们的流就会收到一个Tuple2
容器,其中包含感兴趣的值(2)
。现在我们必须解压值(2.1)
和(2.2)
使用data.getT1()
和data.getT2()
调用。在(3)
点,我们查询Book
实体,一旦到达,我们就更新出版年份并保存实体到数据库。在 second 迭代之后,我们的应用程序显示以下输出:
现在,我们可能会看到我们首先订阅了两个流 当两个值都到达时,我们更新了 book 实体。在第二种方法中,我们花费大约两秒钟而不是三秒钟来执行操作。它更快,但需要使用 Tuple2
类型,这需要额外的代码行以及进行转换。为了提高可读性并删除getT1()
和getT2()
调用,我们可以添加Reactor Addons 模块,它为这种情况提供了一些语法糖。
通过以下新依赖,我们可以改进之前的代码示例:
这是我们可以改进的方法:
在这里,在 (1)
处,我们可以将 Tuple2
对象的手动解构替换为 TupleUtils
类中的">function方法并使用已经解构的值。由于 function
方法是静态的,因此生成的代码非常流畅和冗长:
此外,在 (2)
点,我们将 titleValue
再次包装到 Mono
对象。我们可以使用已经具有正确类型的原始title
对象,但在这种情况下,我们将订阅两次title
流并将接收以下输出。请注意,我们触发了两次标题解析代码:
还有一点是,在第三次迭代中,我们发出 database 请求,仅在收到书名和新出版后才加载图书价值观。但是,我们可能会在出版年份请求仍在进行中但标题值已经存在时开始加载图书实体。第四次迭代展示了如何构建这个反应式工作流程:
在这里,使用zip
运算符(1)
我们订阅新的出版年份值(1.1)
和书籍实体(1.2)
同时。当两个值都到达时 (2)
,我们更新实体的出版年份并请求图书保存过程(2.1)
.此外,与此业务用例的所有先前迭代一样,即使工作流至少需要两秒钟才能完成,也不会阻塞任何线程。因此,此代码非常有效地使用计算资源。
本练习的重点是演示,借助 Reactive Streams 的功能和 Project Reactor API 的多功能性,很容易构建不同的异步工作流,甚至涉及数据持久层。只需几个反应式操作符,我们就可以彻底改变数据流经系统的方式。然而,并非所有这样的反应流替代方案都是平等的。有些可能运行得更快,而另一些可能运行得较慢,并且在许多情况下,最明显的解决方案并不是最合适的解决方案。因此,在编写反应式管道时,请考虑反应式运算符的替代组合,不要选择想到的第一个,而是选择最适合业务请求的选项。
反应式存储库如何工作
现在,让我们深入了解 reactive 存储库的工作原理。 Spring Data 中的响应式存储库通过调整底层数据库驱动程序功能来工作。下面可能有一个响应式流兼容的驱动程序或一个 asynchronous 驱动程序,可以封装到响应式 API 中。在这里,我们将了解反应式 MongoDB 存储库如何使用符合反应式流的 MongoDB 驱动程序,以及反应式 Cassandra 存储库如何构建在异步驱动程序上。
首先,ReactiveMongoRepository
接口扩展了更通用的接口——ReactiveSortingRepository
和ReactiveQueryByExampleExecutor
。 ReactiveQueryByExampleExecutor
接口允许使用 QBE 语言执行查询。 ReactiveSortingRepository
接口扩展了更通用的ReactiveCrudRepository
接口并添加了findAll
方法,它允许请求查询结果的排序顺序。
由于很多响应式连接器使用ReactiveCrudRepository
接口,让我们仔细看一下。ReactiveCrudRepository
声明了保存方法,查找,并删除实体。 单声道<T> save(T entity)
方法保存entity
,然后返回保存的实体进行进一步操作。请注意,保存操作可能会完全更改实体对象。 单声道<T> findById(ID id)
操作consumes实体的id
并且 返回 结果包装到Mono
。 findAllById
方法有两个覆盖,每个都使用 Iterable<ID>
集合形式的 ID Publisher<ID>
的形式。除了响应式方法之外,ReactiveCrudRepository
和CrudRepository
之间唯一显着的区别在于ReactiveCrudRepository
不支持分页,不允许事务性操作。本章稍后将介绍使用 Spring Data 对反应式持久性的事务支持。但是,现在实现分页策略是开发人员的责任。
分页支持
需要注意的是,Spring Data 团队有意省略了分页支持,因为同步存储库中使用的实现不适合反应式范例。要计算下一页的参数,我们需要知道上一个结果的返回记录数。此外,要使用该方法计算总页数,我们需要查询总记录数。这两个方面都不符合反应式非阻塞范式。此外,查询数据库以计算所有行的成本相当高,并且会增加实际数据处理之前的延迟。但是,仍然可以通过将 Pageable
对象传递给存储库来获取数据块,如下所示:
所以,现在我们可以请求结果的第二页(注意,索引从 0 开始),其中每一页包含五个元素:
ReactiveMongoRepository 实现细节
Spring Data MongoDB Reactive 模块只有一个ReactiveMongoRepository
接口的implementation ,即SimpleReactiveMongoRepository
类。它为ReactiveMongoRepository
的所有方法提供了实现,并使用ReactiveMongoOperations
接口来处理底层的所有操作。
我们来看findAllById(Publisher
很明显,该方法通过buffer
操作收集所有ids
,然后使用findAllById(Iterable<ID> ids)
覆盖该方法。反过来,该方法制定Query
对象并调用findAll(Query query)
, 调用ReactiveMongoOperations
实例, mongoOperations.find(query,...)
。
另一个有趣的观察是insert(Iterable<S>entities)
方法在一个批处理查询中插入实体。同时,insert(Publisher
方法在
flatMap
算子内部生成很多查询,如下:
在这种情况下,findAllById
方法的两个覆盖的行为方式相同,并且只生成一个数据库查询。现在,让我们看看saveAll
方法。使用Publisher
的方法覆盖对每个实体发出查询。使用 Iterable
的方法 override 在所有实体都是新的情况下发出一个查询,但在其他情况下发出每个实体的查询。 deleteAll(Iterable<? extends T>entities)
方法总是针对每个实体发出一个查询,即使所有实体都在 Iterable
container 并且不需要等待元素异步出现。
正如我们所看到的,同一方法的不同覆盖可能会以不同的方式表现,并且可能会生成不同数量的数据库查询。此外,这种行为与方法是否使用一些同步迭代器或响应式Publisher
没有很强的相关性。因此,我们建议检查存储库方法的方法实现,以了解它向数据库发出了多少查询。
如果我们使用 ReactiveCrudRepository
方法和动态生成的实现,则更难查看实际查询。但是,在这种情况下,查询生成的行为类似于普通同步CrudRepository
。RepositoryFactorySupport
生成一个适当的< /span> 代理 ReactiveCrudRepository
。 ReactiveStringBasedMongoQuery
类用于在使用 @Query
注解修饰方法时生成查询。 ReactivePartTreeMongoQuery
类用于根据方法名称约定生成查询。当然,为 ReactiveMongoTemplate
的记录器设置 DEBUG
级别允许跟踪发送到 MongoDB 的所有查询。
使用 ReactiveMongoTemplate
尽管 ReactiveMongoTemplate
被用作响应式存储库的构建块,但该类本身是非常通用的。有时它允许使用 database 比更高级别的存储库更有效。
例如,让我们实现一个简单的服务,该服务使用ReactiveMongoTemplate
使用正则表达式按标题查找书籍。实现可能如下所示:
下面来介绍RxMongoTemplateQueryService
类的要点:
- 我们必须引用
ReactiveMongoOperations
接口的实例。ReactiveMongoTemplate
实现该接口,并且当MongoDB 数据源已配置。 - 该服务定义了
findBooksByTitle
方法,并使用正则表达式作为搜索条件,并返回带有结果的Flux
。
- MongoDB 连接器的
Query
和Criteria
类用于使用正则表达式构建实际查询。此外,我们通过应用Query.limit
方法将结果数限制为100
。 - 在这里,我们要求
mongoOperations
执行之前构建的查询。查询结果应该映射到Book
类的实体。此外,我们必须告诉我们使用什么集合进行查询。在前面的示例中,我们查询了一个名为book
的集合。
笔记
请注意,我们可以通过提供以下方法签名来实现与普通反应式存储库相同的行为(查询限制除外),该方法签名遵循命名约定:
Flux<Book> findManyByTitleRegex(字符串正则表达式);
在底层,ReactiveMongoTemplate
使用ReactiveMongoDatabaseFactory
接口获取instance 反应式 MongoDB 连接。此外,它使用MongoConverter
接口的实例将实体转换为 文档,反之亦然。MongoConverter
也用于同步MongoTemplate
。让我们看看ReactiveMongoTemplate
如何实现它的契约。例如,find(Query query,...)
方法映射org.springframework.data.mongodb.core.query.Query< /code>instance 到
org.bson.Document
类的实例,MongoDB 客户端可以使用该实例。然后ReactiveMongoTemplate
使用转换后的查询调用数据库客户端。 com.mongodb.reactivestreams.client.MongoClient
类提供了响应式 MongoDB 驱动程序的入口点。它与 Reactive Streams 兼容,并通过响应式发布者返回数据。
使用响应式驱动程序 (MongoDB)
Spring Data 中的 Reactive MongoDB 连接基于 MongoDB Reactive Streams Java Driver ( https://github.com/mongodb/mongo-java-driver-reactivestreams)。驱动程序提供异步流处理 具有非阻塞背压。反过来,响应式驱动程序构建在 MongoDB Async Java 驱动程序之上 (http://mongodb.github.io/mongo-java-driver/3.8/driver-async)。异步驱动是低级的和 有一个基于回调的API,所以使用起来不如更高级的好级反应流驱动程序。我们必须注意,除了 MongoDB Reactive Streams Java Driver 还有 MongoDB RxJava Driver (http://mongodb.github.io/mongo-java-driver-rx),它也是建立在相同的异步 MongoDB 驱动程序之上的。因此,对于 MongoDB 连接,Java 生态系统有一个同步驱动程序、一个异步驱动程序和两个反应驱动程序。
当然,如果我们需要比ReactiveMongoTemplate
提供更多的查询过程控制,我们可以直接使用响应式驱动。使用这种方法,前面使用纯反应式驱动程序的示例如下:
我们看一下前面的代码:
- 服务是指
com.mongodb.reactivestreams.client.MongoClient<的instance /代码>界面。当数据源配置正确时,这个实例应该可以作为 Spring bean 访问。
- 该服务定义了
findBooksByTitleRegex
方法,该方法返回一个带有Book
实体的Flux
. - 我们必须返回一个新的
Flux
实例,它将执行推迟到实际订阅发生的时间。在该 lambda 中,我们使用com.mongodb.client.model.Filters 定义一个具有
帮助类。然后我们通过名称引用数据库org.bson.conversions.Bson
类型的新查询(3.2)
以及集合(3.3)
。除非我们使用find
方法发送先前准备好的查询(3.4)
否则不会发生与数据库的通信。 - 一旦结果开始返回,我们可以将 MongoDB 文档传输到域实体中,如果需要的话。
尽管在前面的示例中我们在数据库驱动程序级别工作,但当我们使用 Reactive Streams 操作时,它仍然非常舒适。此外,我们不需要手动处理背压,因为 MongoDB Reactive Streams Java Driver 已经支持它。 Reactive MongoDB 连接使用基于批量大小的背压需求。这种方法是合理的默认设置,但在使用小的需求增量时会产生许多往返。下图突出显示了响应式 MongoDB 存储库所需的所有 abstraction 层:
图 7.16 Reactive MongoDB stack with Spring Data
使用异步驱动程序 (Cassandra)
我们已经描述了 reactive Mongo 存储库是如何构建在响应式驱动程序之上的。现在,让我们看看反应式 Cassandra 存储库如何适应异步驱动程序。
与 ReactiveMongoRepository
类似,响应式 Cassandra 连接器为我们提供了ReactiveCassandraRepository
接口,它还扩展了更通用的ReactiveCrudRepository
。 ReactiveCassandraRepository
接口由SimpleReactiveCassandraRepository
实现,而后者又使用ReactiveCassandraOperations
低级操作的接口。
ReactiveCassandraOperations
由ReactiveCassandraTemplate
类实现。当然,ReactiveCassandraTemplate
可以直接在应用中使用,类似于ReactiveMongoTemplate
。
ReactiveCassandraTemplate
类内部使用ReactiveCqlOperations
。ReactiveCassandraTemplate
与 Spring Data 实体如as org.springframework.data.cassandra.core.query.Query
,而ReactiveCqlOperations
用CQL语句操作 (由<代码 class="literal">String) 可由 Cassandra 驱动程序识别。 ReactiveCqlOperations
接口由ReactiveCqlTemplate
类实现。反过来,ReactiveCqlTemplate
使用ReactiveSession
接口进行实际的数据库查询。ReactiveSession
由DefaultBridgedReactiveSession
类实现,该类将驱动程序提供的异步Session
方法连接到响应式执行模式。
让我们更深入地了解DefaultBridgedReactiveSession
类如何将异步 API 适配为反应式 API。 execute
方法接收一个语句
(例如,一个SELECT
语句)并且 reactively 返回结果。 execute
方法 及其adaptFuture
helper方法 如下所示:
让我们看一下前面的代码:
- 首先,
execute
方法返回的不是Flux
,而是一个Mono
带有ReactiveResultSet
的实例。ReactiveResultSet
包装异步com.datastax。 driver.core.ResultSet
, 它支持分页,以便在返回ResultSet
实例时获取结果的第一页,仅在之后获取下一页第一个结果的所有结果都已被消耗。ReactiveResultSet
使用以下方法签名适应该行为—Flux<Row>行()
。 - 我们使用
create
方法创建一个新的Mono
实例,该方法将操作推迟到订阅的那一刻。 - 这是在驱动程序的异步
Session
实例上执行的异步查询。请注意,Cassandra 驱动程序使用 Guava 的ListenableFuture
来返回结果。 - 异步
ResultSet
被包装到一个名为ReactiveResultSet
的响应式副本中。 - 在这里,我们调用
adaptFuture
辅助方法,该方法将ListenableFuture
映射到Mono 。
- 如果有任何错误,我们必须通知我们的反应订阅者。
adaptFuture
方法只是简单地为future添加一个新的监听器(7.1)
,所以当结果出现时,它会生成一个响应式(7.1)
code class="literal">onNextsignal(7.2)
。它还通知订阅者有关执行错误(如果有)(7.3)
。
需要注意的是,多页ResultSet
允许调用fetchMoreResults
方法异步获取后续页面数据。< code class="literal">ReactiveResultSet 在internally Flux<Row> rows()
方法。 尽管这种方法有效,但在 Casandra 收到完全反应式驱动程序之前,它仍被视为一种中间解决方案。
下图显示了响应式 Spring Data Cassandra 模块的内部架构:
图 7.17 Reactive Cassandra 堆栈与 Spring Data
反应式交易
事务是数据库的标记,它定义了具有许多应该以原子方式执行的逻辑操作的单个单元的边界。因此,我们有一个事务被初始化的时间点,然后对事务对象进行一些操作,然后发生决策时刻。在那一刻,客户端和数据库决定是否应该成功提交或回滚事务。
在同步世界中,transaction 对象通常保存在ThreadLocal
容器。但是, ThreadLocal
不适合将数据与反应流相关联,因为用户无法控制线程切换。事务需要将底层资源绑定到具体化流。使用 Project Reactor,这可以通过利用 Reactor 上下文来实现,如 Chapter 4, < em>Project Reactor - 响应式应用程序的基础。
MongoDB 4 的反应式事务
从 4.0 版开始,MongoDB 支持多文档事务。这使我们能够在新版本的 MongoDB 中试验反应式事务。以前,Spring Data 只有 具有与不支持事务的数据库的反应连接器。现在,情况发生了变化。由于响应式 transactions 是响应式持久性领域的新事物,它本身是相当新的,因此以下所有声明和代码示例都应该被视为未来如何实现反应式事务的一些可能性。在撰写本文时,Spring Data 没有任何在服务或存储库级别应用反应式事务的功能。但是,我们可以在ReactiveMongoOperations
由ReactiveMongoTemplate
实现的级别操作事务。
首先,多文档事务是MongoDB的一个新特性。它仅适用于具有 WiredTiger 存储引擎的非分片副本集。从 MongoDB 4.0 开始,没有其他配置支持多文档事务。
此外,某些 MongoDB 功能在事务中不可用。不允许发出元命令,或创建集合或索引。此外,集合的隐式创建在事务中不起作用。因此,需要设置所需的数据库结构以防止错误。此外,某些命令的行为可能会有所不同,因此请查看有关多文档事务的文档。
以前,MongoDB 只允许对一个文档进行原子更新,即使该文档包含嵌入式文档。使用多文档事务,可以在许多操作、文档和集合中获得全有或全无的语义。多文档事务保证了全球一致的数据视图。提交事务时,将保存事务中所做的所有更改。但是,当事务中的任何操作失败时,整个事务将中止并且所有更改都将被丢弃。此外,在事务提交之前,在事务之外看不到任何数据更新。
现在,让我们演示反应式事务可用于将文档存储到 MongoDB。为此,我们可以使用一个众所周知的经典例子。假设我们必须实现一个钱包服务,在用户账户之间转移资金。每个用户都有自己的账户,余额为非负数。只有当他们有足够的资金时,用户才能将任意金额转移给另一个用户。转账可能会同时发生,但系统在转账时不允许损失或获得任何金钱。因此,发送方钱包的取款操作和接收方钱包的存款操作必须同时且原子地发生。多文档交易在这里应该有所帮助。
如果没有交易,我们可能会面临以下问题:
- A client makes a few transfers at the same time with more funds than they have on their account. Hence, there is a chance that simultaneous transfers may impact the consistency of the system and illegally create money.
- A user receives a few deposits simultaneously. Some updates may rewrite the wallet state, and the user may lose money forever.
有几种方法可以描述 汇款算法,但这里我们坚持使用最简单的一种。要将金额
从账户A
转移到账户B
,我们应该做到以下几点:
- 打开一个新的交易。
- 加载
账户A
的钱包。 - 加载
账户B
的钱包。 - 检查
账户A
的钱包是否有足够的资金。 - 通过提取
金额
计算账户A
的新余额。 - 通过存入
金额
,计算账户B
的新余额。 - 保存
账户A
的钱包。 - 保存
账户B
的钱包。 - 提交事务。
作为该算法的结果,我们要么获得钱包的新一致状态,要么根本看不到任何变化。
让我们描述一下Wallet
实体类,它被映射到一个 MongoDB 文档并且有一些方便的实用方法:
让我们描述一下前面的代码:
Wallet
实体类映射到MongoDB中的wallet
集合。org.bson.types.ObjectId
类用作实体标识符。ObjectId
类与MongoDB有很好的集成,常用于实体识别。hasEnoughFunds
方法检查钱包是否有足够的资金进行操作。withdraw
方法将钱包余额减少请求的金额。deposit
方法将钱包的余额增加所请求的金额。
要从数据库存储和加载钱包,我们需要一个存储库:
让我们更详细地描述WalletRepository
接口:
现在,让我们为WalletService
定义一个接口:
此处,编号点的含义如下:
transferMoney
方法将金额
从fromOwner
的钱包转移到toOwner
的钱包。请注意,该方法使用响应式类型,因此在方法调用的那一刻,实际的交易参与者可能仍然是未知的。 当然,该方法同样可以接受原语或Mono
。然而,在这里,我们有意使用三个不同的 Mono 实例来练习 zip
运算符和TupleUtils
。reportAllWallets
方法汇总所有注册钱包的数据,查看总余额。transferMoney
方法返回TxResult
类型的结果。TxResult
枚举描述了传输操作的三个潜在结果:SUCCESS
、NOT_ENOUGH_FUNDS
,以及TX_CONFLICT
。SUCCESS
和NOT_ENOUGH_FUNDS
操作是自描述的。TX_CONFLICT
描述交易失败的情况,因为其他一些并发交易成功,更新了一个或两个涉及的钱包。Statistics
类表示系统中所有钱包的聚合状态,对完整性检查很有用。为简单起见,省略了实现细节。
现在我们已经定义了WalletService
接口,我们可以编写一个带有模拟的单元测试。使用所需的并行性,模拟选择两个随机所有者并尝试转移随机数量的资金。省略了一些无关紧要的部分,模拟可能如下所示:
上述代码由以下步骤组成:
- 使用
Flux.range
方法模拟所需的传输量迭代
。 - 应用一个小的随机延迟以刺激随机交易竞争。
- 事务在
simulationScheduler
上运行。它的并行性定义了可能发生多少并发事务。我们可以用这个代码创建一个调度——Schedulers.newParallel("name", parallelism)
。 - 选择随机钱包所有者和要转移的金额。
- 发出
transferMoney
服务请求。 - 由于
transferMoney
调用可能导致TxResult
状态之一,reduce
方法有助于跟踪模拟统计数据。请注意,OperationStats
类跟踪有多少操作成功,有多少由于资金不足而被拒绝,以及有多少由于交易冲突而失败。另一方面,WalletService.Statistics
类跟踪资金总额。
通过WalletService
的正确实现,我们期望测试simulation会导致到系统中的总金额不变的系统状态。同时,我们希望当发送方有足够的资金进行交易时,汇款请求会成功执行。否则,我们可能会面临可能导致实际财务损失的系统完整性问题。
现在,让我们使用 MongoDB 4 和 Spring Data 提供的反应式事务支持来实现WalletService
服务。 TransactionalWalletService
类表示的实现可能如下所示:
由于前面的代码可能看起来不简单,让我们逐个描述它:
- 首先,我们必须使用
ReactiveMongoTemplate
类,因为在撰写本文时,Reactive MongoDB 连接器不支持存储库级别的事务,仅支持MongoDB 模板。 transferMoney
方法的实现在这里定义。通过zip
操作,它订阅所有方法参数(2.1)
,当所有参数都被解析后,它使用TupleUtils.function
静态辅助函数,用于将Tuple3
分解成其组成部分(2.2)
代码流畅度。在这一点(2.3)
我们调用doTransferMoney
方法,它会进行实际的汇款。但是,doTransferMoney
方法可能会返回onError
信号 这表明交易冲突。在这种情况下,我们可以使用方便的retryBackoff
方法(2.4)
重试操作。retryBackoff
方法需要知道重试次数(20)、初始重试延迟(1毫秒)、最大 重试延迟(500 毫秒)和抖动值(0.1),用于配置重试延迟增加的速度。如果我们在所有重试后仍无法处理事务,我们应该将TX_CONFLICT
状态返回给客户端。doTransferMoney
方法尝试进行实际的汇款。使用已解析的参数调用它 -form
、to
和amount
(3.1)
。通过调用mongoTemplate.inTransaction().execute(...)
方法,我们定义了一个新事务的边界(3.2) 。在
execute
方法中,我们得到了session
ReactiveMongoOperations
的实例班级。session
对象绑定到 MongoDB 事务。现在,在交易中,我们搜索发送者的钱包(3.3)
然后搜索接收者的钱包(3.4 ) 。两个钱包都解决后,我们检查发送方是否有足够的资金
(3.5)
。然后我们从发送者的 钱包中取出正确数量的钱(3.6)
并将相同数量的钱存入接收者的钱包(3.7 ) 。此时,更改尚未保存到数据库中。现在,我们保存发送者的更新钱包
(3.8)
然后保存接收者的更新钱包(3.9)
.如果数据库不拒绝更改,我们返回SUCCESS
状态并自动提交事务(3.10)
。如果发送方没有足够的资金,我们返回NOT_ENOUGH_FUNDS
状态(3.11)。如果在与数据库通信时出现任何错误,我们会传播onError
信号(3.12)
,进而触发在(2.4)
处描述的重试逻辑。- 在
(3.3)
和(3.4)
我们使用了queryForOwner< /code>method,它使用 Criteria API 来构建 MongoDB 查询。
使用 Reactor Context 实现使用事务引用正确的会话。 ReactiveMongoTemplate.inTransaction
方法启动一个新事务并将其放入上下文中。因此,可以在反应流中的任何位置获取与由com.mongodb.reactivestreams.client.ClientSession
接口表示的事务的会话。 ReactiveMongoContext.getSession()
helper 方法允许我们获取会话实例。
当然,我们可以通过在一个查询中加载两个钱包以及在一个查询中更新两个钱包来改进TransactionalWalletService
类。诸如此类的更改应该会减少数据库请求的数量,加快汇款速度,并降低交易冲突率。但是,这些改进 留给读者作为练习。
现在,我们可以使用不同数量的钱包、汇款迭代和并行性来运行前面描述的测试场景。如果我们正确实现了TransactionalWalletService
类中的所有业务逻辑,我们应该会收到如下测试的输出:
所以,在前面的模拟中,我们进行了10,000
个传输操作,6,238
个成功,3,762
其中因资金不足而失败。此外,我们的重试策略允许解决所有事务冲突,因为没有一个事务以 TX_CONFLICT
状态完成。从日志中可以看出,系统保持了总余额的不变性——模拟前后系统中的总金额是相同的。因此,我们在通过 MongoDB 应用反应式交易进行并发汇款时实现了系统完整性。
对副本集的多文档事务的支持现在允许使用 MongoDB 作为主要数据存储来实现整个新的应用程序集。当然,MongoDB 的未来版本可能允许跨分片部署的事务,并提供各种隔离级别来处理事务。但是,我们应该注意到,多文档事务会产生更高的性能成本和更长的响应延迟比较与简单的文档写入。
尽管反应式事务还不是一种广泛使用的技术,但这些示例清楚地表明可以以反应式方式应用事务。将反应式持久性应用于 PostgreSQL 等关系数据库时,反应式事务的需求量很大。但是,该主题需要用于数据库访问的反应式语言级 API,在撰写本文时,该 API 尚不存在。
Spring Data 反应式连接器
在撰写本文时,Spring Data 2.1 已为四个 NoSQL 数据库(即 MongoDB、Cassandra、Couchbase 和 Redis)提供 database 连接器. Spring Data 很可能也会支持其他数据存储,尤其是那些利用 Spring WebFlux WebClient 通过 HTTP 进行通信的数据存储。
在这里,我们不会介绍 Spring Data 反应式连接器的所有 特性 或其实现细节。在前面的部分中,我们介绍了 MongoDB 和 Cassandra 的很多内容。但是,让我们重点介绍每个反应式连接器的主要显着特征。
反应式 MongoDB 连接器
正如本章previous 所述,Spring Data 对 MongoDB 有很好的支持。 Spring Data Reactive MongoDB 模块可以通过spring-boot-starter-data-mongodb-reactive
Spring Boot starter 模块启用。 Reactive MongoDB 支持提供了一个 reactive 存储库。 ReactiveMongoRepository
接口定义了基本的存储库契约。该仓库继承了ReactiveCrudRepository
的所有特性,并增加了对QBE的支持。此外,MongoDB 存储库支持使用 @Query
注释的自定义查询和使用 @Meta
注释的附加查询配置。如果遵循命名约定,MongoDB 存储库支持从方法名称生成查询。
MongoDB 存储库的另一个显着特性是支持可尾游标。默认情况下,当所有结果都用完时,数据库会自动关闭查询游标。但是,MongoDB 有上限集合,它们是固定大小的并支持高吞吐量操作。文档检索基于 insertion 顺序。封顶集合的工作方式类似于循环缓冲区。此外,上限集合支持可尾光标。这个cursor 在客户端消费完初始查询中的所有结果并且当有人将新文档插入到上限集合中时保持打开状态, tailable 游标将返回新文档。在ReactiveMongoRepository
中,标有@Tailable
注解的方法返回一个由Flux<表示的可尾游标;实体>
类型。
再低一层,ReactiveMongoOperations
接口及其实现类ReactiveMongoTemplate
提供对MongoDB通信的更细粒度的访问。除此之外,ReactiveMongoTemplate
支持与 MongoDB 的多文档事务。此功能仅适用于具有 WiredTiger 存储引擎的非分片副本集。此功能在 使用 MongoDB 4 部分的反应式事务中进行了描述。
反应式 Spring Data MongoDB 模块构建在 Reactive Streams MongoDB Driver 之上,它实现了 Reactive Stream 规范并在内部使用了 Project Reactor。反过来,MongoDB Reactive Streams Java Driver 构建在 MongoDB Async Java Driver 之上。 响应式存储库的工作原理 部分 更详细地描述了ReactiveMongoRepository
的工作原理。
反应式 Cassandra 连接器
Spring Data Reactive Cassandra module可以通过导入spring-boot-starter-来启用data-cassandra-reactive
启动模块。 Cassandra 还支持响应式存储库。 ReactiveCassandraRepository
接口扩展ReactiveCrudRepository
并定义capabilities。 @Query
注解允许手动定义 CQL3 查询。 @Consistency
注解可以配置应用于查询的所需一致性级别。
ReactiveCassandraOperations
接口和ReactiveCassandraTemplate
类可以访问 Cassandra 数据库上的低级操作。
从 Spring Data 2.1 开始,响应式 Cassandra 连接器包装了异步 Cassandra 驱动程序。 使用异步驱动程序 (Cassandra) 部分描述了如何将异步通信包装到响应式客户端中。
反应式沙发底座连接器
Spring Data Reactive Couchbase 模块可以通过 spring-boot-starter-data-couchbase-reactive
starter 模块启用。它为 Couchbase (https://www.couchbase.com)。 ReactiveCouchbaseRepository
接口扩展了基本ReactiveCrudRepository
, 另外需要 扩展Serializable
接口的实体ID类型。
ReactiveCouchbaseRepository
接口的默认实现建立在< code class="literal">RxJavaCouchbaseOperations接口。 RxJavaCouchbaseTemplate
类实现RxJavaCouchbaseOperations
。此时,很明显反应式 Couchbase 连接器使用了 RxJava 库来实现RxJavaCouchbaseOperations
。由于ReactiveCouchbaseRepository
方法返回Mono
和Flux
类型和RxJavaCouchbaseOperations
方法返回Observable
类型,需要进行响应式类型转换。它发生在存储库实现的级别。
反应式 Couchbase 连接器构建在反应式 Couchbase 驱动程序之上。最新的 Couchbase 驱动程序 2.6.2 使用 RxJava 版本 1.3.8,这是 1.x 分支的最后一个版本。因此,背压支持可能会受到 Couchbase 连接器的限制。但是,它通过 Netty 框架和 RxJava 库具有完全非阻塞的堆栈,因此不应该浪费任何应用程序资源。
反应式 Redis 连接器
Spring Data Reactive Redis 模块可以通过导入spring-boot-starter启用 -data-redis-reactive
启动器。与其他 reactive 连接器相比,Redis 连接器不提供反应式存储库。因此,ReactiveRedisTemplate
类成为响应式 Redis 数据访问的中心抽象。ReactiveRedisTemplate
实现了由ReactiveRedisOperations
接口并提供所有必要的序列化-反序列化例程。同时,ReactiveRedisConnection
允许在与 Redis 通信时使用原始字节缓冲区。
除了存储和检索对象以及管理 Redis 数据结构的普通操作外,该模板还允许订阅 Pub-Sub 频道。例如,convertAndSend(String destination, V message)
方法将给定消息发布到给定通道,并返回接收到消息的客户端数量。 listenToChannel(String... channels)
方法返回一个Flux
,其中包含来自感兴趣频道的消息。这样,反应式 Redis 连接器不仅支持反应式数据存储,还提供了消息传递机制。 第 8 章, 使用 Cloud Streams 扩展, 更多地描述了消息传递如何提高反应式应用程序的可伸缩性和弹性。
Spring Data Redis 目前集成 与 Lettuce 驱动程序 (https://github.com/lettuce-io/lettuce-core)。它是 Redis 的唯一反应式 Java 连接器。 Lettuce 4.x 版本使用 RxJava 进行底层实现。但是,库的 5.x 分支切换到 Project Reactor。
除 Couchbase 之外的所有反应式连接器都有反应式运行状况指示器。因此,数据库健康检查也不应该浪费任何服务器资源。 第 10 章, 最后,发布它!
我们确信,随着时间的推移,Spring Data 将为其生态系统添加更多的响应式连接器。
限制和预期的改进
由于反应式连接领域相对较新,禁止在许多应用程序中使用该方法存在一些限制:
- The lack of reactive drivers for popular databases used in large part of modern projects. So far, we have reactive or asynchronous drivers for MongoDB, Cassandra, Redis, and Couchbase. Consequently, these databases have reactive connectors in the Spring Data ecosystem. Also, we have a few options for reactive access to PostgreSQL. At the same time, there is some work going on to enable reactive access for MySQL and MariaDB. Even though the few databases with reactive support cover many use cases, this list is still limiting. To become a trendy development technique, the reactive data access should have connectors for most widespread relational databases, such as PostgreSQL, MySQL, Oracle, MS SQL, for popular search engines such as ElasticSearch and Apache Solr, as well as for cloud databases such as Google Big Query, Amazon Redshift, and Microsoft CosmosDB.
- The lack of reactive JPA. Currently, the reactive persistence operates at a pretty low level. We can not easily work with entities in the ways proposed by ordinary JPA. With the current reactive connectors, we have no support for entity relationship mappings, for entity caching, or for lazy loading. However, it would be odd to demand such capabilities even before consenting to any low-level API for the reactive data access.
- The lack of a language-level reactive API for data access. As was described previously in this chapter, at the time of writing, the Java platform has only JDBC API for data access, which is synchronous and blocking, and, consequently, cannot smoothly be used with a reactive application.
但是,我们可能会看到越来越多的 NoSQL 解决方案提供了响应式驱动程序,或者至少提供了易于包装到响应式 API 中的异步驱动程序。此外,目前正在对 Java 中用于数据访问的语言级 API 领域进行重大改进。在撰写本文时,有两个突出的建议可以满足这一需求,即 ADBA 和 R2DBC。现在,让我们更仔细地看看它们。
异步数据库访问
异步数据库访问(ADBA) 定义了一个非用于 Java 平台的阻塞数据库访问 API。在撰写本文时,它仍是一个草案, JDBC 专家组 正在讨论它应该是什么样子。 ADBA 是在 JavaOne 2016 会议上宣布的,并且已经讨论 几年了。 ADBA 旨在补充当前的 JDBC API,并提出一种面向高吞吐量程序的异步替代方案(而不是替代方案)。 ADBA 旨在支持流畅的编程风格,并为编写数据库查询提供构建器模式。 ADBA 不是 JDBC 的扩展,并且不依赖于它。准备就绪后,ADBA 很可能将存在于 java.sql2
包中。
ADBA 它是一个异步 API,因此在进行网络调用时不应阻塞任何方法调用。所有潜在的阻塞操作都表示为单独的 ADBA 操作。客户端应用程序构建并提交一个 ADBA 操作或 ADBA 操作图。实现 ADBA 的驱动程序异步执行操作并通过java.util.concurrent.CompletionStage
或回调报告结果。准备就绪后,通过 ADBA 请求问题的异步 SQL 可能如下所示:
请注意,前面的代码仍然基于 ADBA 草案,因此 API 可能随时更改。但是,编号的行表示以下含义:
- 查询在
CompletionStage
中返回结果。在我们的例子中,我们返回一个员工姓名列表。 - 这通过调用
rowOperation
数据库的方法connection
来启动一个新的行操作。 - 该操作允许通过调用
onError
方法来注册错误处理程序。错误处理发生在我们的自定义userDefinedErrorHandler
方法中。
collect
方法允许使用 Java Stream API 收集器收集结果。submit
方法开始处理操作。getCompletionStage
方法为用户提供了一个CompletionStage
的实例,该实例将在处理完成时保存结果。
当然,ADBA 将提供编写和执行 更复杂的数据库查询的能力,包括条件查询、并行查询和依赖查询操作。 ADBA 支持事务。但是,与 JDBC 相比,ADBA 并非旨在由业务代码直接使用(即使有可能),而是旨在为更高级的库和框架提供异步基础。
在撰写本文时,ADBA 只有一个实现,称为 AoJ。 AoJ (https://gitlab.com/asyncjdbc/asyncjdbc/tree/master< /a>) 是一个 experimental 库,它通过在单独的线程池上调用标准 JDBC 来实现 ADBA API 的子集. AoJ 不适合生产使用,但提供了玩 ADBA 的能力,而无需实现成熟的异步驱动程序。
有一些关于 ADBA 不仅可以通过 CompletionStage
而且还可以通过响应式 Publisher
返回结果的能力的对话。来自 Java Flow API。然而,目前尚不清楚 Flow API 将如何集成到 ADBA 中,或者 ADBA 是否会公开一个反应式 API。在撰写本文时,这个话题仍然是热议的话题。
在这一点上,我们必须再次声明,Java 的CompletionStage
所代表的异步行为可能总是被响应式Publisher
实现所替代。但是,这种说法对相反的情况无效。反应性行为可以用CompletionStage
或 CompletableFuture
仅通过一些折衷来表示,即通过降低背压传播。此外,对于 CompletionStage<List<T>>
,没有实际的数据流语义,因为客户端需要等待完整的结果集。利用 Collector API 进行流式传输似乎并不是一个真正的选择。此外, CompletableFuture
一旦提交就开始执行,而 Publisher
只有在提交时才开始执行它收到订阅。用于数据库访问的反应式 API 将同时满足反应式语言级 API 和异步语言级 API 的需求。这是因为任何响应式 API 都可以快速转换为异步 API,而无需任何语义妥协。但是,在大多数情况下,异步 API 可能会成为反应式 API,但需要做出一些妥协。这就是为什么从本书作者的角度来看,支持 Reactive Streams 的 ADBA 似乎比只支持异步的 ADBA 更有益。
Java 中下一代数据访问 API 的替代候选者称为 R2DBC。它提供的不仅仅是完全异步的 ADBA,并且证明了用于关系数据访问的反应式 API 具有巨大的潜力。所以,让我们更仔细地看看它。
反应式关系数据库连接
反应式关系数据库连接 (R2DBC) (https://github.com/r2dbc) 是一个探索完全反应式数据库 API 可能看起来像。 Spring Data 团队领导 R2DBC 计划,并使用它在 reactive 应用程序中的响应式数据访问上下文中探索和验证想法. R2DBC 在 Spring OnePlatform 2018 会议上公开宣布。 R2DBC 的目标是定义一个响应式 database 访问 API,支持背压。 Spring Data 团队在响应式 NoSQL 持久性方面获得了一些出色的经验,因此他们决定提出他们对真正响应式语言级数据访问 API 的设想。此外,R2DBC 可能成为 Spring Data 中关系响应式存储库的底层 API。在撰写本文时,R2DBC 仍处于试验阶段,尚不清楚它是否或何时可能成为生产就绪软件。
R2DBC 项目由以下部分组成:
- R2DBC Service Provider Interface (SPI) defines the minimalistic API for driver implementations. The API is concise in order to drastically trim down the API that driver implementors had to conform to. The SPI is not intended for direct use from the application code. Instead, a dedicated client library is required for that purpose.
- R2DBC Client offers a human-friendly API and helper classes that translate user requests to the SPI level. This separate level of abstraction adds some comfort when using R2DBC directly. The authors highlight that R2DBC Client does for R2DBC SPI the same things as the Jdbi library does for JDBC. However, anyone is free to use SPI directly or implement their own client library over R2DBC SPI.
- R2DBC PostgreSQL Implementation provides a R2DBC driver for PostgreSQL. It uses the Netty framework for asynchronous communication via the PostgreSQL wire protocol. Backpressure may be achieved either by TCP Flow Control or by a PostgreSQL feature called portal, which is effectively a cursor into a query. The portal translates perfectly to Reactive Streams. It is important to note that not all relational databases have the wire protocol features required for a proper backpressure propagation. However, at least TCP Flow Control is available in all cases.
R2DBC 客户端允许使用 PostgreSQL 数据库,如下所示:
让我们描述一下前面的代码:
- 首先,我们要配置一个以
PostgresqlConnectionFactory
类为代表的连接工厂。配置很简单。 - 我们必须创建一个
R2dbc
类的实例, 它提供了响应式API。 R2dbc
允许我们通过应用inTransaction
方法来创建一个事务。handle
包装了反应式连接的一个实例,并提供了额外的便利 API。在这里,我们可能会执行一条 SQL 语句,例如插入一个新行。执行
方法接收 SQL 查询及其参数(如果有)(3.1)
。反过来,execute
方法返回受影响的行数。在前面的代码中,我们记录了更新的行数(3.2)
。- 插入一行后,我们启动另一个事务来选择所有书名
(4.1)
。当结果到达时,我们映射各个行(4.2)
通过应用map
函数(4.3)
,了解行结构。在我们的例子中,我们检索String
类型title
>(4.4) 。mapResult
方法返回Flux
类型。 - 我们以反应方式记录所有
onNext
信号。每个信号都有一个书名,包括在步骤(3)
插入的书名。
正如我们所见,R2DBC Client 提供了一个具有响应式风格的流畅 API。这个 API 在响应式应用程序的代码库中感觉非常自然。
将 R2DBC 与 Spring Data R2DBC 一起使用
当然,Spring Data 团队无法抗拒实现ReactiveCrudRepository
接口位于 R2DBC 之上。在撰写本文时,此实现存在于 Spring Data JDBC 模块中,本章已对此进行了描述。但是,它将获得自己的名为 Spring Data R2DBC 的模块。
SimpleR2dbcRepository
类使用R2DBC实现ReactiveCrudRepository
接口。值得注意的是 SimpleR2dbcRepository
类没有使用默认的R2DBC客户端,而是定义了自己的客户端来使用R2DBC SPI。
在 Spring Data R2DBC 启动之前,Spring Data JDBC 模块中的响应式支持位于项目的 r2dbc
Git 分支中,因此尚未准备好投入生产.然而,支持 R2DBC 的 Spring Data JDBC 模块展示了使用 ReactiveCrudRepository
进行关系数据操作的巨大潜力。所以,让我们定义我们的第一个 ReactiveCrudRepository
for PostgreSQL。它可能如下所示:
到目前为止,Spring Data JDBC 模块没有自动配置,所以我们必须手动创建 BookRepository
接口的实例:
在前面的代码中,我们执行以下步骤:
- 我们需要对
PostgresqlConnectionFactory
的引用,它是在前面的示例中创建的。 TransactionalDatabaseClient
启用对事务的基本支持。- 为了将行映射到实体,我们必须创建一个简单的
RelationalMappingContext
,反之亦然。
- 我们创建一个适当的存储库工厂。
R2dbcRepositoryFactory
类知道如何创建ReactiveCrudRepository
。 - 工厂生成
BookRepository
接口的实例。
现在,我们可以在正常的响应式工作流程中使用我们的完全响应式 BookRepository
,如下所示:
尽管 R2DBC 项目仍处于试验阶段,并且它对 Spring Data JDBC 的支持,我们可能会看到真正的响应式数据访问并不是很遥远。此外,背压的问题在R2DBC SPI层面得到解决。
在这一点上,尚不清楚 ADBA 是否会获得响应式支持,或者 R2DBC 是否会成为 ADBA 的响应式替代品。然而,在这两种情况下,我们相信反应式关系数据访问将很快成为现实,至少对于具有 ADBA 或 R2DBC 兼容驱动程序的数据库而言。
将同步存储库转换为反应式
尽管 Spring Data 为流行的 NoSQL 数据库提供了 reactive 连接器,但响应式应用程序有时需要查询没有响应式连接的数据库.将任何阻塞通信包装到反应式 API 中是可能的。但是,所有阻塞通信都应该发生在适当的线程池上。如果没有,我们可能会阻塞应用程序的事件循环并完全停止它。请注意,一个小线程池(带有一个有界队列)可能会在某个时候耗尽。一个完整的队列在某个时候变成了阻塞模式,而使它成为非阻塞的整个点就消失了。这样的解决方案不如完全被动的解决方案有效。但是,在响应式应用程序中,使用专用线程池来阻塞请求的方法通常是可以接受的。
假设我们必须实现一个响应式微服务,该微服务不时向关系数据库发出请求。该数据库具有 JDBC 驱动程序,但没有任何异步或反应驱动程序。在这种情况下,唯一的选择是构建一个反应式适配器,将阻塞请求隐藏在反应式 API 后面。
如前所述,所有阻塞请求都应发生在专用调度程序上。调度程序的底层线程池定义了阻塞操作的并行级别。例如,在 Schedulers.elastic()
上运行阻塞操作时,concurrent 的数量请求不受限制,因为elastic
调度程序没有绑定创建的线程池的最大数量。同时,Scheduler.newParallel("jdbc", 10)
定义了pooled worker的数量,所以不会超过10个并发请求同时发生。当通过固定大小的连接池与数据库进行通信时,这种方法效果很好。在大多数情况下,将线程池的大小设置为大于连接池的大小是没有意义的。例如,对于在无限线程池上运行的调度程序,当连接池耗尽时,新任务及其正在运行的线程将不会被网络通信阻塞,而是在从连接池中检索连接的阶段。
在选择适当的阻塞 API 时,有几个选项。每个选项都有其优点和缺点。在这里,我们将介绍 rxjava2-jdbc
库并了解如何包装预先存在的阻塞存储库。
使用 rxjava2-jdbc 库
David Moten 的 rxjava2-jdbc
库 (https ://github.com/davidmoten/rxjava2-jdbc) 的创建是为了以不阻塞 reactive 应用程序。该库基于 RxJava 2 构建,使用专用线程池和非阻塞连接池的概念。因此,请求在等待空闲连接时不会阻塞线程。一旦连接可用,查询就会在连接上开始执行 并阻塞线程。应用程序可能不会管理用于阻塞请求的专用调度程序,因为库会这样做。此外,该库有一个流畅的 DSL,它允许我们发出 SQL 语句并以反应流的形式接收结果。 让我们定义 Book
实体并正确注释它以与 rxjava2-jdbc
:
在前面的代码中,我们可以看到以下内容:
- 我们定义了
Book
接口。请注意,使用 Spring Data,我们通常将实体定义为类。 - 访问器方法用
@Column
注解修饰。注释有助于将行列映射到实体字段。 - 通过
@Query
注解,我们定义了用于实体检索的SQL语句。
现在让我们定义一个简单的 repository 来查找在特定时期出版的书籍:
我们来描述RxBookRepository
类的实现如下:
- 由于图书馆不能自动生成查询,我们必须提供搜索所需书籍的 SQL 查询。 SQL 查询中允许使用命名参数。
- 数据库初始化需要 JDBC URL 和池大小。在我们的例子中,最多可以同时运行 25 个并发查询。
findByYearBetween
方法使用 RxJava 2 库中的反应类型(Flowable
andSingle
),而不是来自 Project Reactor。 这是因为rxjava2-jdbc
库在内部使用 RxJava 2.x 并通过其 API 公开 RxJava 类型。但是,很容易将 RxJava 类型转换为 Project Reactor 中的类型。在(3.1)
我们订阅了解析请求参数的流。然后我们调用select
方法(3.2)
填写查询参数(3.3 ) 。
autoMap
方法将 JDBC 行转换为Book
实体。autoMap
方法返回Flowable<Book>
, 相当于Project Reactor的通量
。
rxjava2-jdbc
库支持 大多数 JDBC 驱动程序。此外,该库对事务有一些支持。事务中的所有操作都必须在同一个连接上执行。事务的提交/回滚自动发生。
rxjava2-jdbc
库很简洁,减少了一些潜在的线程块,并且可以响应式地使用关系数据库。然而,到目前为止,它仍然是新的,可能无法处理复杂的反应式工作流程,尤其是那些涉及事务的工作流程。 rxjava2-jdbc
库也需要所有 SQL 查询的定义。
包装一个同步的 CrudRepository
有时我们可能已经有一个CrudRepository
实例,其中包含所有必需的机制数据访问(无需手动查询或实体映射)。但是,我们不能直接在反应式应用程序中使用它。在这种情况下,很容易编写我们自己的反应式适配器,它的行为类似于 rxjava2-jdbc
库,但在存储库级别。应用此方法时请谨慎使用 JPA。使用延迟加载时,我们很快就会遇到代理问题。因此,假设我们有以下由 JPA 定义的Book
实体:
此外,我们还有以下 Spring Data JPA 存储库:
- 它扩展了
CrudRepository
接口并继承了所有的数据访问方法。 BookJpaRepository
定义了一个根据命名约定生成查询的方法。BookJpaRepository
使用自定义 SQL 定义方法。
BookJpaRepository
接口可以很好地与阻塞 JPA 基础设施配合使用。 BookJpaRepository
repository 的所有方法都返回非反应类型。要将BookJpaRepository
接口包装到反应式API中并接收其大部分功能,我们可以定义一个抽象适配器并使用其他方法对其进行扩展以映射findByIdBetween
和findShortestTitle
方法。抽象适配器可以被重用于适配任何CrudRepository
实例。适配器可能如下所示:
让我们描述一下前面的代码:
ReactiveCrudRepositoryAdapter
是一个抽象类,它实现了ReactiveCrudRepository
接口,并具有与委托存储库相同的泛型类型。ReactiveCrudRepositoryAdapter
使用CrudRepository
类型的底层delegate
。 ;此外,适配器需要Scheduler
实例来卸载事件循环中的请求。调度器的并行度定义了并发请求的数量,因此使用与我们用于连接池配置相同的数量是很自然的。然而,最好的映射并不总是一对一的。如果连接池用于其他用途,可用连接数可能少于可用线程数,部分线程可能在等待连接时被阻塞(rxjava2-jdbc
句柄这样的场景更好)。- 这是阻塞
save
方法的反应式包装方法。阻塞调用被包装到Mono.fromCallable
operator(3.1)
并卸载到专用的调度程序
(3.2)
。 - 这是
findById
方法的反应式适配器。首先,该方法订阅id
stream(4.1)
。如果值到达(4.2)
,则委托
实例 被调用(4.3) 。
CrudRepository.findById
方法返回可选
,所以需要将值映射到一个单声道
实例(4.4)
。如果接收到一个空的Optional
,返回emptyMono
(4.5)
.当然,执行被卸载到专用的调度器
。
- 这是
deleteAll
方法的反应式适配器。由于deleteAll(Publisher
和 deleteAll(Iterator
方法的语义不同,我们不能将一个响应式调用直接映射到一个阻塞调用中。例如,实体流是无穷无尽的,因此永远不会删除任何项目。因此, deleteAll
方法订阅实体(5.1)
并发出一个单独的委托.delete(T entity)
request(5.2)
对于每个他们。由于删除请求可以并行运行,每个请求都有自己的subscribeOn
调用来接收来自调度程序
(5.3)
。deleteAll
方法返回一个输出流,当传入流终止并且所有删除操作都完成时,该输出流完成。ReactiveCrudRepository
接口的所有方法都应该这样映射。
现在,让我们在具体的 reactive 存储库实现中定义缺少的自定义方法:
RxBookRepository
类扩展了抽象ReactiveCrudRepositoryAdapter
类,引用了BookJpaRepository
和< code class="literal">Scheduler实例,并定义如下方法:
findByIdBetween
方法接收两个反应流并使用zip
操作(1.1)订阅它们 。当两个值都准备好
(1.2)
时,在delegate
实例上调用相应的方法(1.3)
并且阻塞执行被卸载到专用的调度器
。但是,也可以卸载lowerPublisher
和upperPublisher
流的分辨率,这样事件循环就不会在那里消耗资源(1.5)
。小心这种方法,因为它可能会与实际的数据库请求争夺资源并降低吞吐量。findShortestTitle
方法调用对应的方法(2.1)
在专用调度器(2.2 )
并将Iterable
映射到Flux
(2.3)
。
现在,我们最终可以使用以下代码将阻塞BookJpaRepository
包装到响应式RxBookRepository
中:
并非所有阻塞特征都可以如此轻松地映射。例如,JPA 延迟加载 很可能 被描述的方法破坏。此外,事务的支持将需要类似于 rxjava2-jdbc
库中的额外工作。或者,我们需要在 granularity 包装同步操作,其中没有事务扩展超过一个阻塞调用。
此处描述的方法不会神奇地将阻塞请求转换为反应式非阻塞执行。一些构成JPA
调度器的线程 仍然 被阻塞。然而,对调度程序的详细监控和明智的池管理应该有助于在应用程序的性能和资源使用之间建立一个可接受的平衡。
反应式 Spring 数据在行动
为了结束本章并强调响应式持久性的好处,让我们创建一个必须进行通信的数据密集型 响应式应用程序经常使用数据库。例如,让我们重温 Chapter 6 中的示例, WebFlux Async Non-阻止通信。在那里,我们为 实现了一个替代的只读Web 前端application Gitter 服务(https://gitter.im)。应用程序连接到预定义的聊天室,并通过 Server-Sent Events (SSE)。现在,有了新的要求,我们的 应用程序 必须收集有关聊天室中最活跃和被引用次数最多的用户的统计信息。我们的聊天应用程序可能使用 MongoDB 来存储消息和用户配置文件。此信息也可用于统计重新计算目的。 下图描述了应用程序设计:
图 7.18 使用响应式 Spring Data 访问 MongoDB 的聊天应用程序
在上图中,编号点如下:
- 这是一个 Gitter 服务器,它可以通过 SSE 从特定聊天室流式传输消息。这是一个外部系统,我们的应用程序从中接收所有数据。
- 这是 UI 服务,它是我们应用程序的一部分。该组件将来自 Gitter 的消息和最近的统计信息流式传输到客户端,这些客户端是在浏览器中运行的 Web 应用程序。 UI 服务 使用 WebFlux 模块通过 SSE 以反应方式传输数据。
- 这是 聊天服务, 它使用响应式
WebClient
来监听传入的消息来自 Gitter 服务器。收到的消息通过 UI Service 广播到WebClient
并流式传输到 统计服务。 - 统计服务 持续跟踪最活跃和被提及最多的用户。统计信息通过 UI 服务 不断地流式传输到 Web 客户端。
- 用户存储库 是一个反应存储库,它与 MongoDB 通信以存储和检索有关聊天参与者的信息。它是使用 Spring Data MongoDB Reactive 模块构建的。
- Message Repository 是一个反应式存储库,允许存储和搜索聊天消息。它也是用 Spring Data MongoDB Reactive 构建的。
- 我们选择 MongoDB 作为我们的存储,因为它适合应用程序的需求,并且在 Spring Data 中还具有响应式驱动程序和良好的响应式支持。
因此,application 中的数据流是恒定的,不需要任何阻塞调用。在这里,聊天服务通过 UI 服务广播消息,统计服务接收聊天消息,重新计算统计后,将消息发送给 UI 服务。 WebFlux 模块负责所有网络通信,Spring Data 使得插入 MongoDB 交互而不中断反应流成为可能。在这里,我们省略了大部分的实现细节。但是,让我们看看统计服务。这可能如下所示:
下面分析StatisticService
类的实现:
StatisticService
类引用了UserRepository
和MessageRepository
,它们提供了响应式与 MongoDB 集合通信。updateStatistic
方法流式传输由UsersStatistic
视图模型对象表示的统计事件。同时,该方法需要由messagesFlux
方法参数(2.1)
表示的传入聊天消息流。该方法订阅ChatMessage
对象的Flux
,将它们转换为所需的表示(2.2 )
,并使用messageRepository
(2.3)
将它们保存到 MongoDB。retryBackoff
运算符有助于克服潜在的 MongoDB 通信问题(2.4)
。此外,如果订阅者无法处理所有事件,我们会丢弃旧消息(2.5)
。通过应用concatMap
运算符,我们通过调用doGetUserStatistic
方法开始统计重估过程>(2.6) 。
我们为此使用 concatMap
,因为它保证了统计结果的正确顺序。这是因为运营商在生成下一个子流之前等待内部子流完成。此外,在统计重新计算中,我们通过应用 errorStrategyContinue
运算符 (2.7)
忽略所有错误,因为这部分应用程序并不重要,可以容忍一些临时问题。
doGetUserStatistic
辅助方法计算排名靠前的用户。为了计算最活跃的用户,我们在userRepository
(3.1) 上调用
,将结果映射到正确的类型findMostActive
方法(3.2)
,在没有找到用户的情况下,我们返回预定义的EMPTY_USER
(3.3)
。同样,为了获取最受欢迎的用户,我们在存储库(3.4)
上调用findMostPopular
方法,映射结果(3.5)
,并根据需要设置默认值(3.6)
。Mono.zip
运算符有助于合并这两个响应式请求并生成UsersStatistic
类的新实例。timeout
操作符设置maximum 可用于统计重新计算的时间预算。
使用这个优雅的代码,我们可以轻松地混合来自 WebClient
对象的传入消息流,即由 WebFlux 模块处理的 SSE 事件的传出流。当然,我们还通过响应式 Spring Data 将 MongoDB 查询处理纳入我们的响应式管道中。此外,我们没有阻塞此管道中的任何线程。 因此,我们的应用程序非常有效地利用了服务器资源。
概括
在本章中,我们学到了很多关于现代应用程序中的数据持久性的知识。我们已经描述了微服务架构中数据访问的挑战,以及多语言持久性如何帮助构建具有所需特征的服务。我们还概述了实现分布式事务的可用选项。本章介绍了用于数据持久性的阻塞和响应式方法的优缺点,以及每个现代阻塞数据访问级别缺少的响应式替代方案。
在本章中,我们描述了 Spring Data 项目如何优雅地将响应式数据访问引入现代 Spring 应用程序。我们研究了反应式 MongoDB 连接器和 Cassandra 连接器的特性和实现细节。我们还介绍了 MongoDB 4 对多文档事务的支持。本章揭示了下一代语言级反应式数据库 API 的可用选项,即 ADBA 和 R2DBC。我们已经探索了这两种方法的缺陷和好处,并研究了 Spring Data 如何使用新的 Spring Data JDBC 模块支持关系数据库的反应式存储库。
我们还介绍了将阻塞驱动程序或存储库集成到反应式应用程序中的当前选项——我们学到了很多东西!然而,我们只触及了数据持久性的皮毛,因为这是一个庞大的主题,不可能在一章内涵盖。
在本章的开头,我们提到了数据库的双重性质——静态数据存储和带有数据更新的消息流。下一章将探讨 Kafka 和 RabbitMQ 等消息系统上下文中的反应式系统和反应式编程。