vlambda博客
学习文章列表

反应式编程Reactor中的多线程

中,我们对反应式编程有了基本的了解。并顺带了解了Java中反应式编程的最佳时间Reactor。今天我们继续通过Reactor来了解反应式编程,我们今天要说的Reactor中的多线程。


传统的Java多线程



在传统的Java多线程开发场景中,我们通常使用 Executors 工具类来创建线程池,通常是如下几种类型:


  • newCachedThreadPool 创建一个弹性大小的缓存线程池,如果线程池长度超过处理需要,则灵活回收空闲线程。如果没有可回收的线程,则新建线程。

  • newFixedThreadPool 创建一个固定大小的线程池,可控制线程最大并发数,超出的线程会在队列中进行等待。

  • newScheduledThreadPool 创建一个固定大小的线程池,支持定时及周期性的任务执行。

  • newSingleThreadExecutor 创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,保证所有的任务按照指定顺序执行。

  • newWorkStealingPool 创建支持work-stealing(任务窃取,在之前的Go并发模型中有所提到,后续我会专门写文章介绍)的线程池。


可以说传统的 Executors 工具类已经让我们使用线程池变得非常顺手。Reactor则更近一步,提出了更傻瓜式的调度器 Scheduler


Reactor的多线程



Reactor中提供了 Schedulers 类可以创建如下几种线程环境:


  • Schedulers.immediate() 当前线程。

  • Schedulers.single() 对所有的调用者都提供同一个线程来使用,如果想使用独占的线程可以使用 Schedulers.newSingle()

  • Schedulers.elastic() 弹性线程池,根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长就会被废弃。对于I/O阻塞的场景较为使用。Schedulers.elastic() 能够给每一个阻塞的任务分配线程,从而不会妨碍其他的任务和资源。

  • Schedulers.paraller() 固定大小线程池,默认创建的大小和CPU个数相同。

  • Schedulets.fromExecutorService(ExecutorService) 自定义线程池,基于自定义的Ex二次投入Service创建Scheduler。


Schedulers类已经预先创建了几种常用的线程池:使用single()elastic()parallel()方法可以分别使用内置的单线程、弹性线程池和固定大小线程池。如果想创建新的线程池,可以使用newSingle()newElastic()newParallel()方法。


我们从上面的描述可以看出,Reactor中的Schedulers和传统的Java多线程有一定的对应关系:


  • Schedulers.single()Schedulers.newSingle()对应Executors.newSingleThreadExecutor()

  • Schedulers.elastic()Schedulers.newElastic()对应Executors.newCachedThreadPool()

  • Schedulers.parallel()Schedulers.newParallel()对应Executors.newFixedThreadPool()


Schedulers提供的以上三种调度器底层都是基于ScheduledExecutorService的,因此都是支持任务定时和周期性执行的;


Reactor中的线程环境切换



和传统的Java多线程编程相比,Reactor提供了一种非常便利的切换线程的方式。也就是publishOnsubscribeOn方法。


Reactor本质上是对数据流的处理,基本模型也是发布-订阅模式。一般来说我们最终调用 subsribe() 方法时,形成了从上到下的数据流(发布端->订阅端)。但是其实还有一条从下到上的订阅流(订阅端->发布端),能够将订阅端的请求从下到上进行反馈。


明白了上面的内容,我们就能够很好理解publishOnsubscribeOn 的区别了, subscribeOn 会借助订阅流从下到上改变源头的线程执行环境。而 publishOn 则会借助数据流从上到下改变后续的执行环境。


我们通过以下的例子来进一步了解一下:



@Test
public void testScheduling() {
    Flux.range(0, 1)
        .log() // 1
        .publishOn(Schedulers.newParallel("myParallel"))
        .log() // 2
        .subscribeOn(Schedulers.newElastic("myElastic"))
        .log() // 3
        .blockLast();
}



  • 1处的log:会打印当前线程是 myElastic-x 

  • 2处的log:会打印当前线程是 myParallel-x 

  • 3处的log:会打印当前线程是 myParallel-x 


通过这个log分析,我们可以得到上面的结论:

  • publishOn会影响链中其后的操作符,比如上面的2和3处的log

  • subscribeOn无论出现在什么位置,都只影响源头的执行环境,比如上面1处的log


并且从我们使用的情况来看,Reactor中的线程切换非常方便。只需要使用 publishOnsubscribeOn 就可以定制我们的线程环境。


总结


Reactor中的多线程本质上和传统Java编程中的多线程没有什么区别,但是增加了很多语法糖,方便了在反应式编程中的多线程使用。并且在Reactor中做多线程环境的切换也是非常方便的,这个和我们在中提到的反应式编程更多的是定制流程,一旦我们的流程定制好了,那么最终数据就会按照我们期待的方式像流水一样流到最终位置。