反应式编程Reactor中的多线程
在中,我们对反应式编程有了基本的了解。并顺带了解了Java中反应式编程的最佳时间Reactor。今天我们继续通过Reactor来了解反应式编程,我们今天要说的Reactor中的多线程。
在传统的Java多线程开发场景中,我们通常使用 Executors
工具类来创建线程池,通常是如下几种类型:
newCachedThreadPool
创建一个弹性大小的缓存线程池,如果线程池长度超过处理需要,则灵活回收空闲线程。如果没有可回收的线程,则新建线程。newFixedThreadPool
创建一个固定大小的线程池,可控制线程最大并发数,超出的线程会在队列中进行等待。newScheduledThreadPool
创建一个固定大小的线程池,支持定时及周期性的任务执行。newSingleThreadExecutor
创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,保证所有的任务按照指定顺序执行。newWorkStealingPool
创建支持work-stealing(任务窃取,在之前的Go并发模型中有所提到,后续我会专门写文章介绍)的线程池。
可以说传统的 Executors
工具类已经让我们使用线程池变得非常顺手。Reactor则更近一步,提出了更傻瓜式的调度器 Scheduler
。
Reactor中提供了 Schedulers
类可以创建如下几种线程环境:
Schedulers.immediate()
当前线程。Schedulers.single()
对所有的调用者都提供同一个线程来使用,如果想使用独占的线程可以使用Schedulers.newSingle()
。Schedulers.elastic()
弹性线程池,根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长就会被废弃。对于I/O阻塞的场景较为使用。Schedulers.elastic()
能够给每一个阻塞的任务分配线程,从而不会妨碍其他的任务和资源。Schedulers.paraller()
固定大小线程池,默认创建的大小和CPU个数相同。Schedulets.fromExecutorService(ExecutorService)
自定义线程池,基于自定义的Ex二次投入Service创建Scheduler。
Schedulers
类已经预先创建了几种常用的线程池:使用single()
、elastic()
和parallel()
方法可以分别使用内置的单线程、弹性线程池和固定大小线程池。如果想创建新的线程池,可以使用newSingle()
、newElastic()
和newParallel()
方法。
我们从上面的描述可以看出,Reactor中的Schedulers和传统的Java多线程有一定的对应关系:
Schedulers.single()
和Schedulers.newSingle()
对应Executors.newSingleThreadExecutor()
;Schedulers.elastic()
和Schedulers.newElastic()
对应Executors.newCachedThreadPool()
;Schedulers.parallel()
和Schedulers.newParallel()
对应Executors.newFixedThreadPool()
;
Schedulers
提供的以上三种调度器底层都是基于ScheduledExecutorService
的,因此都是支持任务定时和周期性执行的;
和传统的Java多线程编程相比,Reactor提供了一种非常便利的切换线程的方式。也就是publishOn
和subscribeOn
方法。
Reactor本质上是对数据流的处理,基本模型也是发布-订阅模式。一般来说我们最终调用 subsribe()
方法时,形成了从上到下的数据流(发布端->订阅端)。但是其实还有一条从下到上的订阅流(订阅端->发布端),能够将订阅端的请求从下到上进行反馈。
明白了上面的内容,我们就能够很好理解publishOn
和subscribeOn
的区别了, subscribeOn
会借助订阅流从下到上改变源头的线程执行环境。而 publishOn
则会借助数据流从上到下改变后续的执行环境。
我们通过以下的例子来进一步了解一下:
@Test
public void testScheduling() {
Flux.range(0, 1)
.log() // 1
.publishOn(Schedulers.newParallel("myParallel"))
.log() // 2
.subscribeOn(Schedulers.newElastic("myElastic"))
.log() // 3
.blockLast();
}
1处的log:会打印当前线程是
myElastic-x
2处的log:会打印当前线程是
myParallel-x
3处的log:会打印当前线程是
myParallel-x
通过这个log分析,我们可以得到上面的结论:
publishOn
会影响链中其后的操作符,比如上面的2和3处的logsubscribeOn
无论出现在什么位置,都只影响源头的执行环境,比如上面1处的log
并且从我们使用的情况来看,Reactor中的线程切换非常方便。只需要使用 publishOn
和 subscribeOn
就可以定制我们的线程环境。
Reactor中的多线程本质上和传统Java编程中的多线程没有什么区别,但是增加了很多语法糖,方便了在反应式编程中的多线程使用。并且在Reactor中做多线程环境的切换也是非常方便的,这个和我们在中提到的反应式编程更多的是定制流程,一旦我们的流程定制好了,那么最终数据就会按照我们期待的方式像流水一样流到最终位置。