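Hystrix Circuit Breaker Execution and Timeout Implementation
This article assumes some familiarity with Hystrix's execution flow and source code. It starts with how the Hystrix circuit breaker works.
1. Hystrix circuit breaker class structure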
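HystrixCircuitBreaker is the interface. It has two implementations, NoOpCircuitBreaker and HystrixCircuitBreakerImpl. NoOpCircuitBreaker is an empty shell with no real logic, which amounts to never tripping. HystrixCircuitBreakerImpl holds the main circuit-breaking logic.
2. Hystrix circuit breaker states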
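The circuit breaker has three states: CLOSED, OPEN, and HALF_OPEN.
It starts in CLOSED. Once tripping is triggered, the state changes to OPEN. After the configured wait time has passed, Hystrix lets a request through to probe whether the service has recovered, and during this probe the breaker is in HALF_OPEN. If the probe shows the service is available, the state changes back to CLOSED and the circuit is closed again; if the probe fails, the state returns to OPEN.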
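The transitions described above can be sketched as a small state holder. This is only an illustration under my own assumptions: the class name BreakerStateSketch and its method names are hypothetical and are not part of Hystrix. It only borrows the idea of guarding each transition with compareAndSet, so a transition succeeds only from the expected state.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch for illustration; not the Hystrix implementation.
class BreakerStateSketch {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    // The breaker starts closed.
    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);

    Status current() {
        return status.get();
    }

    // CLOSED -> OPEN: tripping was triggered.
    boolean trip() {
        return status.compareAndSet(Status.CLOSED, Status.OPEN);
    }

    // OPEN -> HALF_OPEN: the wait time has passed and one probe request is let through.
    boolean startProbe() {
        return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
    }

    // HALF_OPEN -> CLOSED: the probe succeeded.
    boolean probeSucceeded() {
        return status.compareAndSet(Status.HALF_OPEN, Status.CLOSED);
    }

    // HALF_OPEN -> OPEN: the probe failed.
    boolean probeFailed() {
        return status.compareAndSet(Status.HALF_OPEN, Status.OPEN);
    }

    public static void main(String[] args) {
        BreakerStateSketch b = new BreakerStateSketch();
        b.trip();
        b.startProbe();
        b.probeFailed();     // back to OPEN
        b.startProbe();
        b.probeSucceeded();  // recovered
        System.out.println(b.current()); // prints CLOSED
    }
}
```

Each method returns false when the breaker is not in the expected source state, which is what makes concurrent callers safe: only one of them wins a given transition.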
3. The code's perspective
ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
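Hystrix maintains one circuit breaker per commandKey and keeps it in this map, so a newly constructed XXXHystrixCommand() still sees the existing breaker state for its key.
3.1 How the breaker decides to open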
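A minimal sketch of such a per-key registry follows. The Breaker class and the getInstance method here are hypothetical stand-ins written for this example, not Hystrix's own factory code; the point is only that a lookup by the same key returns the same object, so state survives across command instances.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch for illustration; not the Hystrix implementation.
class BreakerRegistrySketch {
    // Stand-in for a circuit breaker; it only remembers whether it is open.
    static final class Breaker {
        final String commandKey;
        volatile boolean open;

        Breaker(String commandKey) {
            this.commandKey = commandKey;
        }
    }

    private static final ConcurrentMap<String, Breaker> BREAKERS_BY_COMMAND = new ConcurrentHashMap<>();

    // Returns the existing breaker for the key, creating it atomically on first use.
    static Breaker getInstance(String commandKey) {
        return BREAKERS_BY_COMMAND.computeIfAbsent(commandKey, Breaker::new);
    }

    public static void main(String[] args) {
        getInstance("orderService").open = true;
        // A later lookup with the same key sees the state set earlier.
        System.out.println(getInstance("orderService").open); // prints true
        System.out.println(getInstance("userService").open);  // prints false
    }
}
```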
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    this.properties = properties;
    this.metrics = metrics;
    //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
    Subscription s = subscribeToStream();
    activeSubscription.set(s);
}

private Subscription subscribeToStream() {
    /*
     * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
     */
    return metrics.getHealthCountsStream()
            .observe()
            .subscribe(new Subscriber<HealthCounts>() {
                // ... unrelated code omitted ...
                @Override
                public void onNext(HealthCounts hc) {
                    // check if we are past the statisticalWindowVolumeThreshold
                    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                        // too few requests in the window: leave the status unchanged
                    } else {
                        if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                            // error percentage below the threshold: leave the status unchanged
                        } else {
                            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                circuitOpened.set(System.currentTimeMillis());
                            }
                        }
                    }
                }
            });
}
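When HystrixCircuitBreakerImpl is constructed, it subscribes to the HealthCountsStream of HystrixCommandMetrics. Each time HealthCountsStream emits data, the onNext method above runs and makes these checks:
1. Whether the request volume has reached the configured level (with too few requests, no threshold check is applied).
2. Whether the error percentage has reached its threshold. If so, the breaker state is set to OPEN and circuitOpened is set to the current timestamp, recording when the circuit opened.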
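The two checks can be written as one pure function, which makes the edge cases easy to see. This is a sketch under assumptions: the method name shouldTrip is hypothetical, the percentage is computed with integer arithmetic for simplicity, and the values 20 and 50 are just example thresholds chosen for the demo.

```java
// Hypothetical sketch for illustration; not the Hystrix implementation.
class TripDecisionSketch {
    /**
     * Returns true when the health snapshot should open the circuit:
     * enough requests were seen AND the error percentage reached the threshold.
     */
    static boolean shouldTrip(long totalRequests, long errorCount,
                              int requestVolumeThreshold, int errorThresholdPercentage) {
        if (totalRequests == 0 || totalRequests < requestVolumeThreshold) {
            return false; // volume too low to judge
        }
        long errorPercentage = errorCount * 100 / totalRequests;
        return errorPercentage >= errorThresholdPercentage;
    }

    public static void main(String[] args) {
        // 10 of 10 failed, but the volume threshold of 20 is not reached.
        System.out.println(shouldTrip(10, 10, 20, 50)); // prints false
        // 10 of 20 failed: 50% reaches the 50% threshold.
        System.out.println(shouldTrip(20, 10, 20, 50)); // prints true
        // 9 of 20 failed: 45% is below the threshold.
        System.out.println(shouldTrip(20, 9, 20, 50));  // prints false
    }
}
```

Note the first case: even a 100% failure rate does not trip the breaker while the request volume is below the threshold.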
3.2 attemptExecution
First, look at the Observable that executes a HystrixCommand, com.netflix.hystrix.AbstractCommand#applyHystrixSemantics:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
    executionHook.onStart(_cmd);
    /* determine if we're allowed to execute */
    if (circuitBreaker.attemptExecution()) {
        // ... code omitted ...
Here, every HystrixCommand execution calls circuitBreaker.attemptExecution():
public boolean attemptExecution() {
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        if (isAfterSleepWindow()) {
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                //only the first request after sleep window should execute
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }
}
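The decision logic here is:
1. If the breaker is forced open, return false; the command may not execute.
2. If the breaker is forced closed, return true; the command may execute.
3. Check whether the breaker has opened. circuitOpened.get() == -1 means it has not, so return true and the command may execute.
4. Reaching this step means the breaker is open. Check whether the sleep window has passed and a trial request is allowed. If so, the state is changed to HALF_OPEN, and only the request that wins that compareAndSet is let through.
3.2 markSuccess & markNonSuccess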
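To make step 4 concrete, here is a self-contained sketch of the same gate with an injectable clock, so the sleep window can be exercised without waiting. It is an approximation under assumptions: the article does not show isAfterSleepWindow, so the comparison used below (current time strictly greater than open time plus the sleep window) is my guess at its shape, and the class, constructor, and open() helper are hypothetical. The force-open and force-closed checks are left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical sketch for illustration; not the Hystrix implementation.
class AttemptExecutionSketch {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1); // -1 means "never opened"
    private final long sleepWindowMs;
    private final LongSupplier clock; // injectable so the example does not depend on wall time

    AttemptExecutionSketch(long sleepWindowMs, LongSupplier clock) {
        this.sleepWindowMs = sleepWindowMs;
        this.clock = clock;
    }

    // Opens the circuit and records when it opened.
    void open() {
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong());
        }
    }

    boolean attemptExecution() {
        if (circuitOpened.get() == -1) {
            return true; // circuit never opened: always allow
        }
        // Assumed shape of the sleep-window check.
        boolean afterSleepWindow = clock.getAsLong() > circuitOpened.get() + sleepWindowMs;
        // Only the caller that wins the OPEN -> HALF_OPEN swap may run the probe.
        return afterSleepWindow && status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
    }

    public static void main(String[] args) {
        long[] now = {0};
        AttemptExecutionSketch breaker = new AttemptExecutionSketch(5000, () -> now[0]);
        breaker.open();
        now[0] = 5001;
        System.out.println(breaker.attemptExecution()); // prints true: first caller after the window
        System.out.println(breaker.attemptExecution()); // prints false: already HALF_OPEN
    }
}
```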
com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // ... unrelated code omitted ...
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            if (commandIsScalar()) {
                // ... unrelated code omitted ...
                circuitBreaker.markSuccess();
            }
        }
    };
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                // ... unrelated code omitted ...
                circuitBreaker.markSuccess();
            }
        }
    };
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            // ... unrelated code omitted ...
        }
    };
    // ... unrelated code omitted ...
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
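This shows how the breaker state is updated while a HystrixCommand executes. As the code above makes clear, an error triggers circuitBreaker.markNonSuccess(), while a successful emission (for scalar commands) or completion (for non-scalar commands) triggers circuitBreaker.markSuccess().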
markNonSuccess
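@Override
public void markNonSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
        //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
        circuitOpened.set(System.currentTimeMillis());
    }
}
If execution reaches markNonSuccess, the breaker is either closed or in the probing (HALF_OPEN) phase. In the closed state nothing happens here, because this method does not trip the breaker. In the probing phase, the probe request has failed again, so the breaker is set back to OPEN and circuitOpened is set to the current timestamp, restarting the sleep window.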
markSuccess
@Override
public void markSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        //This thread wins the race to close the circuit - it resets the stream to start it over from 0
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {
            previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        circuitOpened.set(-1L);
    }
}
If execution reaches markSuccess, the breaker is either closed or in the probing phase. In the probing phase the breaker is closed again, the metrics stream is reset and resubscribed, and circuitOpened is set to -1.
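4. The end
That concludes the circuit breaker walkthrough. To recap, it covered how the breaker opens, how it closes, and how it moves between its states, which together give a complete picture of the circuit breaker.
The rest of this article explains how HystrixCommand detects a timeout during execution.
1. Main entry point: executeCommandAndObserve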
#com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // ... some code omitted ...
    Observable<R> execution;
    // check whether timeout detection is enabled
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
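Roughly speaking, lift wraps the preceding Observable, much like a decorator. The parent that appears later is the Subscriber the operator attaches to that upstream Observable. HystrixObservableTimeoutOperator is the key piece.
2. The key piece: HystrixObservableTimeoutOperator
First, look at the TimerListener implementation inside HystrixObservableTimeoutOperator.call():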
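The decorator idea can be shown without RxJava. The sketch below is a deliberately simplified analogy, not RxJava's actual API: Source, lift, and the operator type are all hypothetical names invented for this example. The operator receives the downstream consumer (the "child") and returns a wrapping consumer (the "parent") that the upstream pushes values into, and the parent decides what to forward.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical analogy for illustration; this is not RxJava's API.
class LiftSketch {
    // A source pushes values into whatever consumer subscribes to it.
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // lift: the operator turns the child consumer into a parent consumer,
    // and the upstream is subscribed with that parent instead of the child.
    static <T> Source<T> lift(Source<T> upstream, Function<Consumer<T>, Consumer<T>> operator) {
        return child -> upstream.subscribe(operator.apply(child));
    }

    public static void main(String[] args) {
        Source<Integer> numbers = downstream -> {
            downstream.accept(1);
            downstream.accept(2);
            downstream.accept(3);
        };
        // The parent sits between upstream and child and can drop or forward values.
        Source<Integer> gated = lift(numbers, child -> value -> {
            if (value != 2) {
                child.accept(value);
            }
        });
        List<Integer> seen = new ArrayList<>();
        gated.subscribe(seen::add);
        System.out.println(seen); // prints [1, 3]
    }
}
```

In the timeout case the parent plays the same intermediary role: it forwards the upstream's terminal events to the child only if the timeout has not already fired.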
TimerListener listener = new TimerListener() {
    @Override
    public void tick() {
        if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
            // mark the event; think of it as a hook, ignored here for now
            originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
            // unsubscribe from the original Observable
            s.unsubscribe();
            final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
                @Override
                public void run() {
                    child.onError(new HystrixTimeoutException());
                }
            });
            timeoutRunnable.run();
        }
    }

    // read the configured timeout
    @Override
    public int getIntervalTimeInMilliseconds() {
        return originalCommand.properties.executionTimeoutInMilliseconds().get();
    }
};
This code tries to flip the command's timeout status from NOT_EXECUTED to TIMED_OUT. If the swap succeeds, it signals a HystrixTimeoutException through child.onError, which the command's onErrorResumeNext(handleFallback) then picks up to run the fallback logic.
fallback
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    // ...
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                // the timeout exception is caught here
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                return handleFailureViaFallback(e);
            }
        }
    };
    // ...
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
Meanwhile, s.unsubscribe() tells the executing thread to stop the task. How does it stop?
executeCommandWithSpecifiedIsolation.subscribeOn()
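The argument to subscribeOn is a HystrixContextScheduler. In RxJava, the part of a scheduler that does the actual work is the worker.
(Figure: structure diagram of Hystrix's custom scheduler.)
Going straight to the point, look at ThreadPoolWorker: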
//ThreadPoolWorker.schedule
@Override
public Subscription schedule(final Action0 action) {
    if (subscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }
    ScheduledAction sa = new ScheduledAction(action);
    subscription.add(sa);
    sa.addParent(subscription);
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
    return sa;
}
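1. It first checks whether the subscription has already been unsubscribed; if so, nothing is scheduled.
2. Otherwise the task is submitted to the thread pool.
3. A FutureCompleterWithConfigurableInterrupt is attached to the scheduled action, so that unsubscribing can cancel, and optionally interrupt, the running task.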
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
    private final FutureTask<?> f;
    private final Func0<Boolean> shouldInterruptThread;
    private final ThreadPoolExecutor executor;

    private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
        this.f = f;
        this.shouldInterruptThread = shouldInterruptThread;
        this.executor = executor;
    }

    @Override
    public void unsubscribe() {
        executor.remove(f);
        if (shouldInterruptThread.call()) {
            f.cancel(true);
        } else {
            f.cancel(false);
        }
    }
    // ... code omitted ...
}
When the Observable is unsubscribed, the task is removed from the executor's queue and cancelled, with a thread interrupt only if shouldInterruptThread says so.
So far this only covers what happens after a timeout. How is a timeout determined in the first place?
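The difference between cancel(true) and cancel(false) on a task that is already running is easy to miss, so here is a small standalone demonstration using only java.util.concurrent. The class and method names are hypothetical; the executor and Future calls are standard JDK APIs. The method reports whether the running task actually observed an interrupt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch for illustration; not the Hystrix implementation.
class CancelInterruptSketch {
    // Starts a sleeping task, cancels it once it is running, and
    // returns true if the task's thread was interrupted.
    static boolean runAndCancel(boolean mayInterruptIfRunning) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> f = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(300); // stands in for a slow remote call
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            });
            started.await();                 // make sure the task is already running
            f.cancel(mayInterruptIfRunning); // true interrupts the thread, false does not
            finished.await();
            return interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndCancel(true));  // prints true
        System.out.println(runAndCancel(false)); // prints false
    }
}
```

With cancel(false) the Future is marked cancelled but the thread keeps running until the work finishes on its own, which is why the interrupt flag matters for timed-out commands.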
3. The clever part
There is a neat piece of design here. Back to HystrixObservableTimeoutOperator:
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
#com.netflix.hystrix.util.HystrixTimer#addTimerListener
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    startThreadIfNeeded();
    // add the listener
    Runnable r = new Runnable() {
        @Override
        public void run() {
            try {
                listener.tick();
            } catch (Exception e) {
                logger.error("Failed while ticking TimerListener", e);
            }
        }
    };
    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
    return new TimerReference(listener, f);
}
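It uses a ScheduledThreadPoolExecutor to run the listener after a delay, and the delay is the timeout we configured (scheduleAtFixedRate is called with both the initial delay and the period set to that value). Now look at: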
#HystrixObservableTimeoutOperator
Subscriber<R> parent = new Subscriber<R>() {
    @Override
    public void onCompleted() {
        if (isNotTimedOut()) {
            // stop timer and pass notification through
            tl.clear();
            child.onCompleted();
        }
    }

    @Override
    public void onError(Throwable e) {
        if (isNotTimedOut()) {
            // stop timer and pass notification through
            tl.clear();
            child.onError(e);
        }
    }
    // ...
    private boolean isNotTimedOut() {
        // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
        return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
    }
};
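Here parent is the Subscriber on the upstream Observable, which can loosely be thought of as the thread executing our HystrixCommand. When the command finishes or fails, it calls tl.clear(), which calls cancel(false) on the TimerListener's ScheduledFuture so the tick will not run again, and the timeout mechanism is disabled for this execution.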
// tl.clear()
private static class TimerReference extends SoftReference<TimerListener> {
    private final ScheduledFuture<?> f;
    // ...
    @Override
    public void clear() {
        super.clear();
        // stop this ScheduledFuture from any further executions
        f.cancel(false);
    }
}
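The effect of cancelling the scheduled tick before it fires can be checked with plain JDK classes. In this sketch the class and method names are hypothetical and the 50 ms interval is an arbitrary stand-in for the configured timeout; scheduleAtFixedRate and cancel(false) are the same JDK calls used in the excerpts above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch for illustration; not the Hystrix implementation.
class TimerCancelSketch {
    // Schedules a periodic tick and returns how many ticks ran within ~300 ms.
    static int ticksObserved(boolean clearBeforeTimeout) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger ticks = new AtomicInteger();
        try {
            // Initial delay and period are both the "timeout", as in addTimerListener.
            ScheduledFuture<?> f = timer.scheduleAtFixedRate(
                    ticks::incrementAndGet, 50, 50, TimeUnit.MILLISECONDS);
            if (clearBeforeTimeout) {
                f.cancel(false); // the command finished in time: stop further executions
            }
            Thread.sleep(300);
            f.cancel(false);
            return ticks.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(ticksObserved(true));      // prints 0: the tick never fires
        System.out.println(ticksObserved(false) > 0); // prints true: the tick fired at least once
    }
}
```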
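4. Back to plain words
HystrixCommand holds a TimedOutStatus. Think of it as two threads: one executing the hystrixCommand task, and one waiting to declare the hystrixCommand timed out. The two race to swap the command's status, and as soon as either one marks it, the timeout decision is final.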
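The race described here can be reproduced in a few lines. This is a sketch under assumptions: the class name, the run method, and the use of two plain executors are my own simplification, and only the three status names mirror the TimedOutStatus values seen in the excerpts. Whichever side wins the compareAndSet from NOT_EXECUTED decides the outcome, and the loser's swap simply fails.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch for illustration; not the Hystrix implementation.
class TimeoutRaceSketch {
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    // Runs "work" that takes workMs against a timer of timeoutMs and
    // returns which side marked the status first.
    static TimedOutStatus run(long workMs, long timeoutMs) {
        AtomicReference<TimedOutStatus> state = new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch decided = new CountDownLatch(1);
        try {
            // Command thread: tries to mark COMPLETED when the work is done.
            Future<?> work = worker.submit(() -> {
                try {
                    Thread.sleep(workMs);
                    if (state.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
                        decided.countDown();
                    }
                } catch (InterruptedException e) {
                    // interrupted because the timer won the race
                }
            });
            // Timer thread: tries to mark TIMED_OUT when the timeout elapses.
            ScheduledFuture<?> tick = timer.schedule(() -> {
                if (state.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    work.cancel(true); // stop the still-running work
                    decided.countDown();
                }
            }, timeoutMs, TimeUnit.MILLISECONDS);
            decided.await();
            tick.cancel(false); // no-op if the timer already fired
            return state.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(10, 500)); // prints COMPLETED
        System.out.println(run(500, 50)); // prints TIMED_OUT
    }
}
```

Because the status can leave NOT_EXECUTED only once, no lock is needed: the single atomic swap is the whole arbitration between the two threads.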