vlambda博客
学习文章列表

手写一个简易版断路器(hystrix)

理论基础

根据熔断器模式(Circuit Breaker Pattern)

共有上面几种状态

  • Closed:默认状态,Circuit Breaker 内部维护着最近的失败次数(failure count)。每操作失败一次,失败次数就会加一。当失败次数达到设定的阈值,就会进入Open状态
  • Open: 操作不会执行,会立即失败。Circuit Breaker  内部维护一个计时器。当时间达到设定的阈值,就会从Open状态进入到Half-Open状态
  • Half-Open: 只要操作失败一次,立即进入Open状态。否则增加操作成功的次数,当成功次数达到设定阈值,进入Closed状态

简化为自己的理解模型如下:手写一个简易版断路器(hystrix)

基于上面的理论,我们来实现我们的一个断路器

实现

定义状态

  • State.java 首先我们定义一个枚举,定义上面的三种状态
public enum State {

    CLOSED,
    HALF_OPEN,
    OPEN
}

定义阈值配置

然后我们要有一个配置类,定义失败多少次进入Open状态,Half-Open状态的超时时间等。具体看代码

  • Config.java
@Data
public class Config {

    /**
     * Closed 状态进入 Open 状态的错误个数阀值
     */

    private int failureCount = 5;

    /**
     * failureCount 统计时间窗口(超过该时间重置失败次数)
     */

    private long failureTimeInterval = 2 * 1000;

    /**
     * Open 状态进入 Half-Open 状态的超时时间
     */

    private int halfOpenTimeout = 5 * 1000;

    /**
     * Half-Open 状态进入 Open 状态的成功个数阀值
     */

    private int halfOpenSuccessCount = 2;


}

计数器

配置类我们基本都解决了,我们还需要有一个计数器,来记录上次操作调用失败的时间戳,失败次数等。用于和配置类比对来作状态转换。

  • Counter.java
public class Counter {

    // Closed 状态进入 Open 状态的错误个数阀值
    private final int failureCount;

    // failureCount 统计时间窗口
    private final long failureTimeInterval;

    // 当前错误次数
    private final AtomicInteger currentCount;

    // 上一次调用失败的时间戳
    private long lastTime;

    // Half-Open 状态下成功次数
    private final AtomicInteger halfOpenSuccessCount;

    public Counter(int failureCount, long failureTimeInterval) {
        this.failureCount = failureCount;
        this.failureTimeInterval = failureTimeInterval;
        this.currentCount = new AtomicInteger(0);
        this.halfOpenSuccessCount = new AtomicInteger(0);
        this.lastTime = System.currentTimeMillis();
    }

    /**
     * 失败次数 + 1
     * synchronized 保证线程安全
     * @return
     */

    public synchronized int incrFailureCount() {
        long current = System.currentTimeMillis();
        // 超过时间窗口,当前失败次数重置为 0
        if (current - lastTime > failureTimeInterval) { 
            lastTime = current;
            currentCount.set(0);
        }
        return currentCount.getAndIncrement();
    }

    /**
     * half Open 状态下 成功数 + 1
     * @return
     */

    public int incrSuccessHalfOpenCount() {
        return this.halfOpenSuccessCount.incrementAndGet();
    }

    /**
     * 失败总次数是否超过阈值
     * @return
     */

    public boolean failureThresholdReached() {
        return getCurCount() >= failureCount;
    }

    public int getCurCount() {
        return currentCount.get();
    }

    /**
     * 重置
     */

    public synchronized void reset() {
        halfOpenSuccessCount.set(0);
        currentCount.set(0);
    }

}

核心实现类

上面的一切都准备好了,我们就来编写我们的核心实现类(CircuitBreaker)。CircuitBreaker主要是负责状态间的状态转换,并提供方法给外部调用达到限流降级作用。

  • CircuitBreaker.java
public class CircuitBreaker {

    private State state;

    private final Config config;

    private final Counter counter;

    private long lastOpenedTime;

    public CircuitBreaker(Config config) {
        this.counter = new Counter(config.getFailureCount(), config.getFailureTimeInterval());
        this.state = CLOSED;
        this.config = config;
    }


