vlambda博客
学习文章列表

Hystrix 熔断器执行和超时实现机制

本篇假设大家对Hystrix的执行过程及源码有一定的了解,这里介绍Hystrix的熔断器执行机制。

1.Hystrix 熔断器类结构

HystrixCircuitBreaker作为接口定义,具体的实现有 NoOpCircuitBreakerHystrixCircuitBreakerImpl,其中 NoOpCircuitBreaker只是个空壳没有具体的实现,相当于不熔断。 HystrixCircuitBreakerImpl是主要的熔断逻辑实现。

2.Hystrix 熔断器状态

熔断器有三个状态 CLOSEDOPENHALF_OPEN 熔断器默认关闭状态,当触发熔断后状态变更为 OPEN,在等待到指定的时间,Hystrix会放请求检测服务是否开启,这期间熔断器会变为 HALF_OPEN 半开启状态,熔断探测服务可用则继续变更为 CLOSED关闭熔断器。 Hystrix 熔断器执行和超时实现机制

3.代码视角

 
   
   
 
  1. ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

Hystrix为每个commandKey都维护了一个熔断器,保持着对应的熔断器,所以当new XXXHystrixCommand()的时候依然能够保持着原来熔断器的状态。

3.1 如何判定开启熔断
 
   
   
 
  1. protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {

  2.    this.properties = properties;

  3.    this.metrics = metrics;


  4.    //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur

  5.    Subscription s = subscribeToStream();

  6.    activeSubscription.set(s);

  7. }


  8. private Subscription subscribeToStream() {

  9.    /*

  10.     * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream

  11.     */

  12.    return metrics.getHealthCountsStream()

  13.            .observe()

  14.            .subscribe(new Subscriber<HealthCounts>() {


  15.                 //.....................省略干扰代码......................

  16.                @Override

  17.                public void onNext(HealthCounts hc) {

  18.                    // check if we are past the statisticalWindowVolumeThreshold

  19.                    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {


  20.                    } else {

  21.                        if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {


  22.                        } else {


  23.                            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {

  24.                                circuitOpened.set(System.currentTimeMillis());

  25.                            }

  26.                        }

  27.                    }

  28.                }

  29.            });

  30. }

这里面HystrixBreaker启动的时候会订阅 HystrixCommandMetricsHealthCountsStream,每当 HealthCountsStream搜集到数据,都会触发上面的 onNext方法,然后该方法做下面几个判断 1.当前请求量是否达到设定水位(请求量太小不做阀值控制) 2.当前的请求错误量是否达到阀值,达到后会将熔断器状态置为 OPEN, circuitOpened设置为当前时间戳表示开启的时间。

3.2 attemptExecution

