CloudX开发者社群丨Reactive 架构才是未来
1
Reactive 和 Reactive programming
  
    
    
  
   
     
     
   public static void main(String[] args) {
   
     
     
    FluxProcessor<Integer, Integer> publisher = UnicastProcessor.create();
   
     
     
     publisher.doOnNext(event -> System.out.println("receive event: " + event)).subscribe();
   
     
     
   
   
     
     
    publisher.onNext(1); // print 'receive event: 1'
   
     
     
    publisher.onNext(2); // print 'receive event: 2'
   
     
     
   }
  
    
    
    
2
Reactive Manifesto
对于 Reactive 现在你应该大致有一点感觉了,但是 Reactive 有什么价值,有哪些设计原则,估计你还是有些模糊。这就是 Reactive Manifesto 要解决的疑问了。
即时响应性 (Responsive)
回弹性 (Resilient)
弹性 (Elastic)
消息驱动 (Message Driven)
-  
  
上面描述有很多专有名词,可能有些疑惑,可以看下相关名词解释。 
 -  
  
为什么使用 Reactive 方式构建的系统会具有以上价值, 我稍后在 Reactor 章节中介绍。  
3
Reactive Stream
The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure. 
Publisher
  
    
    
  
   
     
     
   public interface Publisher<T> { 
   
     
     
   public void subscribe(Subscriber<? super T> s);
   
     
     
   }
  
    
    
    
Subscriber
  
    
    
  
   
     
     
   publicinterface Subscriber<T> {
   
     
     
   public void onSubscribe(Subscription s);
   
     
     
   public void onNext(T t);
   
     
     
   public void onError(Throwable t);
   
     
     
   public void onComplete();
   
     
     
   }
  
    
    
    
Subscription
  
    
    
  
   
     
     
   public interface Subscription { 
   
     
     
   public void request(long n); 
   
     
     
   public void cancel();
   
     
     
   }
  
    
    
    
Processor
  
    
    
  
   
     
     
   public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
  
    
    
    
-  
  
同步方式一般通过多线程来提高性能,但系统可创建的线程数是有限的,且线程多以后造成线程切换开销。  
-  
  
同步方式很难进一步提升资源利用率。  
-  
  
同步调用依赖的系统出现问题时,自身稳定性也会受到影响。  
Thread
-  
  
thread 不是非常轻量(相比下面几种实现方案)。  -  
  
thread 数量是有限的,最终可能会成为主要瓶颈。  -  
  
有一些平台可能不支持多线程。例如:JavaScript。  -  
  
调试,实现上有一定复杂性。  
Callback
-  
  
多层嵌套 callback 比较复杂,容易形成"圣诞树" (callback hell)。  -  
  
错误处理比较复杂。  -  
  
多用于 event loop 架构的语言中,例如:JavaScript。  
Future
-  
  
无法逻辑组合各种行为,支持业务场景有限。  -  
  
错误处理依然复杂。  
Reactive Extensions (Rx)
-  
  
和 Future 很相似。Future 可以认为返回一个独立的元素,而 Rx 返回一个可以被订阅的 Stream。  -  
  
多平台支持同一套规范。  -  
  
同一套 API 同时支持异步、同步。  -  
  
错误处理方便。  
Coroutines
-  
  
kotlin coroutine 和 goroutine 在语法层面上提供异步支持, 而且比Rx更简洁,但无法跨多个语言平台形成统一的规范。  
Reactor
-  
  
回弹性 (Resilient):当函数出现严重超时时 (rt >= 10s),函数上游的 broker, gateway 应用几乎无任何影响。  
-  
  
及时响应性:不管是高并发场景(资源足够),还是正常场景,RT 表现一致。  
-  
  
涉及到 IO 的地方几乎全异步化。例如中间件(HSF, MetaQ 等提供异步 API)调用。  -  
  
IO 线程模型变化。使用较少(一般 CPU 核数)线程处理所有的请求。  
4
传统 Java 应用 IO 线程模型
  
    
    
  
   
     
     
   // 非阻塞读取客户端请求数据(in), 读取成功后执行lambda.
   
     
     
   inChannel.read(in) {
   
     
     
    workerThreadPool.execute{
   
     
     
   // 阻塞处理业务逻辑(process), 业务逻辑在worker线程池中执行,同步执行完后,再向客户端返回输出(out)
   
     
     
   val out = process(in)
   
     
     
    outChannel.write(out)
   
     
     
    } 
   
     
     
   }
  
    
    
    
Reactive 应用 IO 线程模型
  
    
    
  
   
     
     
   // 非阻塞读取客户端请求数据(in), 读取成功后执行lambda
   
     
     
   inChannel.read(in) {
   
     
     
   // IO线程执行业务逻辑(process), 然后向客户端返回输出(out). 这要求业务处理流程必须是非阻塞的.
   
     
     
    process(in){ out->
   
     
     
    outChannel.write(out) {
   
     
     
   // this lambda is executed when the writing completes
   
     
     
    ...
   
     
     
    }
   
     
     
    }
   
     
     
   }
  
    
    
    
-  
  
Reactor 基础文档  -  
  
Reactive Streams 规范文档  -  
  
Operator  
总结
参考: https://www.reactivemanifesto.org/ https://www.reactive-streams.org/ https://kotlinlang.org/docs/tutorials/coroutines/async-programming.html https://projectreactor.io/docs/core/release/reference/index.html 
招商项目联系人:唐先生13996021262
