vlambda博客
学习文章列表

当响应式编程遇上springboot

01

写在前面

由于响应式编程异步非堵塞的特性,能够依赖较少的资源支撑较大的吞吐量,毫无疑问,这会是未来的趋势。

springboot的reactive web的效果如下:

JDK8的stream,到JDK9的reactive,无疑印证了这一点,相比Rxjava和spring5而言,JDK的stream和reactive是解耦的,是两套独立的完善而Rxjava和spring5更多的stream和reactive绑定在一起使用,提供一整套完整功能的同时,也增加的使用


02

stream

在JDK中的stream使用时,一般有三部,stream的创建,中间操作,终止操作。

int[] nums = { 1, 2, 3 };// 使用stream的内部迭代// map就是中间操作(返回stream的操作)// sum就是终止操作int sum2 = IntStream.of(nums).map(num->2*num).sum();

由于惰性求值,没有终止操作时,中间操作的逻辑不会执行的。

IntStream.of(nums).map(num->2*num);

调用parallel 产生一个并行流

IntStream.range(1, 100).parallel().peek(System.out::print).count();


03

reactive

在这个体系中,有两个角色发布者和订阅者(或者叫被观察者和观察者),这两者之间有发布订阅关系,发布者有发布动作和关闭动作,订阅者有消费动作。

1.定义发布者

SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

2. 定义订阅者

Subscriber<Integer> subscriber = new Subscriber<Integer>() {  private Subscription subscription; @Override  public void onSubscribe(Subscription subscription) {    this.subscription = subscription; this.subscription.request(1);  } @Override public void onNext(Integer item) {    System.out.println("接受到数据: " + item);    this.subscription.request(1); }
@Override  public void onError(Throwable throwable) {    throwable.printStackTrace(); this.subscription.cancel(); }
@Override  public void onComplete() { System.out.println("处理完了!");  }};

3. 发布者和订阅者 建立订阅关系

publiser.subscribe(subscriber);

4. 生产数据, 并发布

for (int i = 0; i < 1000; i++) { publiser.submit(i);}

5. 结束后 关闭发布者(应该放 finally)

publiser.close();


04

Flux

我们先看看Flux的基本用法

String[] strs = { "1", "2", "3" };Subscriber<Integersubscriber = new Subscriber<Integer>() {  //具体实现略};Flux.fromArray(strs).map(s -> Integer.parseInt(s).subscribe(subscriber);

可以看出来Flux其实就是把stream和reactive结合在一起,订阅者的创建和发布数据就是创建stream和中间操作的过程,创建订阅关系其实就是stream的终止操作。


05

reactive web

Flux.fromStream(IntStream.range(1,5) .map(i->new User("name"+i,i*10)));

这其实就是一个创建stream的操作,而终止操作则交给spring框架,而spring框架接口Servlet的异步API,把Servlet异步处理当成是消费者,并且生成订阅关系,这样一来,Servlet业务线程的工作量大大的减轻了,这也是响应式编程能够用较少的资源支撑较高的吞吐量的一个具体应用。