vlambda博客
学习文章列表

初识响应式编程与Webflux

本期内容:

  • 概念

  • Web Flux

  • Reactive Streams、Reactor 和 Web Flux、

  • Mono 和 Flux  API简介

  • Mono 和 Flux的操作符

  • 总结


概念

响应式编程_:响应式编程_是一种面向数据流和变化传播的编程范式

数据流:

本来数据是我们自行处理的,后来我们把要处理的数据抽象出来(变成了数据流),然后通过API去处理数据流中的数据(是声明式的)。可以参考 java8的stream 来理解。但是stream是同步流。

变化传播:

  • 在命令式编程(我们的日常编程模式)下,式子a=b+c,这就意味着a的值是由b和c计算出来的。如果b或者c后续有变化, 不会影响到a的值
  • 在响应式编程下,式子a:=b+c,这就意味着a的值是由b和c计算出来的。但如果b或者c的值后续有变化, 会影响到a的值

举例:

背压:
消费者通知提供者自己的消费能力。

下图展示了 reactivestreams 中的核心接口
初识响应式编程与Webflux

  • Publisher:发布者

  • Subscriber:订阅者

  • Subscription:这个单词中文翻译为名词的订阅,在代码中它是发布者和订阅者之间的媒介

  • Processor:该接口继承了发布者和订阅者,可以理解为发布者和订阅者的中间操作(但是 Reactor 的中间操作并没有实现 Processor,在最新版本的 Reactor 中,Processor 的相关实现接口已经被弃用)

在了解了响应式编程的核心接口之后,我们来看下响应式编程是如何运作的
初识响应式编程与Webflux
在 Reactor 中大部分实现都是按照上图的逻辑来执行的

  1. 首先是Subscriber(订阅者)主动订阅 Publisher(发布者),通过调用 Publisher 的 subscribe 方法
  2. Publisher 在向下游发送数据之前,会先调用 Subscriber 的 onSubscribe 方法,传递的参数为 Subscription(订阅媒介)
  3. Subscriber 通过 Subscription#request 来请求数据,或者 Subscription#cancel 来取消数据发布(这就是响应式编程中的背压,订阅者可以控制数据发布)
  4. Subscription 在接收到订阅者的调用后,通过 Subscriber#onNext 向下游订阅者传递数据。
  5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流,如果数据发布或者处理遇到错误会调用 Subscriber#onError

调用 Subscriber#onNext,onComplete,onError 这三个方法,可能是在 Publisher 中做的,也可能是在 Subscription 中做的,根据不同的场景有不同的实现方式,并没有什么严格的要求。可以认为 Publisher 和 Subscription 共同配合完成了数据发布

其实 Reactor 中 API 实现原理也都是这个套路。

总结:Reactor 实现了 响应式编程的规范,以流为模型,支持java中两种异步模型(Future 和callback),让开发人员可以更好的编排多个异步的组合,避免了“回调地狱”。

写一个使用Reactor的简单例子:

    @Test
    public void simpleDemo(){


        Flux<Integer> publisher = Flux.just(1,2,3,4,5);

        publisher.subscribe(new CoreSubscriber<Integer>() {
            Subscription subscription;
            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                subscription.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Integer integer) {
                System.out.println(("消费 :" + integer));
                //这里可以根据具体情况,设置request的值,实现了背压
                subscription.request(1);
            }
            @Override
            public void onError(Throwable t) {
                System.err.println("发生异常");
            }
            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        });
    }

Web Flux


Spring 5 引入的一个基于 Netty 而不是 Servlet 的高性能的 Web 框架,但是使用方式并没有同传统的基于 Servlet 的 Spring MVC 有什么大的不同。

▼ Web Flux 中 MVC 接口的示例

@RequestMapping("/demo")
@RestController 
public class DemoController {    
    @RequestMapping(value = "/foobar")    
    public Mono<Foobar> foobar() {
        return Mono.just(new Foobar()); 
    }}

异步非阻塞的优点:
我们假设,设置tomcat最大线程为200,遇到200个非常耗时的请求
那么当有200个线程同时并发在处理,那么当来201个请求的时候,就已经处理不了,因为所有的线程都阻塞了。这是Severlet 3.0之前的处理情况

而3.0之后异步处理是怎样处理呢?Servlet3.0类似于Netty一样就一个boss线程池和work线程池,boss线程只负责接收请求,work线程只负责处理逻辑。那么servlet3.0规范中,这200个线程只负责接收请求,然后每个线程将收到的请求,转发到work线程去处理。因为这200个线程只负责接收请求,并不负责处理逻辑,故不会被阻塞,而影响通信,就算处理非常耗时,也只是对work线程形成阻塞,所以当再来请求,同样可以处理,其主要应用场景是针对业务处理较耗时的情况可以减少服务器资源的占用,并且提高并发处理速度。

