vlambda博客
学习文章列表

限流降级组件Sentinel、Hystrix、Resilience4j对比

1、常用限流降级组件对比


Sentinel Hystrix resilience4j
隔离策略  信号量隔离(并发线程数限流)  线程池隔离/信号量隔离  信号量隔离 
熔断降级策略  基于响应时间、异常比率、异常数等  异常比率模式、超时熔断  基于异常比率、响应时间 
实时统计实现  滑动窗口(LeapArray)  滑动窗口(基于 RxJava)  Ring Bit Buffer 
动态规则配置  支持多种配置源 支持多种数据源  有限支持 
扩展性  丰富的 SPI 扩展接口  插件的形式  接口的形式 
基于注解的支持  支持  支持  支持 
限流  基于 QPS,支持基于调用关系的限流  有限的支持  Rate Limiter 
集群流量控制  支持  不支持  不支持 
流量整形  支持预热模式、匀速排队模式等多种复杂场景  不支持  简单的 Rate Limiter 模式 
系统自适应保护  支持  不支持  不支持 
控制台  提供开箱即用的控制台,可配置规则、查看秒级监控、机器发现等  简单的监控查看  不提供控制台,可对接其它监控系统 
多语言支持  Java / C++  Java  Java 
开源社区状态  活跃  停止维护  较活跃 

Hystrix 的关注点在于以 隔离 和 熔断 为主的容错机制,超时或被熔断的调用将会快速失败,并可以提供 fallback 机制。

而 Sentinel 的侧重点在于:

  • 多样化的流量控制策略,支持预热模式、匀速排队模式等多种复杂场景 

  • 熔断降级

  • 系统负载保护

  • 实时监控和控制台

Resilience4j是一个为Java8和函数式编程设计的容错库。

Resilience4j是一个受Netflix Hystrix启发的轻量级容错库,但它是为Java 8和函数式编程设计的。轻量级,因为库只使用Vavr,它没有任何其他外部库依赖项。相比之下,Netflix Hystrix对Archaius有一个编译依赖关系,Archaius有更多的外部库依赖关系,如Guava和Apache Commons配置。

Resilience4j提供高阶函数(decorators)来增强任何功能接口、lambda表达式或方法引用,包括断路器、速率限制器、重试或隔板。可以在任何函数接口、lambda表达式或方法引用上堆叠多个装饰器。优点是您可以选择所需的装饰器,而无需其他任何东西。

* 慢启动预热模式:当流量激增的时候,控制流量通过的速率,让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。

* 匀速器模式:利用 Leaky Bucket 算法实现的匀速模式,严格控制了请求通过的时间间隔,同时堆积的请求将会排队,超过超时时长的请求直接被拒绝。

2、共同特性

资源模型和执行模型上的对比

Hystrix 的资源模型设计上采用了命令模式,将对外部资源的调用和 fallback 逻辑封装成一个命令对象(HystrixCommandHystrixObservableCommand),其底层的执行是基于 RxJava 实现的。每个 Command 创建时都要指定 commandKey 和 groupKey(用于区分资源)以及对应的隔离策略(线程池隔离 or 信号量隔离)。线程池隔离模式下需要配置线程池对应的参数(线程池名称、容量、排队超时等),然后 Command 就会在指定的线程池按照指定的容错策略执行;信号量隔离模式下需要配置最大并发数,执行 Command 时 Hystrix 就会限制其并发调用。

Sentinel 的设计则更为简单。相比 Hystrix Command 强依赖隔离规则,Sentinel 的资源定义与规则配置的耦合度更低。Hystrix 的 Command 强依赖于隔离规则配置的原因是隔离规则会直接影响 Command 的执行。在执行的时候 Hystrix 会解析 Command 的隔离规则来创建 RxJava Scheduler 并在其上调度执行,若是线程池模式则 Scheduler 底层的线程池为配置的线程池,若是信号量模式则简单包装成当前线程执行的 Scheduler。而 Sentinel 并不指定执行模型,也不关注应用是如何执行的。Sentinel 的原则非常简单:根据对应资源配置的规则来为资源执行相应的限流/降级/负载保护策略。在 Sentinel 中资源定义和规则配置是分离的。用户先通过 Sentinel API 给对应的业务逻辑定义资源(埋点),然后可以在需要的时候配置规则。埋点方式有两种:

  • try-catch 方式(通过 SphU.entry(...)),用户在 catch 块中执行异常处理 / fallback

  • if-else 方式(通过 SphO.entry(...)),当返回 false 时执行异常处理 / fallback

从 0.1.1 版本开始,Sentinel 还支持基于注解的资源定义方式,可以通过注解参数指定异常处理函数和 fallback 函数。

