聊聊Hystrix 命令执行流程
作者 | 赵云翔
杏仁后端工程师,关注如何优雅写 bug。
前言
Hystrix 简单介绍
-
什么是 Hystrix? Hystrix 是一个容错库,旨在隔离对远程系统、服务和第三方库的访问点,停止级联故障,并在错误不可避免的复杂分布式系统中能够弹性恢复。 -
核心概念 -
Command 命令 Command 是 Hystrix 的入口,对用户来说,我们只需要创建对应的 command,将需要保护的接口包装起来就可以。 可以无需关注再之后的逻辑。 与 Spring 深度集成后还可以通过注解的方式,就更加对开发友好了。 -
Circuit Breaker 断路器 断路器,是从电气领域引申过来的概念,具有 过载、短路和 欠电压保护功能,有保护线路和电源的能力。 在 Hystrix 中即为当请求超过一定比例响应失败时,hystrix 会对请求进行拦截处理,保证服务的稳定性,以及防止出现服务之间级联雪崩的可能性。 -
Isolation 隔离策略 隔离策略是 Hystrix 的设计亮点所在,利用 舱壁模式
的思想来对访问的资源进行隔离,每个资源是独立的依赖,单个资源的异常不应该影响到其他。 Hystrix 的隔离策略目前有两种:线程池隔离,信号量隔离。 -
Hystrix 的运行流程 官方的
How it Works
对流程有很详细的介绍,图示清晰,相信看完流程图就能对运行流程有一定的了解。
一次 Command 执行
HystrixCommand
是标准的
命令模式
实现,每一次请求即为一次命令的创建执行经历的过程。
从上
Hystrix 流程图
可以看出创建流程最终会指向
toObservable
,
Observable
即为被观察者,作用是发送数据给观察者进行相应的,因此可以知道这个方法应该是较为关键的。
UML
-
HystrixInvokable 标记这个一个可执行的接口,没有任何抽象方法或常量。 -
HystrixExecutable 是为 HystrixCommand
设计的接口,主要提供执行命令的抽象方法,例如:execute()
,queue()
,observe()
。 -
HystrixObservable 是为 Observable
设计的接口,主要提供自动订阅 (observe()
) 和生成 Observable (toObservable()
) 的抽象方法。 -
HystrixInvokableInfo 提供大量的状态查询 (获取属性配置,是否开启断路器等)。 -
AbstractCommand 核心逻辑 的实现。 -
HystrixCommand 定制逻辑实现以及留给用户实现的接口 (比如: run()
)。
样例代码
run
方法需要我们实现自己的业务逻辑,以下是偷懒采用匿名内部类的形式呈现。
构造方法的内部实现我们就不关注了,直接看下执行的逻辑吧。
HystrixCommand demo = new HystrixCommand<String>(HystrixCommandGroupKey.Factory.asKey ("demo-group")) {
@Override
protected String run() {
return "Hello World~";
}
};
demo.execute();
执行过程
流程图
这是官方给出的一次完整调用的链路。上述的 demo 中我们直接调用了 execute
方法,所以调用的路径为 execute()
-> queue()
-> toObservable()
-> toBlocking()
-> toFuture()
-> get()
。核心的逻辑其实就在 toObservable()
中。
HystrixCommand.java
execute()
execute
方法为同步调用返回结果,并对异常作处理。
内部会调用
queue
。
// 同步调用执行
public R execute() {
try {
//queue() 返回的是 Future 类型的对象,所以这里是阻塞 get
return queue().get();
} catch (Exception e) {
throw decomposeException(e);
}
}
queue()
queue
的第一行代码完成了核心的订阅逻辑.
-
toObservable()
生成了 Hystrix 的 Observable 对象。 -
将 Observable
转换为BlockingObservable
可以阻塞控制数据发送。 -
toFuture
实现对BlockingObservable
的订阅。
public Future<R> queue() {
// 着重关注的是这行代码
// 完成了 Observable 的创建及订阅
//toBlocking() 是将 Observable 转为 BlockingObservable, 转换后的 Observable 可以阻塞数据的发送
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
// 由于 toObservable().toBlocking().toFuture() 返回的 Future 如果中断了,
// 不会对当前线程进行中断,所以这里将返回的 Future 进行了再次包装,处理异常逻辑
...
}
// 判断是否已经结束了,有异常则直接抛出
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
// 省略这段判断
}
}
return f;
}
BlockingObservable.java
// 被包装的 Observable
private final Observable<? extends T> o;
//toBlocking() 会调用该静态方法将 源 Observable 简单包装成 BlockingObservable
public static <T> BlockingObservable<T> from (final Observable<? extends T> o) {
return new BlockingObservable<T>(o);
}
public Future<T> toFuture() {
return BlockingOperatorToFuture.toFuture ((Observable<T>) o);
}
BlockingOperatorToFuture.java
ReactiveX 关于 toFuture 的解读
The
toFuture
operator applies to theBlockingObservable
subclass, so in order to use it, you must first convert your source Observable into aBlockingObservable
by means of either theBlockingObservable.from
method or theObservable.toBlocking
operator.
toFuture
只能作用于 BlockingObservable
所以也才会有上文想要转换为 BlockingObservable 的操作。
// 该操作将源 Observable 转换为返回单个数据项的 Future
public static <T> Future<T> toFuture (Observable<? extends T> that) {
// CountDownLatch 判断是否完成
final CountDownLatch finished = new CountDownLatch (1);
// 存储执行结果
final AtomicReference<T> value = new AtomicReference<T>();
// 存储错误结果
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
//single() 方法可以限制 Observable 只发送单条数据
// 如果有多条数据 会抛 IllegalArgumentException
// 如果没有数据可以发送 会抛 NoSuchElementException
@SuppressWarnings ("unchecked")
final Subscription s = ((Observable<T>) that).single().subscribe (new Subscriber<T>() {
//single() 返回的 Observable 就可以对其进行标准的处理了
@Override
public void onCompleted() {
finished.countDown();
}
@Override
public void onError (Throwable e) {
error.compareAndSet (null, e);
finished.countDown();
}
@Override
public void onNext (T v) {
// "single" guarantees there is only one "onNext"
value.set (v);
}
});
// 最后将 Subscription 返回的数据封装成 Future, 实现对应的逻辑
return new Future<T>() {
// 可以查看源码
};
}
AbstractCommand.java
AbstractCommand
是
toObservable
实现的地方,属于 Hystrix 的核心逻辑,代码较长,可以和方法调用的流程图一起食用。
toObservable
主要是完成缓存和创建 Observable,requestLog 的逻辑,当第一次创建 Observable 时,
applyHystrixSemantics
方法是 Hystrix 的最最核心的实现,可以跳着看。
tips: 下文中有很多 Action 和 Function,他们很相似,都有 call 方法,但是区别在于 Function 有返回值,而 Action 没有,方法后跟着的数字代表有几个入参。 Func0/Func3 即没有入参和有三个入参。
toObservable
toObservable
代码较长且分层还是清晰的,所以下面一块一块写。
其逻辑和文章开始提到的
Hystrix 流程图
是完全一致的。
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
// 此处省略掉了很多个 Action 和 Function, 大部分是来做扫尾清理的函数,所以用到的时候再说
//defer 在上篇 rxjava 入门中提到过,是一种创建型的操作符,每次订阅时会产生新的 Observable, 回调方法中所实现的才是真正我们需要的 Observable
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 校验命令的状态,保证其只执行一次
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + "command executed multiple times - this is not permitted.", ex, null);
}
commandStartTimestamp = System.currentTimeMillis();
//properties 为当前 command 的所有属性
// 允许记录请求 log 时会保存当前执行的 command
if (properties.requestLogEnabled().get()) {
//log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
// 是否开启了请求缓存
final boolean requestCacheEnabled = isRequestCachingEnabled();
// 获取缓存 key
final String cacheKey = getCacheKey();
// 开启缓存后,尝试从缓存中取
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// 没有开启请求缓存时,就执行正常的逻辑
Observable<R> hystrixObservable =
// 这里又通过 defer 创建了我们需要的 Observable
Observable.defer(applyHystrixSemantics)
// 发送前会先走一遍 hook, 默认 executionHook 是空实现的,所以这里就跳过了
.map(wrapWithAllOnNextHooks);
// 得到最后的封装好的 Observable 后,将其放入缓存
if (requestCacheEnabled && cacheKey != null) {
//wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from (hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
//another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
//we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
// 终止时的操作
.doOnTerminate(terminateCommandCleanup) //perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
// 取消订阅时的操作
.doOnUnsubscribe(unsubscribeCommandCleanup) //perform cleanup once
// 完成时的操作
.doOnCompleted(fireOnCompletedHook);
}
}
handleRequestCacheHitAndEmitValues
private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) {
try {
// Hystrix 中有大量的 hook 如果有心做二次开发的,可以利用这些 hook 做到很完善的监控
executionHook.onCacheHit(this);
} catch (Throwable hookEx) {
logger.warn ("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
}
// 将缓存的结果赋给当前 command
return fromCache.toObservableWithStateCopiedInto (this)
//doOnTerminate 或者是后面看到的 doOnUnsubscribe,doOnError, 都指的是在响应 onTerminate/onUnsubscribe/onError 后的操作,即在 Observable 的生命周期上注册一个动作优雅的处理逻辑
.doOnTerminate (new Action0() {
@Override
public void call() {
// 命令最终状态的不同进行不同处理
if (commandState.compareAndSet (CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
cleanUpAfterResponseFromCache (false); //user code never ran
} else if (commandState.compareAndSet (CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
cleanUpAfterResponseFromCache (true); //user code did run
}
}
})
.doOnUnsubscribe (new Action0() {
@Override
public void call() {
// 命令最终状态的不同进行不同处理
if (commandState.compareAndSet (CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
cleanUpAfterResponseFromCache (false); //user code never ran
} else if (commandState.compareAndSet (CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
cleanUpAfterResponseFromCache (true); //user code did run
}
}
});
}
applyHystrixSemantics
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 不再订阅了就返回不发送数据的 Observable
if (commandState.get().equals (CommandState.UNSUBSCRIBED)) {
// 不发送任何数据或通知
return Observable.never();
}
return applyHystrixSemantics (_cmd);
}
};
private Observable<R> applyHystrixSemantics (final AbstractCommand<R> _cmd) {
// 标记开始执行的 hook
// 如果 hook 内抛异常了,会快速失败且没有 fallback 处理
executionHook.onStart (_cmd);
/* determine if we're allowed to execute */
// 断路器核心逻辑:判断是否允许执行 (TODO)
if (circuitBreaker.allowRequest()) {
// Hystrix 自己造的信号量轮子,之所以不用 juc 下,官方解释为 juc 的 Semphore 实现太复杂,而且没有动态调节的信号量大小的能力,简而言之,不满足需求!
// 根据不同隔离策略 (线程池隔离 / 信号量隔离) 获取不同的 TryableSemphore
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
// Semaphore 释放标志
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean (false);
// 释放信号量的 Action
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet (false, true)) {
executionSemaphore.release();
}
}
};
// 异常处理
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call (Throwable t) {
// HystrixEventNotifier 是 hystrix 的插件,不同的事件发送不同的通知,默认是空实现.
eventNotifier.markEvent (HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
// 线程池隔离的 TryableSemphore 始终为 true
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
//executionResult 是一次命令执行的结果信息封装
// 这里设置起始时间是为了记录命令的生命周期,执行过程中会 set 其他属性进去
executionResult = executionResult.setInvocationStartTime (System.currentTimeMillis());
return executeCommandAndObserve (_cmd)
// 报错时的处理
.doOnError (markExceptionThrown)
// 终止时释放
.doOnTerminate (singleSemaphoreRelease)
// 取消订阅时释放
.doOnUnsubscribe (singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error (e);
}
} else {
//tryAcquire 失败后会做 fallback 处理,TODO
return handleSemaphoreRejectionViaFallback();
}
} else {
// 断路器短路 (拒绝请求) fallback 处理 TODO
return handleShortCircuitViaFallback();
}
}
executeCommandAndObserve
/**
* 执行 run 方法的地方
*/
private Observable<R> executeCommandAndObserve (final AbstractCommand<R> _cmd) {
// 获取当前上下文
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// 发送数据时的 Action 响应
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call (R r) {
// 如果 onNext 时需要上报时,做以下处理
if (shouldOutputOnNextEvents()) {
//result 标记
executionResult = executionResult.addEvent (HystrixEventType.EMIT);
// 通知
eventNotifier.markEvent (HystrixEventType.EMIT, commandKey);
}
//commandIsScalar 是一个我不解的地方,在网上也没有查到好的解释
// 该方法为抽象方法,有 HystrixCommand 实现返回 true.HystrixObservableCommand 返回 false
if (commandIsScalar()) {
// 耗时
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
// 通知
eventNotifier.markCommandExecution (getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent (HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent ((int) latency, HystrixEventType.SUCCESS);
// 断路器标记成功 (断路器半开时的反馈,决定是否关闭断路器)
circuitBreaker.markSuccess();
}
}
};
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
// 同 markEmits 类似处理
}
}
};
// 失败回退的逻辑
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call (Throwable t) {
// 不是重点略过了
}
};
// 请求上下文的处理
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call (Notification<? super R> rNotification) {
setRequestContextIfNeeded (currentRequestContext);
}
};
Observable<R> execution;
// 如果有执行超时限制,会将包装后的 Observable 再转变为支持 TimeOut 的
if (properties.executionTimeoutEnabled().get()) {
// 根据不同的隔离策略包装为不同的 Observable
execution = executeCommandWithSpecifiedIsolation (_cmd)
//lift 是 rxjava 中一种基本操作符 可以将 Observable 转换成另一种 Observable
// 包装为带有超时限制的 Observable
.lift (new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation (_cmd);
}
return execution.doOnNext (markEmits)
.doOnCompleted (markOnCompleted)
.onErrorResumeNext (handleFallback)
.doOnEach (setRequestContext);
}
executeCommandWithSpecifiedIsolation
Observable
private Observable<R> executeCommandWithSpecifiedIsolation (final AbstractCommand<R> _cmd) {
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
//mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
return Observable.defer (new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 由于源码太长,这里只关注正常的流程,需要详细了解可以去看看源码
if (threadState.compareAndSet (ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
try {
return getUserExecutionObservable (_cmd);
} catch (Throwable ex) {
return Observable.error (ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error (new RuntimeException ("unsubscribed before executing run()"));
}
}})
.doOnTerminate (new Action0() {})
.doOnUnsubscribe (new Action0() {})
// 指定在某一个线程上执行,是 rxjava 中很重要的线程调度的概念
.subscribeOn (threadPool.getScheduler (new Func0<Boolean>() {
}));
} else { // 信号量隔离策略
return Observable.defer (new Func0<Observable<R>>() {
// 逻辑与线程池大致相同
});
}
}
getUserExecutionObservable
获取用户执行的逻辑
private Observable<R> getUserExecutionObservable (final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
//getExecutionObservable 是抽象方法,有 HystrixCommand 自行实现
userObservable = getExecutionObservable();
} catch (Throwable ex) {
//the run() method is a user provided implementation so can throw instead of using Observable.onError
//so we catch it here and turn it into Observable.error
userObservable = Observable.error (ex);
}
// 将 Observable 作其他中转
return userObservable
.lift (new ExecutionHookApplication (_cmd))
.lift (new DeprecatedOnRunHookApplication (_cmd));
}
Map/FlatMap
操作符底层其实就是用的
lift
进行实现的。
getExecutionObservable
@Override
final protected Observable<R> getExecutionObservable() {
return Observable.defer (new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
//just 操作符就是直接执行的 Observable
//run 方法就是我们实现的业务逻辑: Hello World~
return Observable.just (run());
} catch (Throwable ex) {
return Observable.error (ex);
}
}
}).doOnSubscribe (new Action0() {
@Override
public void call() {
// 执行订阅时将执行线程记为当前线程,必要时我们可以 interrupt
executionThread.set (Thread.currentThread());
}
});
}
总结
-
https://github.com/Netflix/Hystrix/wiki/How-it-Works -
http://reactivex.io/documentation/observable.html -
https://github.com/ruanyf/document-style-guide -
https://blog.csdn.net/qq_24530405/article/details/66969886
全文完
以下文章您可能也会感兴趣:
我们正在招聘 Java 工程师,欢迎有兴趣的同学投递简历到 [email protected] 。