先看下HystrixCommand 的执行Observable com.netflix.hystrix.AbstractCommand#applyHystrixSemantics

 
   
   
 
  1. private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

  2.        // mark that we're starting execution on the ExecutionHook

  3.        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent

  4.        executionHook.onStart(_cmd);


  5.        /* determine if we're allowed to execute */

  6.        if (circuitBreaker.attemptExecution()) {

  7. ··········省略代码··········

这里,每次HystrixCommand执行都会调用 circuitBreaker.attemptExecution()

 
   
   
 
  1. public boolean attemptExecution() {

  2.            if (properties.circuitBreakerForceOpen().get()) {

  3.                return false;

  4.            }

  5.            if (properties.circuitBreakerForceClosed().get()) {

  6.                return true;

  7.            }

  8.            if (circuitOpened.get() == -1) {

  9.                return true;

  10.            } else {

  11.                if (isAfterSleepWindow()) {

  12.                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {

  13.                        //only the first request after sleep window should execute

  14.                        return true;

  15.                    } else {

  16.                        return false;

  17.                    }

  18.                } else {

  19.                    return false;

  20.                }

  21.            }

  22.        }

这里代码判断逻辑 1.判断是否强制开启熔断器,是则return false,command不能执行 2.判断是否强制关闭熔断器,是则return true, command可执行 3.判断熔断器是否开启 circuitOpened.get()==-1表示没有开启,则return true,command可执行。 4.到这步证明已经开启了熔断器,那么判断是否可尝试请求,如果可以同时会把熔断器的状态改为 HALF_OPEN

3.3 markSuccess&markNonSuccess

com.netflix.hystrix.AbstractCommand#executeCommandAndObserve

 
   
   
 
  1. private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {

  2.    ......省略干扰代码.......


  3.    final Action1<R> markEmits = new Action1<R>() {

  4.        @Override

  5.        public void call(R r) {

  6.            if (shouldOutputOnNextEvents()) {

  7.                executionResult = executionResult.addEvent(HystrixEventType.EMIT);

  8.                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);

  9.            }

  10.            if (commandIsScalar()) {

  11.                ......省略干扰代码.......

  12.                circuitBreaker.markSuccess();

  13.            }

  14.        }

  15.    };


  16.    final Action0 markOnCompleted = new Action0() {

  17.        @Override

  18.        public void call() {

  19.            if (!commandIsScalar()) {

  20.                ......省略干扰代码.......

  21.                circuitBreaker.markSuccess();

  22.            }

  23.        }

  24.    };


  25.    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {

  26.        @Override

  27.        public Observable<R> call(Throwable t) {

  28.            circuitBreaker.markNonSuccess();

  29.            ......省略干扰代码.......

  30.        }

  31.    };


  32.    ......省略干扰代码.......


  33.    return execution.doOnNext(markEmits)

  34.            .doOnCompleted(markOnCompleted)

  35.            .onErrorResumeNext(handleFallback)

  36.            .doOnEach(setRequestContext);

  37. }

此处表示HystrixCommand执行的过程中对应的熔断器状态变更,上面代码不难看出,当error的时候会触发 circuitBreaker.markNonSuccess();,执行成功或者执行完成触发 circuitBreaker.markSuccess();

markNonSuccess

 
   
   
 
  1. @Override

  2. public void markNonSuccess() {

  3.    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {

  4.        //This thread wins the race to re-open the circuit - it resets the start time for the sleep window

  5.        circuitOpened.set(System.currentTimeMillis());

  6.    }

  7. }

如果能执行到markNonSuccess,说明此时熔断器是关闭状态,或者尝试放流阶段。关闭状态的话不做处理(未触发熔断),尝试放流时,发现依然执行失败,这里讲熔断器状态重新置为开启状态,并把circuitOpened设置为当前的时间戳。

markSuccess

 
   
   
 
  1. @Override

  2. public void markSuccess() {

  3.    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {

  4.        //This thread wins the race to close the circuit - it resets the stream to start it over from 0

  5.        metrics.resetStream();

  6.        Subscription previousSubscription = activeSubscription.get();

  7.        if (previousSubscription != null) {

  8.            previousSubscription.unsubscribe();

  9.        }

  10.        Subscription newSubscription = subscribeToStream();

  11.        activeSubscription.set(newSubscription);

  12.        circuitOpened.set(-1L);

  13.    }

  14. }

能走到markSuccess说明熔断器此时关闭或者放流阶段,尝试放流阶段则讲熔断器关闭,设置circuitOpened=-1,并重置指标统计。

4.THE END

到这里熔断器的介绍就结束了,回顾下主要有熔断器如何开启、如何关闭、几个状态的变更。一个完整的熔断器就此呈现在大家的面前。

HystrixCommand在执行的过程中如何探测超时,本篇主要对此进行介绍说明。

1.主入口:executeCommandAndObserve

 
   
   
 
  1. #com.netflix.hystrix.AbstractCommand#executeCommandAndObserve

  2. private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {

  3.        ···省略部分代码···

  4.        Observable<R> execution;


  5.        //判断是否开启超时监测

  6.        if (properties.executionTimeoutEnabled().get()) {

  7.            execution = executeCommandWithSpecifiedIsolation(_cmd)

  8.                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

  9.        } else {

  10.            execution = executeCommandWithSpecifiedIsolation(_cmd);

  11.        }


  12.        return execution.doOnNext(markEmits)

  13.                .doOnCompleted(markOnCompleted)

  14.                .onErrorResumeNext(handleFallback)

  15.                .doOnEach(setRequestContext);

  16.    }

