手写一个简易版断路器(hystrix)
理论基础
根据熔断器模式(Circuit Breaker Pattern)
共有上面几种状态
-
Closed:默认状态,Circuit Breaker 内部维护着最近的失败次数(failure count)。每操作失败一次,失败次数就会加一。当失败次数达到设定的阈值,就会进入Open状态 -
Open: 操作不会执行,会立即失败。Circuit Breaker 内部维护一个计时器。当时间达到设定的阈值,就会从Open状态进入到Half-Open状态 -
Half-Open: 只要操作失败一次,立即进入Open状态。否则增加操作成功的次数,当成功次数达到设定阈值,进入Closed状态
简化为自己的理解模型如下:
基于上面的理论,我们来实现我们的一个断路器
实现
定义状态
-
State.java 首先我们定义一个枚举,定义上面的三种状态
public enum State {
CLOSED,
HALF_OPEN,
OPEN
}
定义阈值配置
然后我们要有一个配置类,定义失败多少次进入Open状态,Half-Open状态的超时时间等。具体看代码
-
Config.java
@Data
public class Config {
/**
* Closed 状态进入 Open 状态的错误个数阀值
*/
private int failureCount = 5;
/**
* failureCount 统计时间窗口(超过该时间重置失败次数)
*/
private long failureTimeInterval = 2 * 1000;
/**
* Open 状态进入 Half-Open 状态的超时时间
*/
private int halfOpenTimeout = 5 * 1000;
/**
* Half-Open 状态进入 Open 状态的成功个数阀值
*/
private int halfOpenSuccessCount = 2;
}
计数器
配置类我们基本都解决了,我们还需要有一个计数器,来记录上次操作调用失败的时间戳,失败次数等。用于和配置类比对来作状态转换。
-
Counter.java
public class Counter {
// Closed 状态进入 Open 状态的错误个数阀值
private final int failureCount;
// failureCount 统计时间窗口
private final long failureTimeInterval;
// 当前错误次数
private final AtomicInteger currentCount;
// 上一次调用失败的时间戳
private long lastTime;
// Half-Open 状态下成功次数
private final AtomicInteger halfOpenSuccessCount;
public Counter(int failureCount, long failureTimeInterval) {
this.failureCount = failureCount;
this.failureTimeInterval = failureTimeInterval;
this.currentCount = new AtomicInteger(0);
this.halfOpenSuccessCount = new AtomicInteger(0);
this.lastTime = System.currentTimeMillis();
}
/**
* 失败次数 + 1
* synchronized 保证线程安全
* @return
*/
public synchronized int incrFailureCount() {
long current = System.currentTimeMillis();
// 超过时间窗口,当前失败次数重置为 0
if (current - lastTime > failureTimeInterval) {
lastTime = current;
currentCount.set(0);
}
return currentCount.getAndIncrement();
}
/**
* half Open 状态下 成功数 + 1
* @return
*/
public int incrSuccessHalfOpenCount() {
return this.halfOpenSuccessCount.incrementAndGet();
}
/**
* 失败总次数是否超过阈值
* @return
*/
public boolean failureThresholdReached() {
return getCurCount() >= failureCount;
}
public int getCurCount() {
return currentCount.get();
}
/**
* 重置
*/
public synchronized void reset() {
halfOpenSuccessCount.set(0);
currentCount.set(0);
}
}
核心实现类
上面的一切都准备好了,我们就来编写我们的核心实现类(CircuitBreaker)。CircuitBreaker主要是负责状态间的状态转换,并提供方法给外部调用达到限流降级作用。
-
CircuitBreaker.java
public class CircuitBreaker {
private State state;
private final Config config;
private final Counter counter;
private long lastOpenedTime;
public CircuitBreaker(Config config) {
this.counter = new Counter(config.getFailureCount(), config.getFailureTimeInterval());
this.state = CLOSED;
this.config = config;
}
/**
* 供外部调用达到限流降级作用
* @param toRun
* @param fallback 异常回滚方法
* @param <T>
* @return
*/
public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
try {
if (state == OPEN) {
// 判断 是否超时可以进入 half-open状态
if (halfOpenTimeout()) {
return halfOpenHandle(toRun, fallback);
}
// 直接执行失败回调方法
return fallback.apply(new DegradeException("degrade by circuit breaker"));
// 如果降级关闭则正常执行
} else if (state == CLOSED) {
T result = toRun.get();
// 注意重置 错误数
closed();
return result;
} else {
// if (state == half-Open)
return halfOpenHandle(toRun, fallback);
}
} catch (Exception e) {
// 执行失败 错误次数+ 1
counter.incrFailureCount();
// 错误次数达到阀值,进入 open 状态
if (counter.failureThresholdReached()) {
open();
}
return fallback.apply(e);
}
}
/**
* 转换为 half-Open状态
* @param toRun
* @param fallback
* @param <T>
* @return
*/
private <T> T halfOpenHandle(Supplier<T> toRun, Function<Throwable, T> fallback) {
try {
// closed 状态超时进入 half-open 状态
halfOpen();
T result = toRun.get();
int halfOpenSuccCount = counter.incrSuccessHalfOpenCount();
// half-open 状态成功次数到达阀值,进入 closed 状态
if (halfOpenSuccCount >= this.config.getHalfOpenSuccessCount()) {
closed();
}
return result;
} catch (Exception e) {
// half-open 状态发生一次错误进入 open 状态
open();
return fallback.apply(new DegradeException("degrade by circuit breaker"));
}
}
/**
* 判断是否 Open是否超时, 如果是则进入 halfOpen状态
* @return
*/
private boolean halfOpenTimeout() {
return System.currentTimeMillis() - lastOpenedTime > config.getHalfOpenTimeout();
}
private void closed() {
counter.reset();
state = CLOSED;
}
private void open() {
state = OPEN;
lastOpenedTime = System.currentTimeMillis();
}
private void halfOpen() {
state = HALF_OPEN;
}
}
测试
首先我们测试Open状态
@Test
public void testScene1() {
CircuitBreaker cb = new CircuitBreaker(new Config());
IntStream.range(0, 10).forEach(s -> {
String result = cb.run(() -> {
// 假装执行失败
System.out.println("执行方法次数:" + s);
int i = 1 / 0;
return "Success";
}, t -> {
System.err.println(t);
return "执行自定义降级方法";
});
});
}
执行结果:
可以看到当达到5次后就触发降级操作,不再处理,直接降级
如果我们在每个方法执行后休眠2s,会看到不会触发降级操作
@Test
public void testScene1() {
CircuitBreaker cb = new CircuitBreaker(new Config());
IntStream.range(0, 10).forEach(s -> {
String result = cb.run(() -> {
// 假装执行失败
System.out.println("执行方法次数:" + s);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
int i = 1 / 0;
return "Success";
}, t -> {
System.err.println(t);
return "执行自定义降级方法";
});
});
}
执行结果:
测试 转换为half_open 然后转close
@Test
public void testScene1() {
CircuitBreaker cb = new CircuitBreaker(new Config());
IntStream.range(0, 10).forEach(s -> {
if (s == 5) {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String result = cb.run(() -> {
// 假装执行失败
System.out.println("执行方法次数:" + s);
if (s < 5) {
int i = 1 / 0;
}
System.out.println("方法执行成功");
return "Success";
}, t -> {
System.err.println(t);
return "执行自定义降级方法";
});
});
}
为了让测试查看方便,我在上面的状态转换里面加了一些输出,然后执行结果如下
可以看到最终都达到了我们想要的效果