从 0.2.0 版本开始,Sentinel 引入异步调用链路支持,可以方便地统计异步调用资源的数据,维护异步调用链路,同时具备了适配异步框架/库的能力。可以参考 相关文档

Sentinel 提供多样化的规则配置方式。除了直接通过 loadRules API 将规则注册到内存态之外,用户还可以注册各种外部数据源来提供动态的规则。用户可以根据系统当前的实时情况去动态地变更规则配置,数据源会将变更推送至 Sentinel 并即时生效。

隔离设计上的对比

隔离是 Hystrix 的核心功能之一。Hystrix 提供两种隔离策略:线程池隔离(Bulkhead Pattern)和信号量隔离,其中最推荐也是最常用的是线程池隔离。Hystrix 的线程池隔离针对不同的资源分别创建不同的线程池,不同服务调用都发生在不同的线程池中,在线程池排队、超时等阻塞情况时可以快速失败,并可以提供 fallback 机制。线程池隔离的好处是隔离度比较高,可以针对某个资源的线程池去进行处理而不影响其它资源,但是代价就是线程上下文切换的 overhead 比较大,特别是对低延时的调用有比较大的影响。

但是,实际情况下,线程池隔离并没有带来非常多的好处。首先就是过多的线程池会非常影响性能。考虑这样一个场景,在 Tomcat 之类的 Servlet 容器使用 Hystrix,本身 Tomcat 自身的线程数目就非常多了(可能到几十或一百多),如果加上 Hystrix 为各个资源创建的线程池,总共线程数目会非常多(几百个线程),这样上下文切换会有非常大的损耗。另外,线程池模式比较彻底的隔离性使得 Hystrix 可以针对不同资源线程池的排队、超时情况分别进行处理,但这其实是超时熔断和流量控制要解决的问题,如果组件具备了超时熔断和流量控制的能力,线程池隔离就显得没有那么必要了。

Hystrix 的信号量隔离限制对某个资源调用的并发数。这样的隔离非常轻量级,仅限制对某个资源调用的并发数,而不是显式地去创建线程池,所以 overhead 比较小,但是效果不错,也支持超时失败。Sentinel 可以通过并发线程数模式的流量控制来提供信号量隔离的功能。并且结合基于响应时间的熔断降级模式,可以在不稳定资源的平均响应时间比较高的时候自动降级,防止过多的慢调用占满并发数,影响整个系统。

熔断降级对比

Sentinel 和 Hystrix 的熔断降级功能本质上都是基于熔断器模式(Circuit Breaker Pattern)。Sentinel 与 Hystrix 都支持基于失败比率(异常比率)的熔断降级,在调用达到一定量级并且失败比率达到设定的阈值时自动进行熔断,此时所有对该资源的调用都会被 block,直到过了指定的时间窗口后才启发性地恢复。上面提到过,Sentinel 还支持基于平均响应时间的熔断降级,可以在服务响应时间持续飙高的时候自动熔断,拒绝掉更多的请求,直到一段时间后才恢复。这样可以防止调用非常慢造成级联阻塞的情况。

实时指标统计实现对比

Hystrix 和 Sentinel 的实时指标数据统计实现都是基于滑动窗口的。Hystrix 1.5 之前的版本是通过环形数组实现的滑动窗口,通过锁配合 CAS 的操作对每个桶的统计信息进行更新。Hystrix 1.5 开始对实时指标统计的实现进行了重构,将指标统计数据结构抽象成了响应式流(reactive stream)的形式,方便消费者去利用指标信息。同时底层改造成了基于 RxJava 的事件驱动模式,在服务调用成功/失败/超时的时候发布相应的事件,通过一系列的变换和聚合最终得到实时的指标统计数据流,可以被熔断器或 Dashboard 消费。

Sentinel 目前抽象出了 Metric 指标统计接口,底层可以有不同的实现,目前默认的实现是基于 LeapArray 的高性能滑动窗口,后续根据需要可能会引入 reactive stream 等实现。

 

断路器的实现

