vlambda博客
学习文章列表

在华师学编码:《Spring响应式微服务...》第二章 响应式模型和Reactor框架


本文主要包括

  1. 响应式模型:流、背压、响应式流

  2. Java响接口

  3. Flux、Mono组件创建

  4. Mono组件、Flux组件操作符

  5. Reactor框架中的背压机制。


前言:

本章在于介绍响应式框架的理论,而后通过 Reactor 框架的使用,初步掌握其原理,为后续继续使用Reactor 讲解,打好基础。
书中,以 Mono、Flux 介绍、创建、操作符使用,分为三个小节,大量借鉴官网教程图片。本推文通过组件相互分离,设置小标题,如上所示先介绍 Mono组件,而后介绍Flux组件。最后介绍 背压机制
本章看似内容很多,实则讲得比较浅(停留在使用层面,未深入原理)大多借鉴官网API 文档,做了一系列收集整理,可以通过阅读,进行实验,加强记忆。

1.响应式模型:流、背压、响应式流

此处传统编程模型返回List<>集合,而响应式编程模型返回的是 Flux<>集合,这就是响应式编程模型的最显著特点区别。
1)流
概念:简单的说,生产者生产,由消费者消费的一个序列,可称为Source/Sink模型,或发布/订阅模型。
实现方式有两种: 推模型:生产者推送给消费者处理。 拉模型:消费者处理完成,主动拉取。
处理方式有两种: 同步处理:消费者处理完成,再允许发送,形成阻塞。 异步处理:生产者给消费者发送消息之后,可以继续往下异步执行。
生产速度大于消费速度需要进行流量控制:
节流(Throttling):丢弃无法处理元素
使用缓冲区(Buffer):保存转发机制,使生产的数据保存起来,等待消费者逐一处理
调用栈阻塞:最简单就是使用同步线程:未消费完成不处理新的产生数据。
背压:消费者主动告知生产者生产需求。即消费者需要多少,生产多少。

2)背压
上文介绍到,背压就是由消费者主动告知生产者的一种流量控制方法。即能够 向上游反馈流量请求的机制。
实现方式有两种:
阻塞式背压。即消费者正在执行时,无法进行生产;存在缺点,如果生产的消息需要多个消费者处理,而消费者消费速度不一致,导致系统性能下降达不到降压目的。
非阻塞背压:消费者采用拉策略,消费者要求生产一定的消息量,最多只能生产这些量,而后进入等待。

3)响应式流
响应式是一种规范,表现在编码中就是一批被预设定义好的接口。
响应式流规范:是一种非阻塞的背压异步处理流的一种倡议,在响应式流模型中,消费者向生产者发送异步请求,生产者根据异步请求发送 合适数量的元素。

2.Java响应式流接口

1 )响应式流的接口:
Java 中响应式流,有4个接口 Publisher<T>、Subscriber<T>、Subscription 和Processor<T, R>

在华师学编码:《Spring响应式微服务...》第二章 响应式模型和Reactor框架

Publisher<T>发布者:通过调用subscribe 方法发送一个Subscriber 对象。
public interface Publisher<T> { void subscribe(Subscriber<? super T> var1);}
Subscriber<T>订阅者,定义4个方法分别是:接收一个对象,下一个对象,遇到异常时候调用、完成时的调用方法
public interface Subscriber<T> { void onSubscribe(Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();}
Subscription 订阅,订阅者发布令牌,发布者与订阅者的沟通传递类。发布者发起请求调用request 方法 ,取消则用cancel 方法
public interface Subscription { void request(long var1);
void cancel();}
Processor<T, R>处理器 ,是发布者订阅者的处理媒介,可以看到继承了发布者,订阅者两个接口,便于相互转换
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
4个接口相互调用流程,共有7步,如下图所示
在华师学编码:《Spring响应式微服务...》第二章 响应式模型和Reactor框架
2)Java 中的异步
Java 中实现异步主要有2种形式回调(Callback)和Future
回调(Callback):在执行A类methonA 调用 B类中的methonB, 在methonB 执行完成后主动,调用A类的callback 方法,类似于Runnable ,线程执行完成后有返回值。
Future:表示一个异步任务,可以通过相关接口获取Future 的运行结果。接口方法如下,分别是取消、是否取消、是否完成、获取结果
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}
3)响应式编程的主流实现技术:
RxJava :Reactive Extension(Rx) 是一个类库,基于可观察序列的事件驱动,最早应用于.NET平台。目前拥有RxJava1.x 和2.x 两个版本。由于2.x 需要兼容1.x 版本,导致很多地方使用不直观。
Akka Streams :运行在JVM 上,构建的高并发分布式、高弹性消息驱动工具套件。Actor是Akka 最核心的概念。封装了状态和行为的对象,通过Actor 简化锁及线程管理。Akka Stream 是以Akka为基础的响应式流的实现
Vert.x 是Eclipse 基金会下开源的Java 工具,用来构建高并发、异步、可伸缩、多语言的web 应用程序。包含Vert.x Reactive Streams 工具库 提供可读流和可写流
Project Reactor :是Spring5 引入的响应式编程机制,Reactor 诞生较晚,属于第二代响应式开发框架,类似于RxJava,但是没有RxJava 的历史包袱。其中Flux和Mono 是Reactor的核心组件。

