限流降级组件Sentinel、Hystrix、Resilience4j对比
1、常用限流降级组件对比
Sentinel | Hystrix | resilience4j | |
隔离策略 | 信号量隔离(并发线程数限流) | 线程池隔离/信号量隔离 | 信号量隔离 |
熔断降级策略 | 基于响应时间、异常比率、异常数等 | 异常比率模式、超时熔断 | 基于异常比率、响应时间 |
实时统计实现 | 滑动窗口(LeapArray) | 滑动窗口(基于 RxJava) | Ring Bit Buffer |
动态规则配置 | 支持多种配置源 | 支持多种数据源 | 有限支持 |
扩展性 | 丰富的 SPI 扩展接口 | 插件的形式 | 接口的形式 |
基于注解的支持 | 支持 | 支持 | 支持 |
限流 | 基于 QPS,支持基于调用关系的限流 | 有限的支持 | Rate Limiter |
集群流量控制 | 支持 | 不支持 | 不支持 |
流量整形 | 支持预热模式、匀速排队模式等多种复杂场景 | 不支持 | 简单的 Rate Limiter 模式 |
系统自适应保护 | 支持 | 不支持 | 不支持 |
控制台 | 提供开箱即用的控制台,可配置规则、查看秒级监控、机器发现等 | 简单的监控查看 | 不提供控制台,可对接其它监控系统 |
多语言支持 | Java / C++ | Java | Java |
开源社区状态 | 活跃 | 停止维护 | 较活跃 |
Hystrix 的关注点在于以 隔离 和 熔断 为主的容错机制,超时或被熔断的调用将会快速失败,并可以提供 fallback 机制。
而 Sentinel 的侧重点在于:
多样化的流量控制策略,支持预热模式、匀速排队模式等多种复杂场景
熔断降级
系统负载保护
实时监控和控制台
Resilience4j是一个为Java8和函数式编程设计的容错库。
Resilience4j是一个受Netflix Hystrix启发的轻量级容错库,但它是为Java 8和函数式编程设计的。轻量级,因为库只使用Vavr,它没有任何其他外部库依赖项。相比之下,Netflix Hystrix对Archaius有一个编译依赖关系,Archaius有更多的外部库依赖关系,如Guava和Apache Commons配置。
Resilience4j提供高阶函数(decorators)来增强任何功能接口、lambda表达式或方法引用,包括断路器、速率限制器、重试或隔板。可以在任何函数接口、lambda表达式或方法引用上堆叠多个装饰器。优点是您可以选择所需的装饰器,而无需其他任何东西。
* 慢启动预热模式:当流量激增的时候,控制流量通过的速率,让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。
* 匀速器模式:利用 Leaky Bucket 算法实现的匀速模式,严格控制了请求通过的时间间隔,同时堆积的请求将会排队,超过超时时长的请求直接被拒绝。
2、共同特性
资源模型和执行模型上的对比
Hystrix 的资源模型设计上采用了命令模式,将对外部资源的调用和 fallback 逻辑封装成一个命令对象(HystrixCommand/ HystrixObservableCommand),其底层的执行是基于 RxJava 实现的。每个 Command 创建时都要指定 commandKey 和 groupKey(用于区分资源)以及对应的隔离策略(线程池隔离 or 信号量隔离)。线程池隔离模式下需要配置线程池对应的参数(线程池名称、容量、排队超时等),然后 Command 就会在指定的线程池按照指定的容错策略执行;信号量隔离模式下需要配置最大并发数,执行 Command 时 Hystrix 就会限制其并发调用。
Sentinel 的设计则更为简单。相比 Hystrix Command 强依赖隔离规则,Sentinel 的资源定义与规则配置的耦合度更低。Hystrix 的 Command 强依赖于隔离规则配置的原因是隔离规则会直接影响 Command 的执行。在执行的时候 Hystrix 会解析 Command 的隔离规则来创建 RxJava Scheduler 并在其上调度执行,若是线程池模式则 Scheduler 底层的线程池为配置的线程池,若是信号量模式则简单包装成当前线程执行的 Scheduler。而 Sentinel 并不指定执行模型,也不关注应用是如何执行的。Sentinel 的原则非常简单:根据对应资源配置的规则来为资源执行相应的限流/降级/负载保护策略。在 Sentinel 中资源定义和规则配置是分离的。用户先通过 Sentinel API 给对应的业务逻辑定义资源(埋点),然后可以在需要的时候配置规则。埋点方式有两种:
try-catch 方式(通过 SphU.entry(...)),用户在 catch 块中执行异常处理 / fallback
if-else 方式(通过 SphO.entry(...)),当返回 false 时执行异常处理 / fallback
从 0.1.1 版本开始,Sentinel 还支持基于注解的资源定义方式,可以通过注解参数指定异常处理函数和 fallback 函数。
从 0.2.0 版本开始,Sentinel 引入异步调用链路支持,可以方便地统计异步调用资源的数据,维护异步调用链路,同时具备了适配异步框架/库的能力。可以参考 相关文档。
Sentinel 提供多样化的规则配置方式。除了直接通过 loadRules API 将规则注册到内存态之外,用户还可以注册各种外部数据源来提供动态的规则。用户可以根据系统当前的实时情况去动态地变更规则配置,数据源会将变更推送至 Sentinel 并即时生效。
隔离设计上的对比
隔离是 Hystrix 的核心功能之一。Hystrix 提供两种隔离策略:线程池隔离(Bulkhead Pattern)和信号量隔离,其中最推荐也是最常用的是线程池隔离。Hystrix 的线程池隔离针对不同的资源分别创建不同的线程池,不同服务调用都发生在不同的线程池中,在线程池排队、超时等阻塞情况时可以快速失败,并可以提供 fallback 机制。线程池隔离的好处是隔离度比较高,可以针对某个资源的线程池去进行处理而不影响其它资源,但是代价就是线程上下文切换的 overhead 比较大,特别是对低延时的调用有比较大的影响。
但是,实际情况下,线程池隔离并没有带来非常多的好处。首先就是过多的线程池会非常影响性能。考虑这样一个场景,在 Tomcat 之类的 Servlet 容器使用 Hystrix,本身 Tomcat 自身的线程数目就非常多了(可能到几十或一百多),如果加上 Hystrix 为各个资源创建的线程池,总共线程数目会非常多(几百个线程),这样上下文切换会有非常大的损耗。另外,线程池模式比较彻底的隔离性使得 Hystrix 可以针对不同资源线程池的排队、超时情况分别进行处理,但这其实是超时熔断和流量控制要解决的问题,如果组件具备了超时熔断和流量控制的能力,线程池隔离就显得没有那么必要了。
Hystrix 的信号量隔离限制对某个资源调用的并发数。这样的隔离非常轻量级,仅限制对某个资源调用的并发数,而不是显式地去创建线程池,所以 overhead 比较小,但是效果不错,也支持超时失败。Sentinel 可以通过并发线程数模式的流量控制来提供信号量隔离的功能。并且结合基于响应时间的熔断降级模式,可以在不稳定资源的平均响应时间比较高的时候自动降级,防止过多的慢调用占满并发数,影响整个系统。
熔断降级对比
Sentinel 和 Hystrix 的熔断降级功能本质上都是基于熔断器模式(Circuit Breaker Pattern)。Sentinel 与 Hystrix 都支持基于失败比率(异常比率)的熔断降级,在调用达到一定量级并且失败比率达到设定的阈值时自动进行熔断,此时所有对该资源的调用都会被 block,直到过了指定的时间窗口后才启发性地恢复。上面提到过,Sentinel 还支持基于平均响应时间的熔断降级,可以在服务响应时间持续飙高的时候自动熔断,拒绝掉更多的请求,直到一段时间后才恢复。这样可以防止调用非常慢造成级联阻塞的情况。
实时指标统计实现对比
Hystrix 和 Sentinel 的实时指标数据统计实现都是基于滑动窗口的。Hystrix 1.5 之前的版本是通过环形数组实现的滑动窗口,通过锁配合 CAS 的操作对每个桶的统计信息进行更新。Hystrix 1.5 开始对实时指标统计的实现进行了重构,将指标统计数据结构抽象成了响应式流(reactive stream)的形式,方便消费者去利用指标信息。同时底层改造成了基于 RxJava 的事件驱动模式,在服务调用成功/失败/超时的时候发布相应的事件,通过一系列的变换和聚合最终得到实时的指标统计数据流,可以被熔断器或 Dashboard 消费。
Sentinel 目前抽象出了 Metric 指标统计接口,底层可以有不同的实现,目前默认的实现是基于 LeapArray 的高性能滑动窗口,后续根据需要可能会引入 reactive stream 等实现。
断路器的实现
1、引入maven依赖
<!--添加sentinel依赖-->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.2</version>
</dependency>
<!--添加Hystrix依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
<!--添加resilience4j依赖-->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>0.14.1</version>
</dependency>
2、使用
public class TestController {
public String hiSentinel() throws InterruptedException {
// 1.5.0 版本开始可以直接利用 try-with-resources 特性,自动 exit entry
try (Entry ignored = SphU.entry("hi-v2")) {
// 被保护的逻辑
TimeUnit.SECONDS.sleep(1);
return "hi, sentinel normal";
} catch (BlockException ex) {
// 处理被流控的逻辑
return "hi, sentinel blocked";
}
}
commandKey = ,
//指定降级方法,在熔断和异常时会走降级方法
fallbackMethod = ,
commandProperties = {
//超时时间
@HystrixProperty(name = , , value = )
//判断熔断的最少请求数,默认是10;只有在一定时间内请求数量达到该值,才会进行成功率的计算
,
//熔断的阈值默认值50,表示在一定时间内有50%的请求处理失败,会触发熔断
,
},
threadPoolProperties = {
//并发,缺省为10
}
)
public String hiHystrix() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(300));
return "hi, hystrix normal";
}
public String hiResilience4j() {
return "hi, resilience4j normal";
}
public String fallbackMethod() {
return "hi, hystrix blocked";
}
private static void initFlowRules() {
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("hi-v2");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
}
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
public class R4jAop {
public void pointCut() {
// do nothing
}
public Object doV2(ProceedingJoinPoint joinPoint) throws Throwable {
try {
return joinPoint.proceed();
} catch (RequestNotPermitted throwable) {
return "hi, resilience4j blocked";
} catch (Throwable throwable) {
throw throwable;
}
}
}
注解配置
resilience4j.ratelimiter.limiters.hi.limit-for-period=3
resilience4j.ratelimiter.limiters.hi.limit-refresh-period-in-millis=30000
resilience4j.ratelimiter.limiters.hi.timeout-in-millis=1000
resilience4j.ratelimiter.limiters.hi.subscribe-for-events=true
resilience4j.ratelimiter.limiters.hi.register-health-indicator=true
3、测试
class DemoApplicationTests {
private TestController testController;
void hiSentinelTest() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 2000; i++) {
executorService.execute(() -> {
try {
System.out.println(testController.hiSentinel());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
TimeUnit.SECONDS.sleep(5);
}
void hiHystrixTest() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 2000; i++) {
executorService.execute(() -> {
try {
System.out.println(testController.hiHystrix());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
TimeUnit.SECONDS.sleep(5);
}
void hiResilience4jTest() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 2000; i++) {
executorService.execute(() -> {
System.out.println(testController.hiResilience4j());
});
}
TimeUnit.SECONDS.sleep(5);
}
}
public class BreakerProcessor {
/**
* 不同版本的熔断器配置对应不同的注册器,这里将注册器CircuitBreakerRegistry缓存起来
* key-版本号
* value-注册器CircuitBreakerRegistry
*/
private static final Map<String, CircuitBreakerRegistry> REGISTRY_MAP = new ConcurrentHashMap<>();
private static final String DEFAULT_REGISTRY = "default";
public <T> T getResult(String breakerName, Map<String, BreakerConfigInfo> breakerConfigMap, Supplier<T> supplier) {
CircuitBreaker breaker = getCircuitBreaker(breakerName, breakerConfigMap);
logger.info("breakerName=[{}],state=[{}],failureRate=[{}]", breakerName, breaker.getState(), breaker.getMetrics().getFailureRate());
Supplier<T> decoratedSupplier = CircuitBreaker.decorateSupplier(breaker, supplier);
return Try.ofSupplier(decoratedSupplier).recover(throwable -> null).get();
}
public CircuitBreaker getCircuitBreaker(String breakerName, Map<String, BreakerConfigInfo> breakerConfigMap) {
return getCircuitBreakerRegistry(breakerConfigMap).circuitBreaker(breakerName);
}
/**
* 获取注册器CircuitBreakerRegistry,用来统一管理不同配置的熔断器CircuitBreaker
*/
public CircuitBreakerRegistry getCircuitBreakerRegistry(Map<String, BreakerConfigInfo> breakerConfigMap) {
try {
if (MapUtils.isEmpty(breakerConfigMap)) {
return getCircuitBreakerRegistry(DEFAULT_REGISTRY, null);
}
//版本号必须以大写V作为前缀,然后按照自然排序,拿到最大的版本
String versionNo = breakerConfigMap.keySet().stream()
.filter(e -> StringUtils.startsWith(e, "V"))
.max(String::compareTo)
.orElse(DEFAULT_REGISTRY);
return getCircuitBreakerRegistry(versionNo, breakerConfigMap.get(versionNo));
} catch (Exception e) {
logger.error("解析confPlus配置获取注册器CircuitBreakerRegistry发生异常,使用默认注册器", e);
return getCircuitBreakerRegistry(DEFAULT_REGISTRY, null);
}
}
private CircuitBreakerRegistry getCircuitBreakerRegistry(String versionNo, BreakerConfigInfo info) {
logger.info("CircuitBreakerRegistry的版本和配置,versionNo=[{}],info=[{}]", versionNo, JSON.toJSONString(info));
if (REGISTRY_MAP.get(versionNo) != null) {
return REGISTRY_MAP.get(versionNo);
}
synchronized (BreakerProcessor.class) {
if (REGISTRY_MAP.get(versionNo) != null) {
return REGISTRY_MAP.get(versionNo);
}
REGISTRY_MAP.put(versionNo, CircuitBreakerRegistry.of(getCircuitBreakerConfig(info)));
return REGISTRY_MAP.get(versionNo);
}
}
private CircuitBreakerConfig getCircuitBreakerConfig(BreakerConfigInfo info) {
if (info == null) {
return CircuitBreakerConfig.custom()
//失败率阈值,当失败率大于等于该阈值时,熔断器开启
.failureRateThreshold(50)
//慢调用率阈值
.slowCallRateThreshold(100)
.slowCallDurationThreshold(Duration.ofMillis(TimeUnit.SECONDS.toMillis(60)))
//最小调用次数
.minimumNumberOfCalls(50)
//断路器在半开状态下允许通过的调用次数
.permittedNumberOfCallsInHalfOpenState(5)
//熔断器打开状态持续时间,超过改时间后才允许切换到半开状态
.waitDurationInOpenState(Duration.ofMillis(TimeUnit.SECONDS.toMillis(60)))
//滑动窗口类型:COUNT_BASED,统计最近slidingWindowSize次调用结果;TIME_BASED,统计最近slidingWindowSize秒调用结果;
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
//滑动窗口大小
.slidingWindowSize(50)
//列表内的异常被统计为失败
.recordExceptions(Throwable.class)
.build();
}
CircuitBreakerConfig.Builder builder = CircuitBreakerConfig.custom();
if (info.getFailureRateThreshold() != null) {
builder.failureRateThreshold(info.getFailureRateThreshold());
}
if (info.getSlowCallRateThreshold() != null) {
builder.slowCallRateThreshold(info.getSlowCallRateThreshold());
}
if (info.getSlowCallDurationThreshold() != null) {
builder.slowCallDurationThreshold(Duration.ofMillis(TimeUnit.SECONDS.toMillis(info.getSlowCallDurationThreshold())));
}
if (info.getMinimumNumberOfCalls() != null) {
builder.minimumNumberOfCalls(info.getMinimumNumberOfCalls());
}
if (info.getPermittedNumberOfCallsInHalfOpenState() != null) {
builder.permittedNumberOfCallsInHalfOpenState(info.getPermittedNumberOfCallsInHalfOpenState());
}
if (info.getWaitDurationInOpenState() != null) {
builder.waitDurationInOpenState(Duration.ofMillis(TimeUnit.SECONDS.toMillis(info.getWaitDurationInOpenState())));
}
if (StringUtils.isNotBlank(info.getSlidingWindowType())) {
builder.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.valueOf(info.getSlidingWindowType()));
}
if (info.getSlidingWindowSize() != null) {
builder.slidingWindowSize(info.getSlidingWindowSize());
}
if (CollectionUtils.isNotEmpty(info.getRecordExceptions())) {
List<Class<?>> recordExceptions = parseExceptions("recordExceptions", info.getRecordExceptions());
if (CollectionUtils.isNotEmpty(recordExceptions)) {
builder.recordExceptions(recordExceptions.toArray(new Class[0]));
}
}
if (CollectionUtils.isNotEmpty(info.getIgnoreExceptions())) {
List<Class<?>> ignoreExceptions = parseExceptions("ignoreExceptions", info.getIgnoreExceptions());
if (CollectionUtils.isNotEmpty(ignoreExceptions)) {
builder.ignoreExceptions(ignoreExceptions.toArray(new Class[0]));
}
}
return builder.build();
}
private List<Class<?>> parseExceptions(String paramName, List<String> exceptions) {
List<Class<?>> exceptionClasses = new ArrayList<>();
for (String ex : exceptions) {
try {
exceptionClasses.add(Class.forName(ex));
} catch (Exception e) {
logger.warn(String.format("从confPlus解析熔断器配置参数%s异常", paramName), e);
}
}
return exceptionClasses;
}
}
参考:Wiki - Gitee.com