RSocket 分布式通讯协议是 Spring Reactive 的核心内容,从 Spring Framework 5.2 开始,RSocket 已经是 Spring 的内置功能,Spring Boot 2.3 也添加了 spring-boot-starter-rsocket,简化了 RSocket 的服务编写和服务调用。RSocket 通讯的核心架构中包含两种模式,分别是 Broker 代理模式和服务直连通讯模式。Broker 的通讯模式更灵活,如 Alibaba RSocket Broker,采用的是事件驱动模型架构。而目前更多的架构则是面向服务化设计,也就是我们常说的服务注册发现和服务直连通讯的模式,其中最知名的就是 Spring Cloud 技术栈,涉及到配置推送、服务注册发现、服务网关、断流保护等等。在面向服务化的分布式网络通讯中,如 REST API、gRPC 和 Alibaba Dubbo 等,都与 Spring Cloud 有很好地集成,用户基本不用关心服务注册发现和客户端负载均衡这些底层细节,就可以完成非常稳定的分布式网络通讯架构。
RSocket 作为通讯协议的后起之秀,核心是二进制异步化消息通讯,是否也能和 Spring Cloud 技术栈结合,实现服务注册发现、客户端负载均衡,从而更高效地实现面向服务的架构?这篇文章我们就讨论一下 Spring Cloud 和 RSocket 结合实现服务注册发现和负载均衡。
服务注册发现
服务注册发现的原理非常简单,主要涉及三种角色:服务提供方、服务消费者和服务注册中心。典型的架构如下:
服务提供方,如 RSocket Server,在应用启动后,会向服务注册中心注册应用相关的信息,如应用名称,ip 地址,Web Server 监听端口号等,当然还会包括一些元信息,如服务的分组(group),服务的版本号(version),RSocket 的监听端口号,如果是 WebSocket 通讯,还需要提供 ws 映射路径等,不少开发者会将服务提供方的服务接口列表作为 tags 提交给服务注册中心,方便后续的服务查询和治理。
在本文中,我们采用 Consul 作为服务注册中心,主要是 Consul 比较简单,下载后执行 `consul agent -dev` 就可以启动对应的服务,当然你可以使用 Docker Compose,配置也非常简单,然后 `docker-compose up -d` 就可以启动 Consul 服务。
当我们向服务中心注册和查询服务时,都需要有一个应用名称,对应到 Spring Cloud 中,也就是 Spring Boot 对应的 `spring.application.name` 的值,这里我们称之为应用名称,也就是后续的服务查找都是基于该应用名称进行的。如果你调用 `ReactiveDiscoveryClient.getInstances(String serviceId);` 查找服务实例列表时,这个 serviceId 参数其实就是 Spring Boot 的应用名称。考虑到服务注册和后续的 RSocket 服务路由的配合以及方便大家理解,这里我们打算设计一个简单的命名规范。
假设你有一个服务应用,功能名称为 calculator,同时提供两个服务: 数学计算器服务(MathCalculatorService)和汇率计算器服务(ExchangeCalculatorService), 那么我们该如何来命名该应用及其对应的服务接口名?
这里我们采用类似 Java package 命名规范,采用域名倒排的方式,如 calculator 应用对应的则为 `com-example-calculator` 样式,为何是中划线,而不是点?`.` 在 DNS 解析中作为主机名是非法的,只能作为子域名存在,不能作为主机名,而目前的服务注册中心设计都遵循 DNS 规约,所以我们采用中划线的方式来命名应用。这样采用域名倒排和应用名结合的方式,可以确保应用之间不会重名,另外也方便和 Java Package 名称进行转换,也就是 `-` 和 `.` 之间的相互转换。
那么应用包含的服务接口应该如何命名?服务接口全名是由应用名称和 interface 名称组合而成,规则如下:
String serviceFullName = appName.replace("-", ".") + "." + serviceInterfaceName;
-
com.example.calculator.MathCalculatorService
-
com.example.calculator.ExchangeCalculatorService
而 `com.example.calculator.math.MathCalculatorService` 则是错误的, 因为在应用名称和接口名称之间多了 `math`
。为何要采用这种命名规范?首先让我们看一下服务消费方是如何调用远程服务的。假设服务消费方拿到一个服务接口,如 `com.example.calculator.MathCalculatorService`,那么他该如何发起服务调用呢?
-
首先根据 Service 全面提取处对应的应用名称(appName),如 `com.example.calculator.MathCalculatorService` 服务对应的 appName 则为 `com-example-calculator`。如果应用和服务接口之间不存在任何关系,那么想要获取服务接口对应的服务提供方信息,你可能还需要应用名称,这会相对来说比较麻烦。如果接口名称中包含对应的应用信息,则会简单很多,你可以理解为应用是服务全面中的一部分。
-
调用 `ReactiveDiscoveryClient.getInstances(appName)` 获取应用名对应的服务实例列表(ServiceInstance),ServiceInstance 对象会包含诸如 IP 地址,Web 端口号、RSocket 监听端口号等其他元信息。
-
根据 `RSocketRequester.Builder.transports(servers)` 构建具有负载均衡能力的 RSocketRequester 对象。
-
使用服务全称和具体功能名称作为路由进行 RSocketRequester 的 API 调用,样例代码如下:
rsocketRequester .route("com.example.calculator.MathCalculatorService.square") .data(number) .retrieveMono(Integer.class)
通过上述的命名规范,我们可以从服务接口全称中提取出应用名,然后和服务注册中心交互查找对应的实例列表,然后建立和服务提供者的连接,最后基于服务名称进行服务调用。该命名规范,基本做到到了最小化的依赖,开发者完全是基于服务接口调用,非常简单。
RSocket 服务编写
有了服务的命名规范和服务注册,编写 RSocket 服务,这个还是非常简单,和编写一个 Spring Bean 没有任何区别。引入 `spring-boot-starter-rsocket` 依赖,创建一个 Controller 类,添加对应的 MessagMapping annotation 作为基础路由,然后实现功能接口添加功能名称,样例代码如下:
@Controller @MessageMapping("com.example.calculator.MathCalculatorService") public class MathCalculatorController implements MathCalculatorService { @MessageMapping("square") public Mono<Integer> square(Integer input) { System.out.println("received: " + input); return Mono.just(input * input); } }
上述代码看起来好像有点奇怪,既然是服务实现,添加 @Controller 和 @MessageMapping,看起来好像有点不伦不类的。当然这些 annotation 都是一些技术细节体现,你也能看出,RSocket 的服务实现是基于 Spring Message 的,是面向消息化的。这里我们其实只需要添加一个自定义的 @SpringRSocketService annotation 就可以解决这个问题,代码如下:
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Controller @MessageMapping() public @interface SpringRSocketService { @AliasFor(annotation = MessageMapping.class) String[] value() default {}; }
回到服务对应的实现代码,我们改为使用 @SpringRSocketService annotation,这样我们的代码就和标准的 RPC 服务接口完全一模一样啦,也便于理解。此外 @SpringRSocketService 和 @RSocketHandler 这两个 Annotation,也方便我们后续做一些 Bean 扫描、IDE 插件辅助等。
@SpringRSocketService("com.example.calculator.MathCalculatorService") public class MathCalculatorImpl implements MathCalculatorService { @RSocketHandler("square") public Mono<Integer> square(Integer input) { System.out.println("received: " + input); return Mono.just(input * input); } }
最后我们添加一下 spring-cloud-starter-consul-discovery 依赖,设置一下 bootstrap.properties,然后在 application.properties 设置一下 RSocket 监听的端口和元信息,我们还将该应用提供的服务接口列表作为 tags 传给服务注册中心,当然这个也是方便我们后续的服务管理。样例如下:
spring.application.name=com-example-calculator spring.cloud.consul.discovery.instance-id=com-example-calculator-${random.uuid} spring.cloud.consul.discovery.prefer-ip-address=true server.port=0 spring.rsocket.server.port=6565 spring.cloud.consul.discovery.metadata.rsocketPort=${spring.rsocket.server.port} spring.cloud.consul.discovery.tags=com.example.calculator.ExchangeCalculatorService,com.example.calculator.MathCalculatorService
RSocket 服务应用启动后,我们在 Consul 控制台就可以看到服务注册上来的信息,截屏如下:
RSocket 客户端接入
客户端接入稍微有一点复杂,主要是要基于服务接口全面要做一系列相关的操作,但是前面我们已经有了命名规范,所以问题也不大。客户端应用同样会接入服务注册中心,这样我们就可以获得 `ReactiveDiscoveryClient` bean,接下来就是根据服务接口全名,如 `com.example.calculator.ExchangeCalculatorService` 构建出具有负载均衡的 RSocketRequester。
原理也非常简单,前面说过,根据服务接口全称,获得其对应的应用名称,然后调用 `ReactiveDiscoveryClient.getInstances(appName)` 获得服务应用对应的实例列表,接下来将服务实例(ServiceInstance)列表转换为 RSockt 的 LoadbalanceTarget 列表,其实就是 POJO 转换,最后将转 LoadbalanceTarget 列表进行 Flux 封装(如使用 Sink 接口),传递给 RSocketRequester.Builder 就完成具有负载均衡能力的 RSocketRequester 构建,详细的代码细节大家可以参考项目的代码库。
这里要注意的是接下来如何感知服务端实例列表的变化,如应用上下线,服务暂停等。这里我采用一个定时任务方案,定时查询服务对应的地址列表。当然还有其他的机制,如果是标准的 Spring Cloud 服务发现接口,目前是需要客户端轮询的,当然也可以结合 Spring Cloud Bus 或者消息中间件,实现服务端列表变化的监听。如果客户端感知到服务列表的变化,只需要调用 Reactor 的 Sink 接口发送新的列表即可,RSocket Load Balance 在感知到变化后,会自动做出响应,如关闭即将失效的连接、创建新的连接等工作。
在实际的应用之间的相互通讯,会存在一些服务提供方不可用的情况,如服务方突然宕机或者其网络不可用,这就导致了服务应用列表中部分服务不可用,那么 RSocket 这个时候会如何处理?不用担心,RSocket Load Balance 有重试机制,当一个服务调用出现连接等异常,会重新从列表中获取一个连接进行通讯,而那个错误的连接也会标识为可用性为 0,不会再被后续请求所使用。服务列表推送和通讯期间的容错重试机制,这两者保证了分布式通讯的高可用性。
最后让我们启动 client-app,然后从客户端发起一个远程的 RSocket 调用,截屏如下:
上图中 `com-example-calculator` 服务应用包括三个实例,服务的调用会在这三个服务实例交替进行(RoundRobin 策略)。
开发体验的一些考量
虽然服务注册和发现、客户端的负载均衡这些都完成啦,调用和容错这些都没有问题,但是还有一些使用体验上的问题,这里我们也阐述一下,让开发体验做的更好。
大多数 RPC 通讯都是基于接口的,如 Apache Dubbo、gRPC 等。那么 RSocket 能否做到?答案是其实完全可以。在服务端,我们已经是基于服务接口来实现 RSocket 服务啦,接下来我们只需要在客户端实现基于该接口的调用就可以。对于 Java 开发者来说,这不是大问题,我们只需要基于 Java Proxy 机制构建就可以,而 Proxy 对应的 InvocationHandler 会使用 RSocketRequester 来实现 invoke() 的函数调用。详细的细节请参考应用代码中的的 `RSocketRemoteServiceBuilder.java` 文件,而且在 client-app module 中也已经包含了解基于接口调用的 bean 实现。
使用 RSocketRequester 调用远程接口时,对应的处理函数只能接受单个参数,这个和 gRPC 的设计是类似的,当然也考虑了不同对象序列化框架的支持问题。但是考虑到实际的使用体验,可能会涉及到多参函数的情况,让调用方开发体验更好,那么这个时候该如何处理?其实从 Java 1.8 后,interface 是允许增加 default 函数的,我们可以添加一些体验更友好的 default 函数,而且还不影响服务通讯接口,样例如下:
public interface ExchangeCalculatorService { double exchange(ExchangeRequest request); default double rmbToDollar(double amount) { return exchange(new ExchangeRequest(amount, "CNY", "USD")); } }
通过 interface 的 default method,我们可以为调用方提供给便捷函数,如在网络传输的是字节数组 (byte[]),但是在 default 函数中,我们可以添加 File 对象支持,方便调用方使用。Interface 中的函数 API 负责服务通讯规约,default 函数来提升使用方的体验,这两者的配合,可以非常容易解决函数多参问题,当然 default 函数在一定程度上还可以作为数据验证的前哨来使用。
前面我们说到,RSocket 还有一种 Broker 架构,也就是服务提供方是隐藏在 Broker 之后的,请求主要是由 Broker 承接,然后再转发给服务提供方处理,架构样例如下:
那么基于服务发现的机制负载均衡,能否和 RSocket Broker 模式混合使用呢?如一些长尾或者复杂网络下的应用,可以注册到 RSocket Broker,然后由 Broker 处理请求调用和转发。这个其实也不不复杂,前面我们说到应用和服务接口命名规范,这里我们只需要添加一个应用名前缀就可以解决。假设我们有一个 RSocker Broker 集群,暂且我们称之为 broker0 集群,当然该 broker 集群的实例也都注册到服务注册中心(如 Consul)啦。那么在调用 RSocket Broker 上的服务时,服务名称就被调整为 `broker0:com.example.calculator.MathCalculatorService`,也就是服务名前添加了 `appName:` 这样的前缀,这个其实是 URI 的另一种规范形式,我们就可以提取冒号之前的应用名,然后去服务注册中心查询获得应用对应的实例列表。
回到 Broker 互通的场景,我们会向服务注册中心查询 broker0 对应的服务列表,然后和 broker0 集群的实例列表创建连接,这样后续基于该接口的服务调用就会发送给 Broker 进行处理,也就是完成了服务注册发现和 Broker 模式的混合使用的模式。
借助于这种定向指定服务接口和应用间的关联,也方便我们做一些 beta 测试,如你想将 `com.example.calculator.MathCalculatorService` 的调用导流到 beta 应用,你就可以使用 `com-example-calculator-beta1:com.example.calculator.MathCalculatorService` 这种方式调用服务,这样服务调用对应的流量就会转发给 `com-example-calculator-beta1` 对应的实例,起到 beta 测试的效果。
回到最前面说到的规范,如果应用名和服务接口的绑定关系你实在做不到,那么你可以使用这种方式实现服务调用,如 `calculator-server:com.example.calculator.math.MathCalculatorService`,只是你需要更完整的文档说明,当然这种方式也可以解决之前系统接入到目前的架构上,应用的迁移成本也比较小。如果你之前的面向服务化架构设计也是基于 interface 接口通讯的,那么通过该方式迁移到 RSocket 上完全没有问题,对客户端代码调整也最小。
通过整合服务注册发现,结合一个实际的命名规范,就完成了服务注册发现和 RSocket 路由之间的优雅配合,当然负载均衡也是包含其中啦。对比其他的 RPC 方案,你不需要引入 RPC 自己的服务注册中心,复用 Spring Cloud 的服务注册中心就可以,如 Alibaba Nacos, Consul, Eureka 和 ZooKeeper 等,没有多余的开销和维护成本。如果你想更多了解 RSocket RPC 相关的细节,可以参考 Spring 官方博客 。
欢迎加入 alibaba-rsocket-broker 钉钉群:
更多详细的代码细节,可以点击【阅读原文】查看文章对应的代码库!