3.Flux、Mono组件创建

框架的官网(https://projectreactor.io/)
文档位置:
https://projectreactor.io/docs/core/release/reference/#mono
Mono文档地址:
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html
Flux文档地址
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
要使用Reactor框架需要引入对应的依赖,以Maven为例:
可以直接使用前面搭建的项目,运用Test 直接调试,进行实验。
 <dependency> <groupId>io.projectreactor</groupId>      <artifactId>reactor-core</artifactId> </dependency>  <dependency> <groupId>io.projectreactor</groupId>      <artifactId>reactor-test</artifactId> </dependency>
Flux 表示 包含 0个或多个 元素的异步序列
Mono 表示包含 0个或1个元素的异步序列


1)Flux创建方法
静态创建方法:
just(),直接指定Flux 对象中的内容
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
fromArray() fromIterable() fromStream()  数组、 迭代器、流对象,创建。
List<String> iterable = Arrays.asList("foo", "bar", "foobar");Flux<String> seq2 = Flux.fromIterable(iterable);
empty() error() never() 加入空对象、错误对象、不包含任何对象  
Flux<String> noData = Flux.empty();
range(x,y),创建从x开始,y个对象; interval() 创建时间延迟,以及起始元素发布之前的演出时间。
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);Flux.interval(Duration.ofMillis(300))
动态创建方法:

generate() :利用SynchronousSink组件,包括next() complete()error(Throwable) 三个核心方法

Flux.generate(sink -> { sink.next("Hello"); sink.complete();});
create();使用FluxSink组件,后续细讲区别
Flux.create(sink -> { sink.next("Hello"); sink.complete();});

Mono创建方法,除了just()  empty() error() never() 还有特定的delay() justOrEmpty();

delay()会在延迟时间后创建数字为0作为唯一值。

Mono.delay(Duration.ofMinutes(1));

justOrEmpty()如果不为空则加入,否则加入一个空对象。

Mono<String> hello = Mono.justOrEmpty(Optional.of("hello"));

4.Mono组件、Flux组件操作符

操作符主要分为7类:

转换操作符:

buffer:指定数量进行分割收集

bufferWhile:条件为true 则分组收集

bufferUntil:条件为true则分组,实例代码如下

Flux.range(1, 20).buffer(5).subscribe(System.out::println);System.out.println("------");Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);System.out.println("------");Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

运行结果

在华师学编码:《Spring响应式微服务...》第二章 响应式模型和Reactor框架

map()将每一个元素映射到函数,便于处理

flatMap()将每一个元素转换为流,并进行合并

Flux.range(1, 5).map(i -> i * i).subscribe(System.out::println);;System.out.println("------");Flux.range(1, 5).flatMap(i -> Mono.just(i * i)).subscribe(System.out::println);

运行结果如下:

在华师学编码:《Spring响应式微服务...》第二章 响应式模型和Reactor框架

window()与buffer 相似,不过buffer 转换为多个集合,而window 转换为多个Flux 对象

@Testpublic void testwindow() { Flux.range(1, 5).window(2).toIterable().forEach(w -> { w.subscribe(System.out::println); System.out.println("-----"); });}

运行结果:

在华师学编码:《Spring响应式微服务...》第二章 响应式模型和Reactor框架

后续函数不一一展示实验。可以通过查阅API文档进行实验:

过滤操作符:

filter:根据条件过滤;first:获取首个元素;last获取最后元素

skip/skipLast:返回跳过n个元素的集合;take/takeLast返回获取N个元素的集合。

组合操作符:

then/when:then 是上一个操作完成再执行下一个、when是多个一起执行,直到全部执行完再返回、

merge:按照产生顺序进行合并

mergeSequential以流为单位进行合并、

startWith:在队列开头添加指定元素,zipWith 合并时不进行任何操作,得到一个Tuple2流。

条件操作符:

defaultEmpty如果原始数据流没元素则添加一个默认的空元素

skipUntil跳过元素直到遇到true、skipWhile遇到true 跳过元素

takeUntil收集元素直到遇到true、takeWhile遇到true 再收集元素

数学操作符:concat顺序连接、count计算数量、reduce将上一个元素与下一个元素进行函数处理

Observable 工具操作符:

delay:延后一段时间后 

subscribe:添加一个订阅,进行响应,如上文提到的打印 

timeout设置一段时间没有产生时间,生产一个超时异常

日志和调试操作符:

log:打印onNext() onComplete() onError 等日志信息、

debug:

1)启动调试模式

Hooks.onOperatorDebug();
2)设置检查点(checkpoint)
Mono.just(0).map(x -> 1 / x).checkpoint().subscribe(System.out::println);

5.Reactor框架中的背压机制。

Reactor 包含4种处理策略:
ERROR :当下游跟不上节奏时发出错误信号
DROP:当下游没准备好接收新的元素时,抛弃这个元素
LATEST:下游只能获得上游最新的元素
BUFFER:缓存下游没来得及处理的元素

具体使用4个操作符来设置背压策略:onBackpressureXxx
onBackpressureBuffer() 缓存
onBackpressureDrop() 抛弃
onBackpressureError() 报错
onBackpressureLatest() 缓存

具体可以看到Flux 的官方Api 了解具体设置参数。



-END-






点击“在看”,学多少都不会忘~