vlambda博客
学习文章列表

也谈Reactive响应式编程及核心原理

    Reactive是一种设计思想,编程模型。它是少有的程序员评价差异巨大的编程模型,Reactive的熟客会极力吹捧它,新手极力抵触它。也说明它的门槛不低,尤其是熟练运用的门槛不低,需要思维模式的转变。

    Reactive的第一个特性是解耦合,Reactive的核心是函数化,且是纯函数(无状态,输出只跟输入有关,能稳定重现),纯函数的好处是可以在任意代码模块使用,无耦合。同时Reactive编程模型抽象了两种基本对象Observer和Observable,通过两个基本对象Observer和Observable,外加各种操作符(map, filter等)以搭积木的方式能够搭建出非常复杂的处理函数,使用方只需关心2个基本对象Observer,Observable及这个函数的输出数据, 无需关注这个函数的输入/上游数据格式及中间处理过程,高度解耦合。

    它的另外一个特性是异步,比如正常的编程b = a + 1;这个是一个指令,执行到这里的时候,b的值已经被计算出来了。如果b <= a + 1;这是定义b和a的关系,具体的值还没有,直到 a.onNext(1),b的值才会更新;其实Reactive响应式编程在很多框架有用到,比如TensorFlow的网络模型定义就类似Reactive模式,它先定义网络,通过placeHolder定义输入变量,后执行时feed这些变量才真正执行。

    脱离使用场景吹牛皮都是耍流氓,解耦合和异步处理框架有很多,比如EventBus,它的核心是事件,而Reactive对应的是数据,都是一个东西,且由于Reactive搭积木方式及各种操作符,使得Reactive方式在复杂异步数据流处理场景可以说是无敌手的。学习Reactive响应式编程的建议是去看看Angular里的异步数据流处理代码,Reactive编程模式可以说是Angular里流淌的血液,无处不在,并且专门为Pipe(数据流)定义了一个规范,Pipe类型的变量推荐使用$标识,Angular里的官方库很多是直接提供Observable,外部直接使用即可,使得前后端分离的代码很简洁优雅。

    各个编程语言基本都有Reactive响应式编程的库,比如Java的Rxjava库,Javascript的Rxjs, Android里的RxAndroid 和 RxKotlin, 今天我将以Rxjava代码为例来说说基本原理。


Rxjava 订阅和事件分发机制


    Rxjava其实就是一个通过Observable的subcribe向上通知和onNext向下汇报事件的过程,最后交给observer处理。具体调用流程如下:



    上面1~3的接口会被封装为Observer的LambdaObserver,然后调用4.subscribe(Observer<? super T> observer),


也谈Reactive响应式编程及核心原理


Rxjava链式调用

 

    由于RXjava支持链式调用,从而让observable形成一个observable双向链表,Observable的source引用上一个Observable,并创建一个observer给上一个Observable引用,从而使得上一个obserable能够回调回来,形成向上的链条,进而形成一个双向链表。


也谈Reactive响应式编程及核心原理


具体到一个案例


也谈Reactive响应式编程及核心原理


我们来分段分析.  在调用subscrible之前,只构造了source这个单向链表

 


也谈Reactive响应式编程及核心原理


当执行最后面的subscribe

也谈Reactive响应式编程及核心原理

会从下到上一直调用subscrible直到最顶层的ObservableOnSubscribe的subscrible, subscrible这个向上调用的过程就是构造Observer的过程,会构造每一层的ObserverWrap,比如最先构造OperatorObserver,最后构造CreateEmitter,  然后下面的代码会从上往下一路调用前面构造的Observer, 最后到最顶层Observable.create对应的Observer。


也谈Reactive响应式编程及核心原理


调用图如下


也谈Reactive响应式编程及核心原理

    Rxjava代码逻辑上是一个数据处理链,且通过Observable和Observer两种链接组件,能够将其他Observable和Observer像搭积木的方式组合成复杂的功能。


ConcatMap如何保证事件的时序


    我们知道flatmap和concatMap都支持新增observable发射新的事件, 但是flatmap没法保证时序,而concatMap保证时序

concatmap保证时序的原因是它每次调用Obserable.subscribe后会等到onSubscribe调用onComplete,然后才会调用下一个Obserable.subscribe,从而保证了时序,  具体源码如下:

也谈Reactive响应式编程及核心原理

也谈Reactive响应式编程及核心原理


Rxjava的常用操作


也谈Reactive响应式编程及核心原理


Rxjava的常见Subject对象


    Subject是一个很有意思的类型,它既可以是数据源Observable,也可以是数据的订阅者Observer,它特别适合做桥接, 也可以做桥接。    

    RxJava中常见的Subject有4种,分别是PublishSubject,  AsyncSubject,  BehaviorSubject, ReplaySubject。

也谈Reactive响应式编程及核心原理

PublishSubject

    可以说是最正常的Subject,只能接收订阅后的发出的数据。

也谈Reactive响应式编程及核心原理


AsyncSubject

    简单的说使用AsyncSubject无论输入多少参数,永远只输出最后一个参数。

也谈Reactive响应式编程及核心原理


BehaviorSubject

    BehaviorSubject会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。

也谈Reactive响应式编程及核心原理


RelaySubject

    无论何时订阅,都会将所有历史订阅内容全部发出。

也谈Reactive响应式编程及核心原理


Angular中的Reactive应用示例





所有的码客文章,都在这里,欢迎长按关注

记得点击下面的「好看」哟👇