Reactive Streams、Reactor 和 Web Flux

上面介绍了反应式编程的一些概念,以及 Reactor 和 Web Flux。可能读者看到这里有些乱。这里介绍一下三者的关系。其实很简单:
Reactive Streams 是规范,Reactor 实现了 Reactive Streams。Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。

其实,对于大部分业务开发人员来说,当编写反应式代码时,我们通常只会接触到 Publisher 这个接口,对应到 Reactor 便是 Mono 和 Flux。对于 Subscriber 和 Subcription 这两个接口,Reactor 必然也有相应的实现。但是,这些都是 Web Flux 和 Spring Data Reactive 这样的框架用到的。如果不开发中间件,通常开发人员是不会接触到的。
比如,在 Web Flux,你的方法只需返回 Mono 或 Flux 即可。你的代码基本也只和 Mono 或 Flux 打交道。而 Web Flux 则会实现 Subscriber ,onNext 时将业务开发人员编写的 Mono 或 Flux 转换为 HTTP Response 返回给客户端。

Mono 和 Flux  API简介

  • Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者。
  • Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发布者。

“普通”的创建方式

简单的创建方式是主要是使用像 just 这样的方法创建

Mono<String> helloWorld = Mono.just("Hello World");
Flux<String> fewWords = Flux.just("Hello""World");
Flux<String> manyWords = Flux.fromIterable(array);

这样的创建方式在什么时候用呢?一般是用在当你在经过一系列非 IO 型的操作之后,得到了一个对象。接下来要基于这个对象运用 Reactor 进行高性能的 IO 操作时,可以用这种方式将你之前得到的对象转换为 Mono 或 Flux。

“文艺”的创建方式

上述是我们通过一个同步调用得到的结果创建出 Mono 或 Flux,但有时我们需要从一个非 Reactive 的异步调用的结果创建出 Mono 或 Flux。那如何实现呢。
如果这个异步方法返回一个 CompletableFuture,那我们可以基于这个 CompletableFuture 创建一个
**Mono.fromFuture(aCompletableFuture); **
如果这个异步调用不会返回 CompletableFuture,是有自己的回调方法,那怎么创建 Mono 呢?
我们可以使用 static Mono create(Consumer<MonoSink > callback) 方法:

 ListenableFuture<ResponseEntity<String>> entity;
        Mono.create(sink -> {
            entity.addCallback(new        ListenableFutureCallback<ResponseEntity<String>>() {        
                @Override        
                public void onFailure(Throwable ex) {
                    sink.error(ex);        
                }         
                @Override        
                public void onSuccess(ResponseEntity<String> result) {
                    sink.success(result.getBody());        
                }    
            });});

Mono 和 Flux的操作符

在 Mono 和 Flux 中间环节处理的处理过程中,有三个有些类似的方法:map、flatMap 和 then。这三个方法可以说是 Reactor 中使用频率很高的方法。

  • flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
  • map(Function<? super T, ? extends R> mapper)
  • then(Mono other)

从方法名字上看,flatMap 和 map 都是做映射之用。而 then 则是下一步的意思,最适合用于链式调用。

then 表面看上去是下一步的意思,但它只表示执行顺序的下一步,不表示下一步依赖于上一步。then 方法的参数只是一个 Mono,无从接受上一步的执行结果。而 flatMap 和 map 的参数都是一个 Function。入参是上一步的执行结果。

flatMap的转换Function要求返回一个Publisher,这个Publisher代表一个作用于元素的异步的转换操作;而map仅仅是同步的元素转换操作。(在flatMap里,可以通过该上一步的元素,进行其他异步操作)

总结

本文介绍了响应式编程,Reactor,webflux的基本概念和使用方法。响应式编程是规范,Reacotr是基于响应式编程实现的框架,而webflux则是spring将Reactor整合到spring全家桶中。相比于传统mvc,webflux并不会让你的程序运行得更快,而是在有限的资源下提高系统的伸缩性和利用率。

目前webflux并没有被广泛使用,是因为webflux 作为一个异步非阻塞的框架,并不适用于我们的平时业务系统,目前我们的数据库操作都是同步的。最常见的是使用在网关层,因为dubbo、redis等都是支持异步操作的。

如果有疑问或者有不同的理解,也欢迎大家探讨和指正


- END -