CloudX开发者社群丨Reactive 架构才是未来
1
Reactive 和 Reactive programming
public static void main(String[] args) {
FluxProcessor<Integer, Integer> publisher = UnicastProcessor.create();
publisher.doOnNext(event -> System.out.println("receive event: " + event)).subscribe();
publisher.onNext(1); // print 'receive event: 1'
publisher.onNext(2); // print 'receive event: 2'
}
2
Reactive Manifesto
对于 Reactive 现在你应该大致有一点感觉了,但是 Reactive 有什么价值,有哪些设计原则,估计你还是有些模糊。这就是 Reactive Manifesto 要解决的疑问了。
即时响应性 (Responsive)
回弹性 (Resilient)
弹性 (Elastic)
消息驱动 (Message Driven)
-
上面描述有很多专有名词,可能有些疑惑,可以看下相关名词解释。
-
为什么使用 Reactive 方式构建的系统会具有以上价值, 我稍后在 Reactor 章节中介绍。
3
Reactive Stream
The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Subscriber
publicinterface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
-
同步方式一般通过多线程来提高性能,但系统可创建的线程数是有限的,且线程多以后造成线程切换开销。
-
同步方式很难进一步提升资源利用率。
-
同步调用依赖的系统出现问题时,自身稳定性也会受到影响。
Thread
-
thread 不是非常轻量(相比下面几种实现方案)。 -
thread 数量是有限的,最终可能会成为主要瓶颈。 -
有一些平台可能不支持多线程。例如:JavaScript。 -
调试,实现上有一定复杂性。
Callback
-
多层嵌套 callback 比较复杂,容易形成"圣诞树" (callback hell)。 -
错误处理比较复杂。 -
多用于 event loop 架构的语言中,例如:JavaScript。
Future
-
无法逻辑组合各种行为,支持业务场景有限。 -
错误处理依然复杂。
Reactive Extensions (Rx)
-
和 Future 很相似。Future 可以认为返回一个独立的元素,而 Rx 返回一个可以被订阅的 Stream。 -
多平台支持同一套规范。 -
同一套 API 同时支持异步、同步。 -
错误处理方便。
Coroutines
-
kotlin coroutine 和 goroutine 在语法层面上提供异步支持, 而且比Rx更简洁,但无法跨多个语言平台形成统一的规范。
Reactor
-
回弹性 (Resilient):当函数出现严重超时时 (rt >= 10s),函数上游的 broker, gateway 应用几乎无任何影响。
-
及时响应性:不管是高并发场景(资源足够),还是正常场景,RT 表现一致。
-
涉及到 IO 的地方几乎全异步化。例如中间件(HSF, MetaQ 等提供异步 API)调用。 -
IO 线程模型变化。使用较少(一般 CPU 核数)线程处理所有的请求。
4
传统 Java 应用 IO 线程模型
// 非阻塞读取客户端请求数据(in), 读取成功后执行lambda.
inChannel.read(in) {
workerThreadPool.execute{
// 阻塞处理业务逻辑(process), 业务逻辑在worker线程池中执行,同步执行完后,再向客户端返回输出(out)
val out = process(in)
outChannel.write(out)
}
}
Reactive 应用 IO 线程模型
// 非阻塞读取客户端请求数据(in), 读取成功后执行lambda
inChannel.read(in) {
// IO线程执行业务逻辑(process), 然后向客户端返回输出(out). 这要求业务处理流程必须是非阻塞的.
process(in){ out->
outChannel.write(out) {
// this lambda is executed when the writing completes
...
}
}
}
-
Reactor 基础文档 -
Reactive Streams 规范文档 -
Operator
总结
参考: https://www.reactivemanifesto.org/ https://www.reactive-streams.org/ https://kotlinlang.org/docs/tutorials/coroutines/async-programming.html https://projectreactor.io/docs/core/release/reference/index.html
招商项目联系人:唐先生13996021262