    /**
     * 供外部调用达到限流降级作用
     * @param toRun
     * @param fallback 异常回滚方法
     * @param <T>
     * @return
     */

    public <T> run(Supplier<T> toRun, Function<Throwable, T> fallback) {
        try {
            if (state == OPEN) {
                // 判断 是否超时可以进入 half-open状态
                if (halfOpenTimeout()) {
                    return halfOpenHandle(toRun, fallback);
                }
                // 直接执行失败回调方法
                return fallback.apply(new DegradeException("degrade by circuit breaker"));
                // 如果降级关闭则正常执行
            } else if (state == CLOSED) {
                T result = toRun.get();
                // 注意重置 错误数
                closed();
                return result;
            } else {
                // if (state == half-Open)
                return halfOpenHandle(toRun, fallback);
            }
        } catch (Exception e) {
            // 执行失败 错误次数+ 1
            counter.incrFailureCount();
            // 错误次数达到阀值,进入 open 状态
            if (counter.failureThresholdReached()) {
                open();
            }
            return fallback.apply(e);
        }
    }

    /**
     *  转换为 half-Open状态
     * @param toRun
     * @param fallback
     * @param <T>
     * @return
     */

    private <T> halfOpenHandle(Supplier<T> toRun, Function<Throwable, T> fallback) {
        try {
            // closed 状态超时进入 half-open 状态
            halfOpen();
            T result = toRun.get();
            int halfOpenSuccCount = counter.incrSuccessHalfOpenCount();
            // half-open 状态成功次数到达阀值,进入 closed 状态
            if (halfOpenSuccCount >= this.config.getHalfOpenSuccessCount()) {
                closed();
            }
            return result;
        } catch (Exception e) {
            // half-open 状态发生一次错误进入 open 状态
            open();
            return fallback.apply(new DegradeException("degrade by circuit breaker"));
        }
    }

    /**
     * 判断是否 Open是否超时, 如果是则进入 halfOpen状态
     * @return
     */

    private boolean halfOpenTimeout() {
        return System.currentTimeMillis() - lastOpenedTime > config.getHalfOpenTimeout();
    }

    private void closed() {
        counter.reset();
        state = CLOSED;
    }

    private void open() {
        state = OPEN;
        lastOpenedTime = System.currentTimeMillis();
    }

    private void halfOpen() {
        state = HALF_OPEN;
    }

}

测试

首先我们测试Open状态

@Test
    public void testScene1() {
        CircuitBreaker cb = new CircuitBreaker(new Config());
        IntStream.range(010).forEach(s -> {
            String result = cb.run(() -> {
                // 假装执行失败
                System.out.println("执行方法次数:" + s);
                int i = 1 / 0;
                return "Success";
            }, t -> {
                System.err.println(t);
                return "执行自定义降级方法";
            });
        });
    }

执行结果:手写一个简易版断路器(hystrix)

可以看到当达到5次后就触发降级操作,不再处理,直接降级

如果我们在每个方法执行后休眠2s,会看到不会触发降级操作

@Test
    public void testScene1() {
        CircuitBreaker cb = new CircuitBreaker(new Config());
        IntStream.range(010).forEach(s -> {
            String result = cb.run(() -> {
                // 假装执行失败
                System.out.println("执行方法次数:" + s);
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                int i = 1 / 0;
                return "Success";
            }, t -> {
                System.err.println(t);
                return "执行自定义降级方法";
            });
        });
    }

执行结果:

测试 转换为half_open 然后转close

@Test
    public void testScene1() {
        CircuitBreaker cb = new CircuitBreaker(new Config());
        IntStream.range(010).forEach(s -> {
            if (s == 5) {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            String result = cb.run(() -> {
                // 假装执行失败
                System.out.println("执行方法次数:" + s);
                if (s < 5) {
                    int i = 1 / 0;
                }
                System.out.println("方法执行成功");
                return "Success";
            }, t -> {
                System.err.println(t);
                return "执行自定义降级方法";
            });
        });
    }

为了让测试查看方便,我在上面的状态转换里面加了一些输出,然后执行结果如下

可以看到最终都达到了我们想要的效果