executeCommandWithSpecifiedIsolation(cmd) .lift(new HystrixObservableTimeoutOperator ( cmd));

可以简单的认为lift 里面的对前面的Observable包含,类似装饰者,后面的parent就是指上层的Observable。其中 HystrixObservableTimeoutOperator 就是关键的部分。

2.关键点: HystrixObservableTimeoutOperator

先看下HystrixObservableTimeoutOperator.call(),TimerListener的实现

 
   
   
 
  1. TimerListener listener = new TimerListener() {


  2.                @Override

  3.                public void tick() {


  4.                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {

  5.                        // 标记事件,可以认为是开的hook,这里暂忽略

  6.                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);


  7.                        //取消原Obserable的订阅

  8.                        s.unsubscribe();


  9.                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {


  10.                            @Override

  11.                            public void run() {

  12.                                child.onError(new HystrixTimeoutException());

  13.                            }

  14.                        });

  15.                        timeoutRunnable.run();

  16.                    }

  17.                }


  18.                //获取配置的超时时间配置

  19.                @Override

  20.                public int getIntervalTimeInMilliseconds() {

  21.                    return originalCommand.properties.executionTimeoutInMilliseconds().get();

  22.                }

  23.            };

这段代码的意思就是,给当前command的超时状态置为超时,如果设置成功就抛出 HystrixTimeoutException异常,紧接着被command的 doOnErron接收走 fallback逻辑

 
   
   
 
  1. fallback

  2. private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {

  3.        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();


  4.        .................................


  5.        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {

  6.            @Override

  7.            public Observable<R> call(Throwable t) {

  8.                circuitBreaker.markNonSuccess();

  9.                Exception e = getExceptionFromThrowable(t);

  10.                executionResult = executionResult.setExecutionException(e);

  11.                if (e instanceof RejectedExecutionException) {

  12.                    return handleThreadPoolRejectionViaFallback(e);

  13.                } else if (t instanceof HystrixTimeoutException) {

  14.                    //此处catch到超时异常

  15.                    return handleTimeoutViaFallback();

  16.                } else if (t instanceof HystrixBadRequestException) {

  17.                    return handleBadRequestByEmittingError(e);

  18.                } else {

  19.                    /*

  20.                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.

  21.                     */

  22.                    if (e instanceof HystrixBadRequestException) {

  23.                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);

  24.                        return Observable.error(e);

  25.                    }


  26.                    return handleFailureViaFallback(e);

  27.                }

  28.            }

  29.        };


  30.        .................................


  31.        return execution.doOnNext(markEmits)

  32.                .doOnCompleted(markOnCompleted)

  33.                .onErrorResumeNext(handleFallback)

  34.                .doOnEach(setRequestContext);

  35.    }

同时 s.unsubscribe()通知正在执行的线程,终止任务。如何终止呢?

executeCommandWithSpecifiedIsolation.subscribeOn()

subscribeOne的参数就是 HystrixContextScheduler, Rxjava里 scheduler具体干活的是 worker,我们先看下Hystrix自定义scheduler的结构示意图 Hystrix 熔断器执行和超时实现机制 那么我们直奔主题,直接看 ThreadPoolWorker

 
   
   
 
  1. //ThreadPoolWorker.schedule

  2. @Override

  3. public Subscription schedule(final Action0 action) {

  4.    if (subscription.isUnsubscribed()) {

  5.        return Subscriptions.unsubscribed();

  6.    }


  7.    ScheduledAction sa = new ScheduledAction(action);


  8.    subscription.add(sa);

  9.    sa.addParent(subscription);


  10.    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();

  11.    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);

  12.    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));


  13.    return sa;

  14. }

