浅谈 RSocket 与响应式编程
作者 | 素渡
一 RSocket 的主要特性
首先,RSocket是高效一个二进制的网络通讯协议,能够满足很多场景下使用。其次,RSocket是一个激进的响应式捍卫者,激进到连API都跟响应式无缝集成。
1 四种通讯模式
即发即忘 FireAndForget
立即发送一个请求,无需为这个请求发送响应报文。适用于监控埋点,日志上报等,这种场景下无需回执,丢失几个请求无伤大雅。
请求响应 RequestResponse
请求方发送一条请求消息,响应方收到请求后并返回一条响应消息。传统的HTTP是典型的RequestResponse。
流 RequestStream
请求方发送一个请求报文,响应方发回N个响应报文。传统的MQ是典型的RequestStream。
通道 RequestChannel
创建一个通道上下文,双方可以互相发送消息。IM是个典型的RequestChannel通讯场景。
2 双向通讯 Bi-Directional
RSocket的Client连接到Server,这个过程称为Setup,在连接成功后,会约定收发消息的方向逻辑:
-
当Client请求Server时,发送的请求ID永远为奇数
-
当Server请求Client时,发送的请求ID永远为偶数
3 其他
-
二进制协议,紧凑高效
-
多路复用
-
基于帧(Frame)的背压,与ReactiveStreams语义契合
-
灵活的传输层切换: TCP/UDP/WebSocket等
-
支持Cancel、断点续传、租约等高级特性
二 RSocket 的内部实现
1 帧的设计
-
一个帧由6 bytes的Header和剩余的Body构成,其中Header的4 bytes表示 StreamID,6 bits表示Frame Type, 10 bits作为Flags。Body根据不同的帧类型,结构也不同,常用的带Payload的帧一般会包括Metadata和Data两个部分。
-
传输层如果本身不支持分帧特性的(如TCP),那么RSocket会用3 bytes的uint24表示帧长度,所以最大的帧大小是16MB。
-
如果帧超出16MB,RSocket支持帧分裂重组,也就是拆成更小的帧,接收端再自动重组。
2 数据载体——Payload
-
Metadata——元数据,类似HTTP的header
-
Data——数据,类似HTTP的body
3 架构
-
Transport层将网络二进制流编解码为Frames。
-
RSocket支持自定义最大Frame Size,默认16MB,当某个Frame超出时,会被拆解为N个小Frame,收到时再重组,在介绍帧的时候也提到了,这个特性称为Fragmentation。
-
DuplexConnection转换Frames为Payload,抽象为一个个Request/Response上下文,并负责读写。
-
RSocket组装Connection为RSocket Interface,其中Resumable支持断点续传,连接断开重连也能自愈,个人觉得这个特性有点鸡肋,在弱网环境有些优势,但是因为期间会缓存住未处理完毕的帧,所以会耗费大量的系统资源。
-
RSocket使用Reactor核心库暴露为4种通讯模式,抽象为高级API。
4 玩法
5 关于 RSocket Broker
三 响应式编程
1 响应式编程大概长这样
2 Reactive Streams
-
Publisher:发布者,负责生产数据。唯一的方法subscribe,接收一个Subscriber开始一次新的订阅。
-
Subscriber:订阅者,负责订阅消费数据。
-
Subscription:订阅,某次订阅的上下文控制,如取消、通知获取下N条数据。
-
创建subscriber,开始订阅Publisher。
-
生成上下文subscription。
-
Publisher就绪,调用onSubscribe。
-
Publisher开始生产数据。
-
每条成功生产的数据回调onNext。
-
当生产失败时,回调onError并结束当前订阅。
-
当所有数据生产完毕时,回调onComplete并结束当前订阅。
-
中途可以调用subscription随时cancel取消订阅,或者通过request(n)通知生产下N个元素,这个过程即背压。
四 总结
https://github.com/jjeffcaii/rsocket-minibroker
https://github.com/rsocket/rsocket-go
https://github.com/rsocket/rsocket-rust