1、引入maven依赖
<!--添加sentinel依赖--><dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-core</artifactId> <version>1.8.2</version></dependency> <!--添加Hystrix依赖--><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> <version>2.0.2.RELEASE</version></dependency> <!--添加resilience4j依赖--><dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-spring-boot2</artifactId> <version>0.14.1</version></dependency>
2、使用
@RestControllerpublic class TestController { @GetMapping("/hi-sentinel") @SentinelResource("hi-v2") public String hiSentinel() throws InterruptedException { // 1.5.0 版本开始可以直接利用 try-with-resources 特性,自动 exit entry try (Entry ignored = SphU.entry("hi-v2")) { // 被保护的逻辑 TimeUnit.SECONDS.sleep(1); return "hi, sentinel normal"; } catch (BlockException ex) { // 处理被流控的逻辑 return "hi, sentinel blocked"; } } @GetMapping("/hi-hystrix") @HystrixCommand( commandKey = "hystrix_key", //指定降级方法,在熔断和异常时会走降级方法 fallbackMethod = "fallbackMethod", commandProperties = { //超时时间 @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "3000"), //判断熔断的最少请求数,默认是10;只有在一定时间内请求数量达到该值,才会进行成功率的计算 @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"), //熔断的阈值默认值50,表示在一定时间内有50%的请求处理失败,会触发熔断 @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10"), }, threadPoolProperties = { //并发,缺省为10 @HystrixProperty(name = "coreSize", value = "10") } ) public String hiHystrix() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(300)); return "hi, hystrix normal"; } @GetMapping("/hi-resilience4j") @RateLimiter(name = "hi") public String hiResilience4j() { return "hi, resilience4j normal"; } public String fallbackMethod() { return "hi, hystrix blocked"; } @PostConstruct private static void initFlowRules() { List<FlowRule> rules = new ArrayList<>(); FlowRule rule = new FlowRule(); rule.setResource("hi-v2"); rule.setGrade(RuleConstant.FLOW_GRADE_QPS); // Set limit QPS to 20. rule.setCount(20); rules.add(rule); FlowRuleManager.loadRules(rules); } }

@SpringBootApplication@EnableAspectJAutoProxy@EnableHystrixpublic class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }

@Aspect@Servicepublic class R4jAop { @Pointcut("@annotation(io.github.resilience4j.ratelimiter.annotation.RateLimiter)") public void pointCut() { // do nothing } @Around("pointCut()") public Object doV2(ProceedingJoinPoint joinPoint) throws Throwable { try { return joinPoint.proceed(); } catch (RequestNotPermitted throwable) { return "hi, resilience4j blocked"; } catch (Throwable throwable) { throw throwable; } } }
注解配置
resilience4j.ratelimiter.limiters.hi.limit-for-period=3resilience4j.ratelimiter.limiters.hi.limit-refresh-period-in-millis=30000resilience4j.ratelimiter.limiters.hi.timeout-in-millis=1000resilience4j.ratelimiter.limiters.hi.subscribe-for-events=trueresilience4j.ratelimiter.limiters.hi.register-health-indicator=true


3、测试
@SpringBootTestclass DemoApplicationTests { @Autowired private TestController testController; @Test void hiSentinelTest() throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(50); for (int i = 0; i < 2000; i++) { executorService.execute(() -> { try { System.out.println(testController.hiSentinel()); } catch (InterruptedException e) { e.printStackTrace(); } }); } TimeUnit.SECONDS.sleep(5); } @Test void hiHystrixTest() throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(50); for (int i = 0; i < 2000; i++) { executorService.execute(() -> { try { System.out.println(testController.hiHystrix()); } catch (InterruptedException e) { e.printStackTrace(); } }); } TimeUnit.SECONDS.sleep(5); } @Test void hiResilience4jTest() throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(50); for (int i = 0; i < 2000; i++) { executorService.execute(() -> { System.out.println(testController.hiResilience4j()); }); } TimeUnit.SECONDS.sleep(5); } }



