Hystrix Workflow Analysis
Setting up a Hystrix source-reading environment
Add the dependency
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.12</version>
</dependency>
Create the Command class
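import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class HelloCommand extends HystrixCommand<String> {

    public HelloCommand() {
        super(HystrixCommandGroupKey.Factory.asKey("test"));
    }

    @Override
    protected String run() throws Exception {
        // Simulate a slow call
        Thread.sleep(800);
        return "success";
    }

    @Override
    protected String getFallback() {
        System.out.println("Fallback method executed");
        return "error";
    }
}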
Create the test class
public class CommandTest {

    public static void main(String[] args) {
        HelloCommand command = new HelloCommand();
        String result = command.execute();
        System.out.println(result);
    }
}
The Hystrix workflow
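Start with the diagram above, which describes the complete Hystrix workflow:
1. A new HystrixCommand is created for every call.
2. execute or queue is invoked to make a synchronous or asynchronous call, respectively.
3. Check whether the circuit breaker is open. If it is, jump to step 8; otherwise continue with step 4.
4. Check whether the thread pool or semaphore is exhausted. If it is, go to step 8; otherwise continue with step 5.
5. Call the HystrixCommand's run method. If the call times out, go to step 8.
6. Check whether the call succeeded. On success, return the result; on failure, go to step 8.
7. Compute the circuit breaker state. Every outcome (success, failure, rejection, timeout) is reported to the circuit breaker, which aggregates these statistics to decide its state.
8. Run the fallback (degradation) logic. From the steps above, four situations lead to the fallback:
    circuit breaker open
    thread pool or semaphore exhausted
    call timed out
    call failed
9. Return the successful result.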
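The decision steps above can be sketched in plain Java. This is a hypothetical, simplified model and not Hystrix code: the class name WorkflowSketch, the method invoke, and its boolean parameters are made up for illustration, and a timeout is not modelled separately but treated like any other failure of run.

```java
import java.util.concurrent.Callable;

/** Hypothetical, simplified model of steps 3 to 9; not Hystrix code. */
final class WorkflowSketch {

    /** Returns the result of run, or the fallback value when a failure case is hit. */
    static String invoke(boolean circuitOpen, boolean poolFull,
                         Callable<String> run, String fallback) {
        if (circuitOpen) {
            return fallback;          // step 3 -> step 8
        }
        if (poolFull) {
            return fallback;          // step 4 -> step 8
        }
        try {
            return run.call();        // steps 5, 6 and 9
        } catch (Exception e) {
            return fallback;          // failed call -> step 8
        }
    }

    public static void main(String[] args) {
        System.out.println(invoke(false, false, () -> "success", "error"));
        System.out.println(invoke(true, false, () -> "success", "error"));
        System.out.println(invoke(false, false,
                () -> { throw new IllegalStateException("boom"); }, "error"));
    }
}
```

The first call prints success; the other two print error because the circuit is open or the call throws.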
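Creating the HystrixCommand
Next, let's follow this call flow through the source code. Run the test class's main method directly; the entry point is the execute method: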
public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
Executing the synchronous call
public Future<R> queue() {
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    // omitted...
}
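The relationship between the two entry points (execute simply blocks on the Future returned by queue) can be illustrated with JDK classes alone. This is a minimal sketch: SyncOverAsyncSketch is a hypothetical stand-in, and CompletableFuture replaces the RxJava Observable that Hystrix actually uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical stand-in for a command; shows only the execute/queue relationship. */
final class SyncOverAsyncSketch {

    /** Asynchronous entry point: starts the work and returns immediately. */
    static Future<String> queue() {
        return CompletableFuture.supplyAsync(() -> "success");
    }

    /** Synchronous entry point: blocks on the Future returned by queue(). */
    static String execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(execute());
    }
}
```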
Next, look at the toObservable() method:
public Observable<R> toObservable() {
    // omitted...
    return Observable.defer(new Func0<Observable<R>>() {
        public Observable<R> call() {
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }

            commandStartTimestamp = System.currentTimeMillis();

            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }

            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            // If request caching is enabled, check whether a cached response exists
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            if (requestCacheEnabled && cacheKey != null) {
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}
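This method contains a request-cache check: if a cached response exists for the cache key, it is returned directly; otherwise execution proceeds to applyHystrixSemantics.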
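The get-then-putIfAbsent pattern used for the request cache can be sketched with a ConcurrentHashMap. This is only an illustration under stated assumptions: RequestCacheSketch and its execute method are hypothetical, CompletableFuture stands in for HystrixCachedObservable, and the cache here lives in a single object rather than in a request context.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical request cache; models the get / putIfAbsent pattern only. */
final class RequestCacheSketch {
    private final ConcurrentMap<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    /** Returns the cached result for cacheKey, running the command only on a miss. */
    String execute(String cacheKey, Supplier<String> command) {
        CompletableFuture<String> cached = cache.get(cacheKey);
        if (cached != null) {
            return cached.join();                     // cache hit: skip execution
        }
        CompletableFuture<String> mine = new CompletableFuture<>();
        CompletableFuture<String> winner = cache.putIfAbsent(cacheKey, mine);
        if (winner != null) {
            return winner.join();                     // another thread registered first
        }
        try {
            mine.complete(command.get());             // we own the entry: run the command
        } catch (RuntimeException e) {
            mine.completeExceptionally(e);            // waiters see the failure too
        }
        return mine.join();
    }

    public static void main(String[] args) {
        RequestCacheSketch cache = new RequestCacheSketch();
        AtomicInteger runs = new AtomicInteger();
        Supplier<String> command = () -> "result-" + runs.incrementAndGet();
        System.out.println(cache.execute("key", command)); // first call runs the command
        System.out.println(cache.execute("key", command)); // second call is served from the cache
        System.out.println("runs = " + runs.get());
    }
}
```

Both calls print result-1 and the command runs only once, which is the effect the cache check in toObservable aims for.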
Checking the circuit breaker, thread pool, and semaphore
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    executionHook.onStart(_cmd);

    /* determine if we're allowed to execute */
    // Ask the circuit breaker whether this execution is allowed
    if (circuitBreaker.attemptExecution()) {
        // Get the semaphore instance
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };

        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };

        // Try to acquire the semaphore
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // Rejected: no semaphore permit available
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // Circuit breaker is open: short-circuit to the fallback
        return handleShortCircuitViaFallback();
    }
}
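applyHystrixSemantics first asks the circuit breaker whether execution is allowed. If the circuit breaker is open, it goes straight to the short-circuit fallback logic. Otherwise it tries to acquire the semaphore (in thread-pool mode a no-op semaphore is used, so acquisition always succeeds). If no permit is available, the call is rejected and falls back; if the permit is acquired, execution enters executeCommandAndObserve.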
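The two ideas in this method, a non-blocking tryAcquire and a release that runs at most once even though it is wired to both the terminate and the unsubscribe callbacks, can be sketched with java.util.concurrent.Semaphore. SemaphoreGuardSketch and its methods are hypothetical names; Hystrix uses its own TryableSemaphore, not the JDK class.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical guard modelling tryAcquire plus a release-once action. */
final class SemaphoreGuardSketch {
    private final Semaphore permits;

    SemaphoreGuardSketch(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    String execute(Supplier<String> run, Supplier<String> fallback) {
        if (!permits.tryAcquire()) {
            return fallback.get();               // rejected: no permit available
        }
        AtomicBoolean released = new AtomicBoolean(false);
        Runnable releaseOnce = () -> {
            if (released.compareAndSet(false, true)) {
                permits.release();
            }
        };
        try {
            return run.get();
        } finally {
            releaseOnce.run();                   // terminate path
            releaseOnce.run();                   // a second call (e.g. unsubscribe) is a no-op
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        SemaphoreGuardSketch guard = new SemaphoreGuardSketch(1);
        // The nested call arrives while the only permit is held, so it is rejected.
        String result = guard.execute(
                () -> "outer:" + guard.execute(() -> "inner", () -> "rejected"),
                () -> "rejected");
        System.out.println(result);
        System.out.println(guard.availablePermits());
    }
}
```

The program prints outer:rejected followed by 1: the nested call is rejected, and the permit count returns to its initial value despite the double release attempt.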
Checking for timeouts
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    // omitted...

    // Check whether the execution timeout is enabled
    if (properties.executionTimeoutEnabled().get()) {
        // lift() attaches the timeout operator
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    // omitted...
}
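The effect of wrapping an execution with a timeout can be approximated with an ExecutorService and Future.get with a deadline. This swaps in a different technique from the one Hystrix uses (an RxJava operator, HystrixObservableTimeoutOperator); TimeoutSketch, callWithTimeout, and the timeout values are assumptions made for this sketch only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: run a task with a deadline and fall back when it is missed. */
final class TimeoutSketch {

    static String callWithTimeout(Callable<String> run, long timeoutMillis, String fallback) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(run);
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true);             // interrupt the worker thread
                return fallback;                 // timed out -> fallback
            } catch (ExecutionException e) {
                return fallback;                 // run threw -> fallback
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return fallback;
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Callable<String> slow = () -> {
            Thread.sleep(800);                   // same delay as HelloCommand.run
            return "success";
        };
        System.out.println(callWithTimeout(slow, 1000, "error")); // deadline longer than the work
        System.out.println(callWithTimeout(slow, 200, "error"));  // deadline shorter than the work
    }
}
```

With the 800 ms sleep from HelloCommand, a deadline above 800 ms lets the call finish, while a shorter one routes it to the fallback.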
If the execution timeout is enabled, a timeout operator is attached here. Next, look at the executeCommandWithSpecifiedIsolation method:
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        return Observable.defer(new Func0<Observable<R>>() {
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    return Observable.empty();
                }
            }
        }).doOnTerminate(new Action0() {
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                }
            }
        }).doOnUnsubscribe(new Action0() {
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                }
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        return Observable.defer(new Func0<Observable<R>>() {
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // semaphore isolated
                // store the command that is being run
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            }
        });
    }
}
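This code is fairly long. Its execution logic is:
On entry, the method first checks the isolation strategy. With semaphore isolation the command runs on the calling thread; otherwise it follows the thread-pool logic described in the remaining steps.
Change the HystrixCommand state to USER_CODE_EXECUTED.
Check the HystrixCommand's timeout status; if it has already timed out, return an error Observable instead of calling run.
Change the command's thread state to STARTED.
Call getUserExecutionObservable to run the actual business logic, that is, the run method we implemented.
doOnTerminate: once execution finishes, change the thread state to TERMINAL.
doOnUnsubscribe: when the Observable is unsubscribed, change the thread state to UNSUBSCRIBED.
subscribeOn: specify the scheduler, here the one provided by the thread pool.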
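The thread-state transitions in these steps rely on compareAndSet so that the cleanup runs only once, whichever of the two callbacks fires first. A minimal sketch of that state machine follows; ThreadStateSketch, its methods, and the cleanup counter are hypothetical, and only the state names are taken from the code above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of the threadState transitions; not Hystrix code. */
final class ThreadStateSketch {
    enum ThreadState { NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL }

    private final AtomicReference<ThreadState> state =
            new AtomicReference<>(ThreadState.NOT_USING_THREAD);
    private final AtomicInteger cleanups = new AtomicInteger();

    /** Mirrors NOT_USING_THREAD -> STARTED; returns false if already started or finished. */
    boolean start() {
        return state.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED);
    }

    /** Mirrors doOnTerminate: clean up only if this call wins the transition. */
    void onTerminate() {
        if (state.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
            cleanups.incrementAndGet();
        } else {
            state.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL);
        }
    }

    /** Mirrors doOnUnsubscribe: same idea, but the target state is UNSUBSCRIBED. */
    void onUnsubscribe() {
        if (state.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
            cleanups.incrementAndGet();
        } else {
            state.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED);
        }
    }

    ThreadState state() { return state.get(); }

    int cleanups() { return cleanups.get(); }

    public static void main(String[] args) {
        ThreadStateSketch s = new ThreadStateSketch();
        s.start();
        s.onTerminate();     // wins: STARTED -> TERMINAL, cleanup runs
        s.onUnsubscribe();   // loses: state is no longer STARTED, nothing happens
        System.out.println(s.state() + " cleanups=" + s.cleanups());
    }
}
```

Running it prints TERMINAL cleanups=1, showing that the later unsubscribe neither changes the state nor repeats the cleanup.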