1.开始的时候判断observable是否被订阅 2.被订阅后,将任务 submit到线程池 3. FutureCompleterWithConfigurableInterrupt scheduler在执行的时候,增加了observable的中断探测

 
   
   
 
  1. private static class FutureCompleterWithConfigurableInterrupt implements Subscription {

  2.    private final FutureTask<?> f;

  3.    private final Func0<Boolean> shouldInterruptThread;

  4.    private final ThreadPoolExecutor executor;


  5.    private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {

  6.        this.f = f;

  7.        this.shouldInterruptThread = shouldInterruptThread;

  8.        this.executor = executor;

  9.    }


  10.    @Override

  11.    public void unsubscribe() {

  12.        executor.remove(f);

  13.        if (shouldInterruptThread.call()) {

  14.            f.cancel(true);

  15.        } else {

  16.            f.cancel(false);

  17.        }

  18.    }


  19.    .....省略代码.......

  20. }

当observable 取消订阅时,就会把当前任务移除,并中断任务

到这里只是讲说了超时后的处理,如何认定执行超时呢?

3.匠心之巧

这里有个很巧妙的设计,再探 HystrixObservableTimeoutOperator

 
   
   
 
  1. final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);


  2. #com.netflix.hystrix.util.HystrixTimer#addTimerListener

  3. public Reference<TimerListener> addTimerListener(final TimerListener listener) {

  4.        startThreadIfNeeded();

  5.        // add the listener


  6.        Runnable r = new Runnable() {


  7.            @Override

  8.            public void run() {

  9.                try {

  10.                    listener.tick();

  11.                } catch (Exception e) {

  12.                    logger.error("Failed while ticking TimerListener", e);

  13.                }

  14.            }

  15.        };


  16.        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);

  17.        return new TimerReference(listener, f);

  18.    }

利用了 ScheduledThreadPoolExecutor,延迟执行,延迟时间就是我们设定的超时时间,我们再看下

 
   
   
 
  1. #HystrixObservableTimeoutOperator

  2. Subscriber<R> parent = new Subscriber<R>() {


  3.                @Override

  4.                public void onCompleted() {

  5.                    if (isNotTimedOut()) {

  6.                        // stop timer and pass notification through

  7.                        tl.clear();

  8.                        child.onCompleted();

  9.                    }

  10.                }


  11.                @Override

  12.                public void onError(Throwable e) {

  13.                    if (isNotTimedOut()) {

  14.                        // stop timer and pass notification through

  15.                        tl.clear();

  16.                        child.onError(e);

  17.                    }

  18.                }


  19.                .....  .....  .....  .....  .....  .....  .....  .....  .....


  20.                private boolean isNotTimedOut() {

  21.                    // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED

  22.                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||

  23.                            originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);

  24.                }


  25.            };

这里parent就是指上层的obserable,这里可以抽象的认为是我们的HystrixCommand执行线程, 当command执行线程执行完成的时候或异常的时候,会执行 tl.clear(), 也就是Future.cancel()会中断 TimerListener 的ScheduledFuture 线程,迫使超时机制失效。

 
   
   
 
  1. // tl.clear()

  2. private static class TimerReference extends SoftReference<TimerListener> {

  3.        private final ScheduledFuture<?> f;

  4.        ....        ....        ....        ....        ....

  5.        @Override

  6.        public void clear() {

  7.            super.clear();

  8.            // stop this ScheduledFuture from any further executions

  9.            f.cancel(false);

  10.        }

  11.    }

4.回归文字

HystrixCommand里有个 TimedOutStatus 超时状态 Hystrix 熔断器执行和超时实现机制 现在可以认为有两个线程,一个是hystrixCommand任务执行线程,一个是等着给hystrixCommand判定超时的线程,现在两个线程看谁能先把hystrixCommand的状态置换,只要任何一个线程对hystrixCommand打上标就意味着超时判定结束。

相关热门推荐文章:



长按二维码,扫扫关注哦

✬如果你喜欢这篇文章,欢迎分享和点赞✬