@Componentpublic class BreakerProcessor {
/** * 不同版本的熔断器配置对应不同的注册器,这里将注册器CircuitBreakerRegistry缓存起来 * key-版本号 * value-注册器CircuitBreakerRegistry */ private static final Map<String, CircuitBreakerRegistry> REGISTRY_MAP = new ConcurrentHashMap<>(); private static final String DEFAULT_REGISTRY = "default";

public <T> T getResult(String breakerName, Map<String, BreakerConfigInfo> breakerConfigMap, Supplier<T> supplier) { CircuitBreaker breaker = getCircuitBreaker(breakerName, breakerConfigMap); logger.info("breakerName=[{}],state=[{}],failureRate=[{}]", breakerName, breaker.getState(), breaker.getMetrics().getFailureRate()); Supplier<T> decoratedSupplier = CircuitBreaker.decorateSupplier(breaker, supplier); return Try.ofSupplier(decoratedSupplier).recover(throwable -> null).get(); }

public CircuitBreaker getCircuitBreaker(String breakerName, Map<String, BreakerConfigInfo> breakerConfigMap) { return getCircuitBreakerRegistry(breakerConfigMap).circuitBreaker(breakerName); }

/** * 获取注册器CircuitBreakerRegistry,用来统一管理不同配置的熔断器CircuitBreaker */ public CircuitBreakerRegistry getCircuitBreakerRegistry(Map<String, BreakerConfigInfo> breakerConfigMap) { try { if (MapUtils.isEmpty(breakerConfigMap)) { return getCircuitBreakerRegistry(DEFAULT_REGISTRY, null); } //版本号必须以大写V作为前缀,然后按照自然排序,拿到最大的版本 String versionNo = breakerConfigMap.keySet().stream() .filter(e -> StringUtils.startsWith(e, "V")) .max(String::compareTo) .orElse(DEFAULT_REGISTRY); return getCircuitBreakerRegistry(versionNo, breakerConfigMap.get(versionNo)); } catch (Exception e) { logger.error("解析confPlus配置获取注册器CircuitBreakerRegistry发生异常,使用默认注册器", e); return getCircuitBreakerRegistry(DEFAULT_REGISTRY, null); } }

private CircuitBreakerRegistry getCircuitBreakerRegistry(String versionNo, BreakerConfigInfo info) { logger.info("CircuitBreakerRegistry的版本和配置,versionNo=[{}],info=[{}]", versionNo, JSON.toJSONString(info)); if (REGISTRY_MAP.get(versionNo) != null) { return REGISTRY_MAP.get(versionNo); } synchronized (BreakerProcessor.class) { if (REGISTRY_MAP.get(versionNo) != null) { return REGISTRY_MAP.get(versionNo); } REGISTRY_MAP.put(versionNo, CircuitBreakerRegistry.of(getCircuitBreakerConfig(info))); return REGISTRY_MAP.get(versionNo); } }

private CircuitBreakerConfig getCircuitBreakerConfig(BreakerConfigInfo info) { if (info == null) { return CircuitBreakerConfig.custom() //失败率阈值,当失败率大于等于该阈值时,熔断器开启 .failureRateThreshold(50) //慢调用率阈值 .slowCallRateThreshold(100) .slowCallDurationThreshold(Duration.ofMillis(TimeUnit.SECONDS.toMillis(60))) //最小调用次数 .minimumNumberOfCalls(50) //断路器在半开状态下允许通过的调用次数 .permittedNumberOfCallsInHalfOpenState(5) //熔断器打开状态持续时间,超过改时间后才允许切换到半开状态 .waitDurationInOpenState(Duration.ofMillis(TimeUnit.SECONDS.toMillis(60))) //滑动窗口类型:COUNT_BASED,统计最近slidingWindowSize次调用结果;TIME_BASED,统计最近slidingWindowSize秒调用结果; .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) //滑动窗口大小 .slidingWindowSize(50) //列表内的异常被统计为失败 .recordExceptions(Throwable.class) .build(); } CircuitBreakerConfig.Builder builder = CircuitBreakerConfig.custom(); if (info.getFailureRateThreshold() != null) { builder.failureRateThreshold(info.getFailureRateThreshold()); } if (info.getSlowCallRateThreshold() != null) { builder.slowCallRateThreshold(info.getSlowCallRateThreshold()); } if (info.getSlowCallDurationThreshold() != null) { builder.slowCallDurationThreshold(Duration.ofMillis(TimeUnit.SECONDS.toMillis(info.getSlowCallDurationThreshold()))); } if (info.getMinimumNumberOfCalls() != null) { builder.minimumNumberOfCalls(info.getMinimumNumberOfCalls()); } if (info.getPermittedNumberOfCallsInHalfOpenState() != null) { builder.permittedNumberOfCallsInHalfOpenState(info.getPermittedNumberOfCallsInHalfOpenState()); } if (info.getWaitDurationInOpenState() != null) { builder.waitDurationInOpenState(Duration.ofMillis(TimeUnit.SECONDS.toMillis(info.getWaitDurationInOpenState()))); } if (StringUtils.isNotBlank(info.getSlidingWindowType())) { builder.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.valueOf(info.getSlidingWindowType())); } if (info.getSlidingWindowSize() != null) { builder.slidingWindowSize(info.getSlidingWindowSize()); } if (CollectionUtils.isNotEmpty(info.getRecordExceptions())) { List<Class<?>> recordExceptions = parseExceptions("recordExceptions", info.getRecordExceptions()); if (CollectionUtils.isNotEmpty(recordExceptions)) { builder.recordExceptions(recordExceptions.toArray(new Class[0])); } } if (CollectionUtils.isNotEmpty(info.getIgnoreExceptions())) { List<Class<?>> ignoreExceptions = parseExceptions("ignoreExceptions", info.getIgnoreExceptions()); if (CollectionUtils.isNotEmpty(ignoreExceptions)) { builder.ignoreExceptions(ignoreExceptions.toArray(new Class[0])); } } return builder.build(); }

private List<Class<?>> parseExceptions(String paramName, List<String> exceptions) { List<Class<?>> exceptionClasses = new ArrayList<>(); for (String ex : exceptions) { try { exceptionClasses.add(Class.forName(ex)); } catch (Exception e) { logger.warn(String.format("从confPlus解析熔断器配置参数%s异常", paramName), e); } } return exceptionClasses; }
}

参考:Wiki - Gitee.com