在华师学编码:《Spring响应式微服务...》第二章 响应式模型和Reactor框架
本文主要包括
响应式模型:流、背压、响应式流
Java响应式流接口
Flux、Mono组件创建
Mono组件、Flux组件操作符
Reactor框架中的背压机制。
前言:
1.响应式模型:流、背压、响应式流
2.Java响应式流接口
public interface Publisher<T> {
void subscribe(Subscriber<? super T> var1);
}
public interface Subscriber<T> {
void onSubscribe(Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
public interface Subscription {
void request(long var1);
void cancel();
}
public interface Processor<T, R> extends
Subscriber<T>, Publisher<R> {
}
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
3.Flux、Mono组件创建
https://projectreactor.io/docs/core/release/reference/#mono
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
</dependency>
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Flux<String> noData = Flux.empty();
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
Flux.interval(Duration.ofMillis(300))
generate() :利用SynchronousSink组件,包括next() complete()error(Throwable) 三个核心方法
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
});
Flux.create(sink -> {
sink.next("Hello");
sink.complete();
});
Mono创建方法,除了just() empty() error() never() 还有特定的delay() justOrEmpty();
delay()会在延迟时间后创建数字为0作为唯一值。
Mono.delay(Duration.ofMinutes(1));
justOrEmpty();如果不为空则加入,否则加入一个空对象。
Mono<String> hello = Mono.justOrEmpty(Optional.of("hello"));
4.Mono组件、Flux组件操作符
操作符主要分为7类:
转换操作符:
buffer:指定数量进行分割收集
bufferWhile:条件为true 则分组收集
bufferUntil:条件为true则分组,实例代码如下
Flux.range(1, 20).buffer(5).subscribe(System.out::println);
System.out.println("------");
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
System.out.println("------");
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
运行结果
map()将每一个元素映射到函数,便于处理
flatMap()将每一个元素转换为流,并进行合并
Flux.range(1, 5).map(i -> i * i).subscribe(System.out::println);;
System.out.println("------");
Flux.range(1, 5).flatMap(i -> Mono.just(i * i)).subscribe(System.out::println);
运行结果如下:
window()与buffer 相似,不过buffer 转换为多个集合,而window 转换为多个Flux 对象
public void testwindow() {
Flux.range(1, 5).window(2).toIterable().forEach(w -> {
w.subscribe(System.out::println);
System.out.println("-----");
});
}
运行结果:
后续函数不一一展示实验。可以通过查阅API文档进行实验:
过滤操作符:
filter:根据条件过滤;first:获取首个元素;last:获取最后元素
skip/skipLast:返回跳过n个元素的集合;take/takeLast返回获取N个元素的集合。
组合操作符:
then/when:then 是上一个操作完成再执行下一个、when是多个一起执行,直到全部执行完再返回、
merge:按照产生顺序进行合并
mergeSequential以流为单位进行合并、
startWith:在队列开头添加指定元素,zipWith 合并时不进行任何操作,得到一个Tuple2流。
条件操作符:
defaultEmpty如果原始数据流没元素则添加一个默认的空元素
skipUntil跳过元素直到遇到true、skipWhile遇到true 跳过元素
takeUntil收集元素直到遇到true、takeWhile遇到true 再收集元素
数学操作符:concat顺序连接、count计算数量、reduce将上一个元素与下一个元素进行函数处理
Observable 工具操作符:
delay:延后一段时间后
subscribe:添加一个订阅,进行响应,如上文提到的打印
timeout设置一段时间没有产生时间,生产一个超时异常
日志和调试操作符:
log:打印onNext() onComplete() onError 等日志信息、
debug:
1)启动调试模式
Hooks.onOperatorDebug();
Mono.just(0).map(x -> 1 / x)
.checkpoint().subscribe(System.out::println);
5.Reactor框架中的背压机制。
点击“在看”,学多少都不会忘~