A Source-Code Walkthrough of OpenFeign and Hystrix (original)
Hystrix
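Contents
    Hystrix
        What is Hystrix
        The avalanche effect
        Solutions to the avalanche effect
            Request caching: cache a request together with its result
            Request collapsing: merge identical requests into a single batch request
            Service isolation: separate services so their resources do not affect each other
                Thread-pool isolation (suitable for 99% of scenarios)
                Semaphore isolation
                Semaphore isolation vs. thread-pool isolation
            Circuit breaking: sacrifice part of the service to keep the system as a whole stable
            Service degradation: after the circuit breaker trips, return a default value
        Hystrix source-code analysis
    OpenFeign
        What is OpenFeign
        Using OpenFeign
        OpenFeign principles and source-code analysis
        Custom load balancing in Feign (my OpenFeign version is 3.0.2, which no longer supports Ribbon, so the customization uses classes from spring-cloud-starter-loadbalancer)
        Notes on using OpenFeign in practice
        OpenFeign version differences (important); see the official documentation: https://spring.io/projects/spring-cloud-openfeign/#learn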
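What is Hystrix
Hystrix is an open-source component from Netflix whose goal is to improve the stability of the overall system.
The avalanche effect
When a service that other services call crashes, the services that depend on it directly or indirectly can crash as well. This cascading failure is known as the avalanche effect. Its causes:
Service provider unavailable (hardware failure, program bugs, cache breakdown, a surge of user requests, and so on)
Retries amplifying traffic (user retries, retry logic in code)
Service consumer unavailable (resources exhausted by synchronous waiting)
Solutions to the avalanche effect
Request caching: cache a request together with its result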
Use Redis to cache the data.

    <!-- Redis dependency -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Connection pool dependency -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>

    /** Redis configuration class */
    @Configuration
    public class RedisConfig {

        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
            // Serialize keys as plain strings
            redisTemplate.setKeySerializer(new StringRedisSerializer());
            // Serialize values as JSON
            redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
            redisTemplate.setHashKeySerializer(new StringRedisSerializer());
            redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            return redisTemplate;
        }

        // Override the serialization used by the cache manager
        @Bean
        public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {
            RedisCacheWriter redisCacheWriter =
                    RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());
            RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                    // Default expiry time
                    .entryTtl(Duration.ofMinutes(30))
                    // Key and value serialization
                    .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getKeySerializer()))
                    .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getValueSerializer()));
            return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration);
        }
    }

    @Cacheable(cacheNames = "ActivityService:user:single", key = "#id")
    public Future<User> selectByUserId(Long id) {
        return null;
    }

Finally, add the following annotation to the application's startup class:

    @EnableCaching

Request collapsing: merge identical requests into a single batch request
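To make the idea of collapsing concrete, here is a minimal plain-Java sketch. It is not how Hystrix implements collapsing: the class `CollapserSketch`, its `submit` and `flush` methods, and the `batchCalls` counter are all hypothetical names invented for illustration. Hystrix flushes a batch from a timer, whereas this sketch assumes the caller flushes by hand so the behavior stays deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of request collapsing; not Hystrix's implementation.
class CollapserSketch<K, V> {
    // One call that resolves many keys at once (the "batch interface").
    private final Function<List<K>, Map<K, V>> batchLoader;
    // Requests waiting for the next batch, keyed by request argument.
    private final Map<K, CompletableFuture<V>> pending = new LinkedHashMap<>();
    // Counts how many times the batch loader was actually invoked.
    int batchCalls = 0;

    CollapserSketch(Function<List<K>, Map<K, V>> batchLoader) {
        this.batchLoader = batchLoader;
    }

    // Queue a single request; callers asking for the same key share one future.
    synchronized CompletableFuture<V> submit(K key) {
        return pending.computeIfAbsent(key, k -> new CompletableFuture<>());
    }

    // Assumption: flushed by hand. A real collapser would call this from a timer.
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        Map<K, V> results = batchLoader.apply(new ArrayList<>(pending.keySet()));
        batchCalls++;
        pending.forEach((key, future) -> future.complete(results.get(key)));
        pending.clear();
    }
}
```

Callers receive a future immediately, but it completes only after the batch has run, which is exactly the latency trade-off of collapsing: fewer calls to the provider in exchange for a short wait.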
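Advantage: multiple identical requests are merged into one call to a batch interface, which reduces the number of requests.
Disadvantage: a request that arrives first cannot return immediately; it has to wait according to the collapsing rules.

    @HystrixCollapser(
            batchMethod = "selectByUserIds", // the batch method that serves the merged requests
            scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL, // collapsing scope
            collapserProperties = {
                    // How long to wait before the collected requests are merged into one batch
                    @HystrixProperty(name = "timerDelayInMilliseconds", value = "20"),
                    // Maximum number of requests allowed in a batch before it is executed
                    @HystrixProperty(name = "maxRequestsInBatch", value = "200")
            })
    public Future<User> selectByUserId(Long id) {
        return null;
    }

Service isolation: separate services so their resources do not affect each other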
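Thread-pool isolation: give each interface its own thread pool so that interfaces do not affect one another (suitable for 99% of scenarios)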
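Independent of the Hystrix annotations, the idea behind thread-pool isolation can be sketched in plain Java. This is only an illustration under stated assumptions: `ThreadPoolIsolationSketch` and its `call` method are hypothetical names, and the pool uses a `SynchronousQueue` (no queueing), which is a simplification of a real bounded queue.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch: one small, dedicated pool per dependency.
class ThreadPoolIsolationSketch {
    private final ExecutorService pool;
    private final long timeoutMillis;

    ThreadPoolIsolationSketch(int threads, long timeoutMillis) {
        // No queue: a call is rejected as soon as every thread is busy.
        this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
        this.timeoutMillis = timeoutMillis;
    }

    // Runs the task on the dedicated pool; any rejection, timeout, or failure yields the fallback.
    <T> T call(Callable<T> task, Supplier<T> fallback) {
        Future<T> future;
        try {
            future = pool.submit(task);
        } catch (RejectedExecutionException e) {
            return fallback.get(); // pool saturated: fail fast instead of waiting
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker so the thread is freed
            return fallback.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.get();
        } catch (ExecutionException e) {
            return fallback.get();
        }
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

Because the slow dependency can only ever occupy its own threads, the caller's thread waits for at most the timeout, and other dependencies with their own pools are unaffected.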
    @HystrixCommand(
            groupKey = "activity-service-single", // service (group) name; commands with the same name share a thread pool
            commandKey = "selectByUserId", // command name; defaults to the method name
            threadPoolKey = "activity-service-single", // thread pool name; commands with the same name share a thread pool
            commandProperties = {
                    // Timeout; the default is 1000 ms
                    @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS, value = "5000")
            },
            threadPoolProperties = {
                    // Thread pool size
                    @HystrixProperty(name = HystrixPropertiesManager.CORE_SIZE, value = "6"),
                    // Maximum queue size
                    @HystrixProperty(name = HystrixPropertiesManager.MAX_QUEUE_SIZE, value = "100"),
                    // Thread keep-alive time; the default is 1 minute
                    @HystrixProperty(name = HystrixPropertiesManager.KEEP_ALIVE_TIME_MINUTES, value = "2"),
                    // Requests beyond this queue threshold are rejected
                    @HystrixProperty(name = HystrixPropertiesManager.QUEUE_SIZE_REJECTION_THRESHOLD, value = "100"),
            },
            fallbackMethod = "selectByUserIdFallBack")
    public Future<User> selectByUserId(Long id) {
        return null;
    }

Semaphore isolation: each request is limited by a counting semaphore. When the number of concurrent requests exceeds maxConcurrentRequests, further requests are rejected and the fallback is invoked so that they return quickly.
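The semaphore approach can be sketched with `java.util.concurrent.Semaphore`. This is a minimal illustration rather than Hystrix's own code; `SemaphoreIsolationSketch` and `call` are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch of semaphore isolation.
class SemaphoreIsolationSketch {
    private final Semaphore permits;

    SemaphoreIsolationSketch(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    <T> T call(Supplier<T> task, Supplier<T> fallback) {
        // tryAcquire() never blocks: with no permit left the call is rejected at once.
        if (!permits.tryAcquire()) {
            return fallback.get();
        }
        try {
            return task.get(); // runs on the caller's own thread, no thread hand-off
        } catch (RuntimeException e) {
            return fallback.get();
        } finally {
            permits.release();
        }
    }
}
```

The task runs on the calling thread, which is why this style is cheap but synchronous only: there is no separate worker that could be abandoned if the call hangs.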
    @HystrixCommand(
            commandProperties = {
                    // Timeout; the default is 1000 ms
                    @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS, value = "5000"),
                    // Use semaphore isolation; the default strategy is THREAD
                    @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_STRATEGY, value = "SEMAPHORE"),
                    // Maximum concurrent requests; kept small here to make high concurrency easy to simulate
                    @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, value = "6"),
            },
            fallbackMethod = "selectByUserIdFallBack")
    public Future<User> selectByUserId(Long id) {
        return null;
    }

Semaphore isolation vs. thread-pool isolation
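Isolation type | Timeout       | Circuit breaking | Isolation mechanism                | Async calls    | Resource cost
Thread pool    | Supported     | Supported        | A separate thread pool per service | Async and sync | High
Semaphore      | Not supported | Supported        | A semaphore counter                | Sync only      | Low

Circuit breaking: sacrifice part of the service to keep the system as a whole stable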
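As a rough, library-free illustration of circuit breaking, the sketch below models the three decisions a breaker makes: trip when enough requests have failed, reject while open, and let a single trial request through after a sleep window. `CircuitBreakerSketch` and all of its members are hypothetical names. It also makes two simplifying assumptions: it counts requests since the last reset instead of using a rolling time window, and it takes its clock as a parameter so the behavior can be checked deterministically.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch of a circuit breaker; no rolling window, injectable clock.
class CircuitBreakerSketch {
    private final int requestVolumeThreshold;   // minimum requests before the breaker may trip
    private final int errorThresholdPercentage; // error percentage at which it trips
    private final long sleepWindowMillis;       // how long to stay open before a trial request
    private final LongSupplier clock;

    private final AtomicInteger total = new AtomicInteger();
    private final AtomicInteger errors = new AtomicInteger();
    private final AtomicBoolean open = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTestedAt = new AtomicLong();

    CircuitBreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage,
                         long sleepWindowMillis, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    // A request may run if the breaker is closed, or if it wins the single trial slot.
    boolean allowRequest() {
        return !isOpen() || allowSingleTest();
    }

    boolean isOpen() {
        if (open.get()) {
            return true;
        }
        int seen = total.get();
        if (seen == 0 || seen < requestVolumeThreshold) {
            return false; // not enough traffic to judge
        }
        if (errors.get() * 100 / seen < errorThresholdPercentage) {
            return false; // error rate still acceptable
        }
        if (open.compareAndSet(false, true)) {
            openedOrLastTestedAt.set(clock.getAsLong());
        }
        return true;
    }

    // After the sleep window, exactly one caller wins the CAS and may try the dependency.
    private boolean allowSingleTest() {
        long last = openedOrLastTestedAt.get();
        return open.get()
                && clock.getAsLong() > last + sleepWindowMillis
                && openedOrLastTestedAt.compareAndSet(last, clock.getAsLong());
    }

    void recordSuccess() {
        total.incrementAndGet();
        if (open.compareAndSet(true, false)) { // a successful trial closes the breaker
            total.set(0);
            errors.set(0);
        }
    }

    void recordFailure() {
        total.incrementAndGet();
        errors.incrementAndGet();
    }
}
```

While the breaker is open, callers skip the dependency and go straight to a fallback, which is what keeps a failing service from dragging its callers down with it.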
    @HystrixCommand(
            commandProperties = {
                    // At least 10 requests must be seen in the rolling window (10 s by default) before the breaker can trip
                    @HystrixProperty(name = HystrixPropertiesManager.CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD, value = "10"),
                    // Trip the breaker once the error rate reaches 50%; while it is open, calls go straight to the fallback method
                    @HystrixProperty(name = HystrixPropertiesManager.CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE, value = "50"),
                    // How long the breaker stays open before a trial request is allowed; the default is 5 s
                    @HystrixProperty(name = HystrixPropertiesManager.CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS, value = "5000")
            },
            fallbackMethod = "selectByUserIdFallBack"
    )
    public Future<User> selectByUserId(Long id) {
        return null;
    }

Service degradation: after the circuit breaker trips, return a default value
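The examples above name a fallback method, selectByUserIdFallBack, without showing one. The plain-Java sketch below illustrates what degradation amounts to: a method with the same parameter list and return type as the primary one, returning a default value when the primary call fails. Everything here is an assumption made for illustration: the `User` class, the always-failing `selectByUserId`, and the `withFallback` helper are hypothetical, and the methods return `User` directly rather than `Future<User>` to keep the sketch short.

```java
import java.util.function.Function;

// Hypothetical sketch of service degradation via a fallback method.
class DegradationSketch {
    // Stand-in for the article's User type.
    static final class User {
        final Long id;
        final String name;

        User(Long id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Simulated remote call that always fails, as if the provider were down.
    static User selectByUserId(Long id) {
        throw new IllegalStateException("activity-service unavailable");
    }

    // Fallback: same parameter list and return type as the primary method.
    static User selectByUserIdFallBack(Long id) {
        return new User(id, "unknown"); // default value instead of an error
    }

    // Runs the primary function and degrades to the fallback on failure.
    static <A, R> R withFallback(Function<A, R> primary, Function<A, R> fallback, A arg) {
        try {
            return primary.apply(arg);
        } catch (RuntimeException e) {
            return fallback.apply(arg);
        }
    }
}
```

A call such as `DegradationSketch.withFallback(DegradationSketch::selectByUserId, DegradationSketch::selectByUserIdFallBack, 7L)` yields the default user instead of propagating the exception to the caller.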
Hystrix source-code analysis
The @HystrixCommand annotation (backed by Hystrix's core component, where initialization, execution, rate limiting, and circuit breaking all take place) and the @HystrixCollapser annotation are both processed by HystrixCommandAspect.

    /** The @HystrixCommand annotation */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    @Documented
    public @interface HystrixCommand {
        // Group name; defaults to the class name
        String groupKey() default "";
        // Command name; defaults to the name of the method annotated with @HystrixCommand
        String commandKey() default "";
        // Thread pool name
        String threadPoolKey() default "";
        // Name of the fallback method; it must live in the same class as the @HystrixCommand
        // method and have the same parameters and return type
        String fallbackMethod() default "";
        // Command properties
        HystrixProperty[] commandProperties() default {};
        // Thread pool properties
        HystrixProperty[] threadPoolProperties() default {};
        // Exceptions that should be ignored
        Class<? extends Throwable>[] ignoreExceptions() default {};
        // Which observable execution mode to use
        ObservableExecutionMode observableExecutionMode() default ObservableExecutionMode.EAGER;
        HystrixException[] raiseHystrixExceptions() default {};
        // Default fallback method; if both fallbackMethod and defaultFallback are set, fallbackMethod wins.
        // The default fallback must take no parameters, and its return type must match
        String defaultFallback() default "";
    }

    /** The @HystrixCollapser annotation */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface HystrixCollapser {
        // Collapser name; defaults to the name of the annotated method
        String collapserKey() default "";
        /*
         * Name of the batch method. It must have this shape:
         *   java.util.List method(java.util.List)
         * and may take only one parameter.
         */
        String batchMethod();
        /** Scope; defaults to a single request and can be changed to GLOBAL */
        Scope scope() default Scope.REQUEST;
        /**
         * Collapser properties, configured as key-value pairs; several can be given as an array
         */
        HystrixProperty[] collapserProperties() default {};
    }

    @Aspect
    public class HystrixCommandAspect {

        private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

        static {
            META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                    .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                    .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                    .build();
        }

        // Pointcut for @HystrixCommand
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
        public void hystrixCommandAnnotationPointcut() {
        }

        // Pointcut for @HystrixCollapser
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
        public void hystrixCollapserAnnotationPointcut() {
        }

        @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
        public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
            Method method = getMethodFromTarget(joinPoint);
            Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
            // A method cannot carry both @HystrixCommand and @HystrixCollapser
            if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
                throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                        "annotations at the same time");
            }
            // Pick the MetaHolderFactory that matches the annotation
            MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
            // Collect everything Hystrix needs into the metaHolder; see (1) and (2) below
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            // Create the Hystrix invokable; (3), covered later
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            // Determine the execution type
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                    metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
            Object result;
            try {
                if (!metaHolder.isObservable()) {
                    // Execute through the CommandExecutor utility
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = executeObservable(invokable, executionType, metaHolder);
                }
            } catch (HystrixBadRequestException e) {
                throw e.getCause();
            } catch (HystrixRuntimeException e) {
                throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
            }
            return result;
        }

        private Observable executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
            return ((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
                    .onErrorResumeNext(new Func1<Throwable, Observable>() {
                        @Override
                        public Observable call(Throwable throwable) {
                            if (throwable instanceof HystrixBadRequestException) {
                                return Observable.error(throwable.getCause());
                            } else if (throwable instanceof HystrixRuntimeException) {
                                HystrixRuntimeException hystrixRuntimeException = (HystrixRuntimeException) throwable;
                                return Observable.error(hystrixRuntimeExceptionToThrowable(metaHolder, hystrixRuntimeException));
                            }
                            return Observable.error(throwable);
                        }
                    });
        }

        // (1) -> not covered in detail here
        private static class CollapserMetaHolderFactory extends MetaHolderFactory {

            @Override
            public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
                HystrixCollapser hystrixCollapser = collapserMethod.getAnnotation(HystrixCollapser.class);
                if (collapserMethod.getParameterTypes().length > 1 || collapserMethod.getParameterTypes().length == 0) {
                    throw new IllegalStateException("Collapser method must have one argument: " + collapserMethod);
                }
                Method batchCommandMethod = getDeclaredMethod(obj.getClass(), hystrixCollapser.batchMethod(), List.class);
                if (batchCommandMethod == null)
                    throw new IllegalStateException("batch method is absent: " + hystrixCollapser.batchMethod());
                Class<?> batchReturnType = batchCommandMethod.getReturnType();
                Class<?> collapserReturnType = collapserMethod.getReturnType();
                boolean observable = collapserReturnType.equals(Observable.class);
                if (!collapserMethod.getParameterTypes()[0]
                        .equals(getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]))) {
                    throw new IllegalStateException("required batch method for collapser is absent, wrong generic type: expected "
                            + obj.getClass().getCanonicalName() + "." +
                            hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
                            getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]));
                }
                final Class<?> collapserMethodReturnType = getFirstGenericParameter(
                        collapserMethod.getGenericReturnType(),
                        Future.class.isAssignableFrom(collapserReturnType) || Observable.class.isAssignableFrom(collapserReturnType) ? 1 : 0);
                Class<?> batchCommandActualReturnType = getFirstGenericParameter(batchCommandMethod.getGenericReturnType());
                if (!collapserMethodReturnType.equals(batchCommandActualReturnType)) {
                    throw new IllegalStateException("Return type of batch method must be java.util.List parametrized with corresponding type: expected " +
                            "(java.util.List<" + collapserMethodReturnType + ">)" + obj.getClass().getCanonicalName() + "." +
                            hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
                            batchCommandActualReturnType);
                }
                HystrixCommand hystrixCommand = batchCommandMethod.getAnnotation(HystrixCommand.class);
                if (hystrixCommand == null) {
                    throw new IllegalStateException("batch method must be annotated with HystrixCommand annotation");
                }
                // method of batch hystrix command must be passed to metaholder because basically collapser doesn't have any actions
                // that should be invoked upon intercepted method, it's required only for underlying batch command
                MetaHolder.Builder builder = metaHolderBuilder(proxy, batchCommandMethod, obj, args, joinPoint);
                if (isCompileWeaving()) {
                    builder.ajcMethod(getAjcMethodAroundAdvice(obj.getClass(), batchCommandMethod.getName(), List.class));
                }
                builder.hystrixCollapser(hystrixCollapser);
                builder.defaultCollapserKey(collapserMethod.getName());
                builder.collapserExecutionType(ExecutionType.getExecutionType(collapserReturnType));
                builder.defaultCommandKey(batchCommandMethod.getName());
                builder.hystrixCommand(hystrixCommand);
                builder.executionType(ExecutionType.getExecutionType(batchReturnType));
                builder.observable(observable);
                FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(obj.getClass(), batchCommandMethod);
                if (fallbackMethod.isPresent()) {
                    fallbackMethod.validateReturnType(batchCommandMethod);
                    builder
                            .fallbackMethod(fallbackMethod.getMethod())
                            .fallbackExecutionType(ExecutionType.getExecutionType(fallbackMethod.getMethod().getReturnType()));
                }
                return builder.build();
            }
        }

        // (2) ->
        private static class CommandMetaHolderFactory extends MetaHolderFactory {

            @Override
            public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
                HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
                // Decide from the return type whether execution is asynchronous, synchronous, or reactive (observable)
                ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
                // Build the MetaHolder and put every parameter Hystrix needs into it.
                // metaHolderBuilder also sets the fallback via setFallbackMethod(builder, obj.getClass(), method)
                MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
                if (isCompileWeaving()) {
                    builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
                }
                return builder.defaultCommandKey(method.getName())
                        .hystrixCommand(hystrixCommand)
                        .observableExecutionMode(hystrixCommand.observableExecutionMode())
                        .executionType(executionType)
                        .observable(ExecutionType.OBSERVABLE == executionType)
                        .build();
            }
        }

        // ... rest omitted
    }

Now let's look at the core of method execution.
    // Note the package path
    package com.netflix.hystrix.contrib.javanica.command;

    public class CommandExecutor {

        /** Rx terms used below: Observable (the observed), Observer (the observer), subscribe */
        public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
            Validate.notNull(invokable);
            Validate.notNull(metaHolder);
            switch (executionType) {
                // Synchronous: fetch the result from the dependency and return it
                case SYNCHRONOUS: {
                    // Calls HystrixCommand#execute()
                    return castToExecutable(invokable, executionType).execute();
                }
                // Asynchronous: return a Future
                case ASYNCHRONOUS: {
                    HystrixExecutable executable = castToExecutable(invokable, executionType);
                    if (metaHolder.hasFallbackMethodCommand()
                            && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                        return new FutureDecorator(executable.queue());
                    }
                    return executable.queue();
                }
                // Reactive execution with callbacks:
                // observe() starts the command immediately (eager),
                // toObservable() runs the command only once it is subscribed to (lazy)
                case OBSERVABLE: {
                    HystrixObservable observable = castToObservable(invokable);
                    return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()
                            ? observable.observe() : observable.toObservable();
                }
                default:
                    throw new RuntimeException("unsupported execution type: " + executionType);
            }
        }
    }

    // The code above is the preparation performed for the @HystrixCommand annotation.
    // The code below is the preparation performed when OpenFeign integrates Hystrix for circuit breaking.
    // Both end up calling HystrixCommand#execute().
    package org.springframework.cloud.netflix.hystrix;

    public class HystrixCircuitBreaker implements CircuitBreaker {

        private HystrixCommand.Setter setter;

        public HystrixCircuitBreaker(HystrixCommand.Setter setter) {
            this.setter = setter;
        }

        @Override
        public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
            HystrixCommand<T> command = new HystrixCommand<T>(setter) {
                @Override
                protected T run() throws Exception {
                    return toRun.get();
                }

                @Override
                protected T getFallback() {
                    return fallback.apply(getExecutionException());
                }
            };
            // Calls HystrixCommand#execute(), which internally is queue().get().
            // The Future returned by queue() implements get() on top of a CountDownLatch,
            // so this call blocks until the result is available
            return command.execute();
        }
    }

    package com.netflix.hystrix;

    public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>,
            HystrixInvokableInfo<R>, HystrixObservable<R> {

        // Used for asynchronous execution of the command.
        // Queues the command on the thread pool and returns a Future for retrieving the result.
        // If the command is not configured to run on a thread pool, this behaves like execute() and blocks
        public Future<R> queue() {
            // The Future returned by Observable.toBlocking().toFuture() does not interrupt the execution
            // thread when Future.cancel(boolean) is called with mayInterrupt set to true. To honor the
            // Future contract, we have to wrap that Future.
            // The request is executed here.
            // Important: this Future is a wrapped one, as the code below shows.
            // toObservable() is also important; see AbstractCommand#toObservable() further down
            final Future<R> delegate = toObservable().toBlocking().toFuture();

            // Create a new Future that wraps the one returned above
            final Future<R> f = new Future<R>() {

                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    // Already cancelled: this cancel attempt fails
                    if (delegate.isCancelled()) {
                        return false;
                    }
                    // Honor mayInterruptIfRunning only if interruptOnFutureCancel is enabled
                    if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                        // Record whether the execution thread should be interrupted
                        interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                    }
                    // The flag set above decides whether the execution thread is interrupted
                    final boolean res = delegate.cancel(interruptOnFutureCancel.get());
                    // If the command has not completed and interruption was requested,
                    // interrupt the execution thread (when it is not the current thread)
                    if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                        final Thread t = executionThread.get();
                        if (t != null && !t.equals(Thread.currentThread())) {
                            t.interrupt();
                        }
                    }
                    // Return the result of the cancellation
                    return res;
                }

                @Override
                public boolean isCancelled() {
                    return delegate.isCancelled();
                }

                @Override
                public boolean isDone() {
                    return delegate.isDone();
                }

                @Override
                public R get() throws InterruptedException, ExecutionException {
                    return delegate.get();
                }

                @Override
                public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                    return delegate.get(timeout, unit);
                }
            };

            // Handle errors that have already been raised.
            // isDone() does not block. If the command has not finished yet, this branch is skipped
            // and the still-running Future is returned at the end of the method
            if (f.isDone()) {
                try {
                    // Returns immediately here because the task has already completed, failed, or been cancelled
                    f.get();
                    return f;
                } catch (Exception e) {
                    Throwable t = decomposeException(e);
                    if (t instanceof HystrixBadRequestException) {
                        return f;
                    } else if (t instanceof HystrixRuntimeException) {
                        HystrixRuntimeException hre = (HystrixRuntimeException) t;
                        switch (hre.getFailureType()) {
                            case COMMAND_EXCEPTION:
                            case TIMEOUT:
                                // we don't throw these types from queue() only from queue().get() as they are execution errors
                                return f;
                            default:
                                // these are errors we throw from queue() as they as rejection type errors
                                throw hre;
                        }
                    } else {
                        throw Exceptions.sneakyThrow(t);
                    }
                }
            }
            // Return the Future that is still executing
            return f;
        }
    }

    // The wrapped Future
    public final class BlockingOperatorToFuture {

        public static <T> Future<T> toFuture(Observable<? extends T> that) {
            final CountDownLatch finished = new CountDownLatch(1); // latch that signals completion
            final AtomicReference<T> value = new AtomicReference<T>();
            final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

            @SuppressWarnings("unchecked")
            // Subscribe, and record the outcome as soon as it arrives
            final Subscription s = ((Observable<T>) that).single().subscribe(new Subscriber<T>() {

                @Override
                public void onCompleted() {
                    finished.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    error.compareAndSet(null, e);
                    finished.countDown();
                }

                @Override
                public void onNext(T v) {
                    // "single" guarantees there is only one "onNext"
                    value.set(v);
                }
            });

            return new Future<T>() {

                private volatile boolean cancelled;

                // If the task has not finished, cancel it, count the latch down to 0,
                // and release any waiting threads
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    if (finished.getCount() > 0) {
                        cancelled = true;
                        s.unsubscribe();
                        finished.countDown();
                        return true;
                    } else {
                        // can't cancel
                        return false;
                    }
                }

                @Override
                public boolean isCancelled() {
                    return cancelled;
                }

                @Override
                public boolean isDone() {
                    return finished.getCount() == 0;
                }

                // get() blocks on the latch until the result is available
                @Override
                public T get() throws InterruptedException, ExecutionException {
                    finished.await();
                    return getValue();
                }

                @Override
                public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                    if (finished.await(timeout, unit)) {
                        return getValue();
                    } else {
                        throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable.");
                    }
                }

                private T getValue() throws ExecutionException {
                    final Throwable throwable = error.get();
                    if (throwable != null) {
                        throw new ExecutionException("Observable onError", throwable);
                    } else if (cancelled) {
                        // Contract of Future.get() requires us to throw this:
                        throw new CancellationException("Subscription unsubscribed");
                    } else {
                        return value.get();
                    }
                }
            };
        }
    }

    /**
     * HystrixCommand is a subclass of AbstractCommand. A rough understanding of AbstractCommand
     * makes it easier to follow how the circuit breaker is executed.
     */
    abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

        protected final HystrixCircuitBreaker circuitBreaker; // the circuit breaker
        protected final HystrixThreadPool threadPool; // thread pool that runs HystrixCommand#run() for each request on a separate thread
        protected final HystrixThreadPoolKey threadPoolKey; // thread pool key, used for monitoring, metrics publishing, and so on
        protected final HystrixCommandProperties properties; // property configuration of the HystrixCommand

        // Not executed, completed, timed out
        protected enum TimedOutStatus {
            NOT_EXECUTED, COMPLETED, TIMED_OUT
        }

        protected enum CommandState {
            NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, UNSUBSCRIBED, TERMINAL
        }

        protected enum ThreadState {
            NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL
        }

        protected final HystrixCommandMetrics metrics; // runtime metrics, statistics, and monitoring data
        protected final HystrixCommandKey commandKey; // key of the HystrixCommand
        protected final HystrixCommandGroupKey commandGroup; // group key of the HystrixCommand
        protected final HystrixEventNotifier eventNotifier; // receives the various notifications
        protected final HystrixConcurrencyStrategy concurrencyStrategy; // concurrency-related operations
        protected final HystrixCommandExecutionHook executionHook; // hooks into the HystrixCommand lifecycle

        // Fallback semaphore
        protected final TryableSemaphore fallbackSemaphoreOverride;
        // Every circuit has a semaphore that controls fallback
        protected static final ConcurrentHashMap<String, TryableSemaphore> fallbackSemaphorePerCircuit = new ConcurrentHashMap<String, TryableSemaphore>();
        // Execution semaphore
        protected final TryableSemaphore executionSemaphoreOverride;
        // Every circuit has a semaphore that controls execution
        protected static final ConcurrentHashMap<String, TryableSemaphore> executionSemaphorePerCircuit = new ConcurrentHashMap<String, TryableSemaphore>();

        protected final AtomicReference<Reference<TimerListener>> timeoutTimer = new AtomicReference<Reference<TimerListener>>();

        protected AtomicReference<CommandState> commandState = new
                AtomicReference<CommandState>(CommandState.NOT_STARTED);
        protected AtomicReference<ThreadState> threadState = new AtomicReference<ThreadState>(ThreadState.NOT_USING_THREAD);

        // Execution state of the command
        protected volatile ExecutionResult executionResult = ExecutionResult.EMPTY; //state on shared execution
        // Whether the response came from the cache; false by default
        protected volatile boolean isResponseFromCache = false;
        protected volatile ExecutionResult executionResultAtTimeOfCancellation;
        protected volatile long commandStartTimestamp = -1L;
        // Whether the command timed out
        protected final AtomicReference<TimedOutStatus> isCommandTimedOut = new AtomicReference<TimedOutStatus>(TimedOutStatus.NOT_EXECUTED);
        protected volatile Action0 endCurrentThreadExecutingCommand;
        protected final HystrixRequestCache requestCache;
        protected final HystrixRequestLog currentRequestLog;
        private static ConcurrentHashMap<Class<?>, String> defaultNameCache = new ConcurrentHashMap<Class<?>, String>();
        protected static ConcurrentHashMap<HystrixCommandKey, Boolean> commandContainsFallback = new ConcurrentHashMap<HystrixCommandKey, Boolean>();

        // Executes the request
        public Observable<R> toObservable() {
            final AbstractCommand<R> _cmd = this;

            // Cleanup performed when the command terminates
            final Action0 terminateCommandCleanup = new Action0() {
                @Override
                public void call() {
                    // The observable chain was created but user code was never reached
                    if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                        handleCommandEnd(false); //user code never ran
                    // User code was executed
                    } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                        handleCommandEnd(true); //user code did run
                    }
                }
            };

            // Cleanup performed on unsubscribe; the command is marked as CANCELLED
            final Action0 unsubscribeCommandCleanup = new Action0() {
                @Override
                public void call() {
                    if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                        if (!_cmd.executionResult.containsTerminalEvent()) {
                            _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                            try {
                                executionHook.onUnsubscribe(_cmd);
                            } catch (Throwable hookEx) {
                                logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                            }
                            _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                                    .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                        }
                        handleCommandEnd(false); //user code never ran
                    } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                        if (!_cmd.executionResult.containsTerminalEvent()) {
                            _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                            try {
                                executionHook.onUnsubscribe(_cmd);
                            } catch (Throwable hookEx) {
                                logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                            }
                            _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                                    .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                        }
                        handleCommandEnd(true); //user code did run
                    }
                }
            };

            final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    // If the command has been unsubscribed, stop the event stream
                    if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                        return Observable.never();
                    }
                    // Execute the command
                    return applyHystrixSemantics(_cmd);
                }
            };

            final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
                @Override
                public R call(R r) {
                    R afterFirstApplication = r;
                    try {
                        afterFirstApplication = executionHook.onComplete(_cmd, r);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
                    }
                    try {
                        return executionHook.onEmit(_cmd, afterFirstApplication);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
                        return afterFirstApplication;
                    }
                }
            };

            final Action0 fireOnCompletedHook = new Action0() {
                @Override
                public void call() {
                    try {
                        executionHook.onSuccess(_cmd);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
                    }
                }
            };

            // Observable.defer delays the work: everything inside takes effect only on subscription.
            // It is worth reading up on Rx Observables separately
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    // Move the state to OBSERVABLE_CHAIN_CREATED
                    if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                        IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                        throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                    }
                    commandStartTimestamp = System.currentTimeMillis();
                    // Whether request logging is enabled; requestLog.enabled defaults to true
                    if (properties.requestLogEnabled().get()) {
                        // Add the command to the request log
                        if (currentRequestLog != null) {
                            currentRequestLog.addExecutedCommand(_cmd);
                        }
                    }
                    // Request caching is on when requestCache.enabled is true and getCacheKey() returns a non-null key
                    final boolean requestCacheEnabled = isRequestCachingEnabled();
                    final String cacheKey = getCacheKey();
                    // If request caching is enabled, try to serve the result from the cache
                    if (requestCacheEnabled) {
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                        if (fromCache != null) {
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        }
                    }
                    // Run applyHystrixSemantics
                    Observable<R> hystrixObservable =
                            Observable.defer(applyHystrixSemantics)
                                    .map(wrapWithAllOnNextHooks);
                    Observable<R> afterCache;
                    // If request caching is enabled and a cache key is given, put the request into the cache
                    if (requestCacheEnabled && cacheKey != null) {
                        HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                        if (fromCache != null) {
                            // Another thread got there first, so use its cached entry
                            toCache.unsubscribe();
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        } else {
                            // we just created an ObservableCommand so we cast and return it
                            afterCache = toCache.toObservable();
                        }
                    } else {
                        afterCache = hystrixObservable;
                    }
                    // Attach the actions that must run on particular lifecycle events
                    return afterCache
                            .doOnTerminate(terminateCommandCleanup)
                            .doOnUnsubscribe(unsubscribeCommandCleanup)
                            .doOnCompleted(fireOnCompletedHook);
                }
            });
        }

        // Executes the command
        private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
            executionHook.onStart(_cmd);
            // Ask the circuit breaker whether the request may run
            if (circuitBreaker.allowRequest()) {
                // Get the execution semaphore. With SEMAPHORE isolation its size comes from
                // execution.isolation.semaphore.maxConcurrentRequests (default 10); with THREAD
                // isolation a no-op semaphore that always grants a permit is used
                final TryableSemaphore executionSemaphore = getExecutionSemaphore();
                final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
                final Action0 singleSemaphoreRelease = new Action0() {
                    @Override
                    public void call() {
                        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                            executionSemaphore.release();
                        }
                    }
                };
                final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                    @Override
                    public void call(Throwable t) {
                        eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                    }
                };
                // Try to acquire a permit
                if (executionSemaphore.tryAcquire()) {
                    try {
                        // executionResult records the execution state of the command
                        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                        // Execute the command; success and failure are handled through the observable chain
                        return executeCommandAndObserve(_cmd)
                                .doOnError(markExceptionThrown)
                                .doOnTerminate(singleSemaphoreRelease)
                                .doOnUnsubscribe(singleSemaphoreRelease);
                    } catch (RuntimeException e) {
                        return Observable.error(e);
                    }
                } else {
                    return handleSemaphoreRejectionViaFallback();
                }
            } else {
                return handleShortCircuitViaFallback();
            }
        }
    }

Next, we turn to
HystrixCircuitBreaker, the core interface that implements the circuit-breaker functionality
It has two nested implementations, HystrixCircuitBreakerImpl and NoOpCircuitBreaker. HystrixCircuitBreakerImpl inspects the health metrics of the HystrixCommand to decide whether a request should be executed.

    public interface HystrixCircuitBreaker {

        // Every request asks whether it is allowed to execute
        public boolean allowRequest();

        // Whether the circuit breaker is open
        public boolean isOpen();

        /**
         * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
         */
        /* package */void markSuccess();

        static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
            private final HystrixCommandProperties properties;
            private final HystrixCommandMetrics metrics;

            // State of the circuit breaker; closed by default
            private AtomicBoolean circuitOpen = new AtomicBoolean(false);

            // Time at which the circuit was opened or last tested
            private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

            protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
                this.properties = properties;
                this.metrics = metrics;
            }

            public void markSuccess() {
                if (circuitOpen.get()) {
                    if (circuitOpen.compareAndSet(true, false)) {
                        //win the thread race to reset metrics
                        //Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view,
                        //and all other metric consumers are unaffected by the reset
                        metrics.resetStream();
                    }
                }
            }

            @Override
            public boolean allowRequest() {
                // Check circuitBreaker.forceOpen, which forces the breaker open; the default is false
                if (properties.circuitBreakerForceOpen().get()) {
                    // With circuitBreaker.forceOpen set to true the breaker is open and no request is allowed through
                    return false;
                }
                // circuitBreaker.forceClosed: ignore errors and never open the breaker (let every request through); the default is false
                if (properties.circuitBreakerForceClosed().get()) {
                    // Even though every request is allowed, still call isOpen() to simulate normal behavior
                    isOpen();
                    return true;
                }
                // Allow the request if the breaker is not open, or if the sleep window (5 s by default) has elapsed and a trial is due
                return !isOpen() || allowSingleTest();
            }

            public boolean allowSingleTest() {
                long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
                // 1) the breaker is open
                // 2) circuitBreaker.sleepWindowInMilliseconds (default 5000) has elapsed since the circuit
                //    was opened or last tested, so one trial request is let through
                if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
                    if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                        return true;
                    }
                }
                return false;
            }

            @Override
            public boolean isOpen() {
                // Check whether the breaker is already open; false by default
                if (circuitOpen.get()) {
                    return true;
                }
                // Fetch the error count and error percentage
                HealthCounts health = metrics.getHealthCounts();
                // circuitBreaker.requestVolumeThreshold, default 20: check whether the total number of
                // requests is below the minimum required before the breaker may trip
                if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                    // Not enough requests yet to make a decision, so return false
                    return false;
                }
                // circuitBreaker.errorThresholdPercentage, default 50: check whether the failure
                // percentage has reached the threshold for opening the breaker
                if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                    return false;
                } else {
                    // Flip the breaker to open
                    if (circuitOpen.compareAndSet(false, true)) {
                        // Record the time the circuit was opened or last tested
                        circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                        return true;
                    } else {
                        // Losing the race means another thread is opening the breaker, so the result is the same
                        return true;
                    }
                }
            }
        }

        static class NoOpCircuitBreaker implements HystrixCircuitBreaker {

            @Override
            public boolean allowRequest() {
                return true;
            }

            @Override
            public boolean isOpen() {
                return false;
            }

            @Override
            public void markSuccess() {
            }
        }
    }

OpenFeign
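What is OpenFeign
Ribbon is a client-side library for HTTP and TCP that provides load balancing, retry on failure, ping and similar features. Feign is a declarative web service client (not a server) that traditionally ships with Ribbon built in. Unlike using Ribbon directly, Feign only asks you to define an interface bound to the remote service and annotate it declaratively, which makes service calls simple and clean. OpenFeign is the Spring Cloud enhancement of Feign and adds support for Spring MVC annotations such as @RequestMapping.
Using OpenFeign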
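/** Enable Feign */
@SpringBootApplication
@EnableFeignClients // turns on scanning for @FeignClient interfaces
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}

/** Example client */
@FeignClient(value = "activity")
public interface RegisterFeign {
    // Bind the argument explicitly: an unannotated parameter is treated as the request body
    @RequestMapping("/firstLoginActivity")
    public String firstLogin(@RequestParam("userId") Long userId);
}

OpenFeign internals and source code walkthrough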
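Before reading the registrar source, it may help to see the idea the whole mechanism serves: an annotated interface becomes a working client through a JDK dynamic proxy. The sketch below illustrates only that idea, using nothing but the JDK. The @Get annotation, the ProxySketch class, the hard-coded userId query parameter and the fake transport function are all hypothetical names invented for this example; they are not part of Feign or Spring.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.Function;

public class ProxySketch {

    /** Hypothetical stand-in for a mapping annotation such as @RequestMapping. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Get {
        String value();
    }

    /** Declarative client interface; no implementation is written by hand. */
    public interface RegisterClient {
        @Get("/firstLoginActivity")
        String firstLogin(Long userId);
    }

    /**
     * Creates a proxy for the interface. Each call is turned into a URL built from
     * the service name and the annotation value, then handed to the transport
     * function (an assumption of this sketch, standing in for a real HTTP client).
     */
    @SuppressWarnings("unchecked")
    public static <T> T create(Class<T> type, String serviceName, Function<String, String> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            Get mapping = method.getAnnotation(Get.class);
            if (mapping == null) {
                throw new UnsupportedOperationException("No mapping on " + method.getName());
            }
            // Query parameter name is hard-coded to keep the sketch short
            String url = "http://" + serviceName + mapping.value() + "?userId=" + args[0];
            return transport.apply(url);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    public static void main(String[] args) {
        RegisterClient client = create(RegisterClient.class, "activity", url -> "called " + url);
        // prints "called http://activity/firstLoginActivity?userId=42"
        System.out.println(client.firstLogin(42L));
    }
}
```

In the real framework the host part of such a URL is a service name rather than an address, which is why a load-balancing client has to resolve it before the request is sent.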
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {
    String[] value() default {};
    String[] basePackages() default {};
    Class<?>[] basePackageClasses() default {};
    Class<?>[] defaultConfiguration() default {};
    Class<?>[] clients() default {};
}

// FeignClientsRegistrar is what registers the Feign clients
class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {

    @Override
    public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
        // Load the attributes configured on @EnableFeignClients
        registerDefaultConfiguration(metadata, registry);
        // Register every discovered Feign client with the container
        registerFeignClients(metadata, registry);
    }

    public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
        LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>();
        Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
        final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients");
        // If @EnableFeignClients lists no clients, scan the base packages for every type annotated with @FeignClient
        if (clients == null || clients.length == 0) {
            ClassPathScanningCandidateComponentProvider scanner = getScanner();
            scanner.setResourceLoader(this.resourceLoader);
            scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class));
            Set<String> basePackages = getBasePackages(metadata);
            for (String basePackage : basePackages) {
                candidateComponents.addAll(scanner.findCandidateComponents(basePackage));
            }
        }
        // If @EnableFeignClients lists the clients explicitly, use those classes directly and skip scanning
        else {
            for (Class<?> clazz : clients) {
                candidateComponents.add(new AnnotatedGenericBeanDefinition(clazz));
            }
        }
        // Process each candidate Feign client definition
        for (BeanDefinition candidateComponent : candidateComponents) {
            if (candidateComponent instanceof AnnotatedBeanDefinition) {
                // verify annotated class is an interface
                AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
                AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
                Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface");
                Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(FeignClient.class.getCanonicalName());
                // Register the client's configuration attributes as a bean
                String name = getClientName(attributes);
                registerClientConfiguration(registry, name, attributes.get("configuration"));
                // Register the Feign client itself as a bean
                registerFeignClient(registry, annotationMetadata, attributes);
            }
        }
    }

    // Registers the configuration attributes above as a bean
    private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(FeignClientSpecification.class);
        builder.addConstructorArgValue(name);
        builder.addConstructorArgValue(configuration);
        registry.registerBeanDefinition(name + "." + FeignClientSpecification.class.getSimpleName(),
                builder.getBeanDefinition());
    }

    // Registers the Feign client itself as a bean
    private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
            Map<String, Object> attributes) {
        String className = annotationMetadata.getClassName();
        Class clazz = ClassUtils.resolveClassName(className, null);
        ConfigurableBeanFactory beanFactory = registry instanceof ConfigurableBeanFactory
                ? (ConfigurableBeanFactory) registry : null;
        String contextId = getContextId(beanFactory, attributes);
        String name = getName(attributes);
        FeignClientFactoryBean factoryBean = new FeignClientFactoryBean();
        factoryBean.setBeanFactory(beanFactory);
        factoryBean.setName(name);
        factoryBean.setContextId(contextId);
        factoryBean.setType(clazz);
        BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(clazz, () -> {
            factoryBean.setUrl(getUrl(beanFactory, attributes));
            factoryBean.setPath(getPath(beanFactory, attributes));
            factoryBean.setDecode404(Boolean.parseBoolean(String.valueOf(attributes.get("decode404"))));
            Object fallback = attributes.get("fallback");
            if (fallback != null) {
                factoryBean.setFallback(fallback instanceof Class ? (Class<?>) fallback
                        : ClassUtils.resolveClassName(fallback.toString(), null));
            }
            Object fallbackFactory = attributes.get("fallbackFactory");
            if (fallbackFactory != null) {
                factoryBean.setFallbackFactory(fallbackFactory instanceof Class ? (Class<?>) fallbackFactory
                        : ClassUtils.resolveClassName(fallbackFactory.toString(), null));
            }
            return factoryBean.getObject();
        });
        definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
        definition.setLazyInit(true);
        validate(attributes);
        AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
        beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, className);
        beanDefinition.setAttribute("feignClientsRegistrarFactoryBean", factoryBean);
        // has a default, won't be null
        boolean primary = (Boolean) attributes.get("primary");
        beanDefinition.setPrimary(primary);
        String[] qualifiers = getQualifiers(attributes);
        if (ObjectUtils.isEmpty(qualifiers)) {
            qualifiers = new String[] { contextId + "FeignClient" };
        }
        BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, qualifiers);
        BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
    }
}

To summarize, the main responsibilities of FeignClientsRegistrar
It loads the attributes configured on @EnableFeignClients.
It registers the bean definition for the default Feign client configuration.
Every interface we annotate with @FeignClient is registered in the container as a bean. When a Feign client is injected with @Autowired, the container uses the factory class FeignClientFactoryBean to create the instance.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface FeignClient {
    @AliasFor("name")
    String value() default "";
    String contextId() default "";
    @AliasFor("value")
    String name() default "";
    @Deprecated
    String qualifier() default "";
    String[] qualifiers() default {};
    String url() default "";
    boolean decode404() default false;
    Class<?>[] configuration() default {};
    Class<?> fallback() default void.class;
    Class<?> fallbackFactory() default void.class;
    String path() default "";
    boolean primary() default true;
}

How OpenFeign load balancing works (this is really how spring-cloud-loadbalancer works: Ribbon is no longer being updated, and recent OpenFeign versions do not support Ribbon by default)
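The selection step that the source in this section eventually reaches comes down to a shared counter and a modulo. The sketch below shows that step on its own with plain JDK types. RoundRobinSketch and pick are hypothetical names; the real class works with ServiceInstance lists and Reactor types. One deliberate difference from the quoted source: the sketch masks the sign bit instead of calling Math.abs, because Math.abs(Integer.MIN_VALUE) is still negative and would produce a negative index once the counter overflows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {

    // Shared counter; the real balancer seeds it with a random number below 1000
    private final AtomicInteger position;

    public RoundRobinSketch(int seed) {
        this.position = new AtomicInteger(seed);
    }

    /** Returns the next instance in rotation, or null when the list is empty. */
    public <T> T pick(List<T> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // Clearing the sign bit keeps the value non-negative even after overflow
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        RoundRobinSketch lb = new RoundRobinSketch(-1);
        List<String> servers = Arrays.asList("a:8080", "b:8080", "c:8080");
        // prints a:8080, b:8080, c:8080, a:8080
        for (int i = 0; i < 4; i++) {
            System.out.println(lb.pick(servers));
        }
    }
}
```

Because the counter is an AtomicInteger, concurrent callers each get a distinct position without locking, which is the same design choice the quoted RoundRobinLoadBalancer makes.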
// The proxied (reflective) call eventually arrives here
public class FeignBlockingLoadBalancerClient implements Client {

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        // The request URL, for example http://activity/firstLoginActivity
        final URI originalUri = URI.create(request.url());
        // The service name taken from the host part, for example activity
        String serviceId = originalUri.getHost();
        // The service name must not be null
        Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
        String hint = getHint(serviceId);
        // Wrap the outgoing request for the load balancer
        DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
                new RequestDataContext(buildRequestData(request), hint));
        // Lifecycle hooks to run before and after load balancing
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
                .getSupportedLifecycleProcessors(
                        loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                        RequestDataContext.class, ResponseData.class, ServiceInstance.class);
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
        // Core of the load balancing: pick a service instance according to the balancing rule
        ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
        org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(instance);
        // If no instance was found, answer with 503 Service Unavailable
        if (instance == null) {
            String message = "Load balancer does not contain an instance for the service " + serviceId;
            if (LOG.isWarnEnabled()) {
                LOG.warn(message);
            }
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                    .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                            CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
            return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
                    .body(message, StandardCharsets.UTF_8).build();
        }
        // Replace the service name in originalUri with the chosen instance's address to get the full request URL
        String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
        Request newRequest = buildRequest(request, reconstructedUrl);
        // Send the request
        return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
                supportedLifecycleProcessors);
    }
}

public class BlockingLoadBalancerClient implements LoadBalancerClient {

    @Override
    public <T> ServiceInstance choose(String serviceId, Request<T> request) {
        // Obtain the load balancer (RoundRobinLoadBalancer by default)
        ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
        if (loadBalancer == null) {
            return null;
        }
        // Let the load balancer choose which server to use
        Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
        if (loadBalancerResponse == null) {
            return null;
        }
        return loadBalancerResponse.getServer();
    }
}

/** Obtaining the RoundRobinLoadBalancer */
public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>
        implements ReactiveLoadBalancer.Factory<ServiceInstance> {

    // Pay attention to ReactorServiceInstanceLoadBalancer: it is a marker interface (not an annotation) that identifies load balancers
    @Override
    public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
        return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
    }
}

// The lookup by ReactorServiceInstanceLoadBalancer then resolves to its default implementation
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {

    // RoundRobinLoadBalancer is registered as the default load balancer
    @Bean
    @ConditionalOnMissingBean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
            LoadBalancerClientFactory loadBalancerClientFactory) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RoundRobinLoadBalancer(
                loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
    }
}

/**
 * The load balancer decides which server to use. By default this is RoundRobinLoadBalancer.choose,
 * and the core of a custom load balancer is overriding that choose method.
 * The principle: when RoundRobinLoadBalancer is created, new Random().nextInt(1000) produces a random
 * starting number below 1000. On every selection the number is incremented by 1:
 *     int pos = Math.abs(this.position.incrementAndGet());
 * and the instance at that number modulo the list size is returned:
 *     instances.get(pos % instances.size());
 */
public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        // Get the instance list. NoopServiceInstanceListSupplier is the fallback implementation of
        // ServiceInstanceListSupplier, which supplies the list of instances for a service.
        // ServiceInstance is the entity class describing one instance.
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next()
                .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
    }
}

/** So how exactly is the instance list of a service obtained? I have not fully worked this part out. */
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);

// processInstanceResponse calls getInstanceResponse(serviceInstances) to pick the instance
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
    if (instances.isEmpty()) {
        if (log.isWarnEnabled()) {
            log.warn("No servers available for service: " + serviceId);
        }
        return new EmptyResponse();
    }
    // Increment the counter on every selection
    int pos = Math.abs(this.position.incrementAndGet());
    // Take the instance at the counter modulo the list size
    ServiceInstance instance = instances.get(pos % instances.size());
    return new DefaultResponse(instance);
}

Custom load balancing for Feign (my OpenFeign version is 3.0.2, which no longer supports Ribbon, so the customization uses classes from spring-cloud-starter-loadbalancer)
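The custom balancer in this section restricts round robin to the instances at even positions (0, 2, 4, ...). Stripped of the Spring and Reactor types, the selection alone might look like the sketch below. EvenSlotSketch, evenSlots and pick are hypothetical names used only here. As an assumption of this sketch, the candidate list is recomputed on every call rather than cached, which trades a little allocation for not having to detect when the instance list has changed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class EvenSlotSketch {

    private final AtomicInteger position = new AtomicInteger(-1);

    /** Keeps only the elements at indexes 0, 2, 4, ... */
    public static <T> List<T> evenSlots(List<T> instances) {
        List<T> result = new ArrayList<>((instances.size() + 1) / 2);
        for (int i = 0; i < instances.size(); i += 2) {
            result.add(instances.get(i));
        }
        return result;
    }

    /** Round robin over the even-indexed instances; null when there are none. */
    public <T> T pick(List<T> instances) {
        List<T> candidates = evenSlots(instances);
        if (candidates.isEmpty()) {
            return null;
        }
        // Mask the sign bit so the index stays non-negative after counter overflow
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return candidates.get(pos % candidates.size());
    }

    public static void main(String[] args) {
        EvenSlotSketch lb = new EvenSlotSketch();
        List<String> servers = Arrays.asList("s0", "s1", "s2", "s3", "s4");
        // prints s0, s2, s4, s0
        for (int i = 0; i < 4; i++) {
            System.out.println(lb.pick(servers));
        }
    }
}
```

A cache keyed only on the list size can go stale when one instance is replaced by another, so any caching variant would need a stronger change check than the size alone.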
/**
 * The class holding the actual selection logic.
 * Custom load-balancing strategy: round robin over the instances at even positions.
 */
public class ZhxsharkBalancer implements ReactorServiceInstanceLoadBalancer {

    private static final Log log = LogFactory.getLog(ZhxsharkBalancer.class);

    final AtomicInteger position;
    final String serviceId;
    // Size of the instance list seen last time
    AtomicInteger serviceSize;
    // Even-positioned instances computed from the previous instance list
    List<ServiceInstance> beforeInstances;
    ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

    public ZhxsharkBalancer(ObjectProvider<ServiceInstanceListSupplier> lazyProvider, String name, int position) {
        this.serviceId = name;
        this.serviceInstanceListSupplierProvider = lazyProvider;
        this.position = new AtomicInteger(position);
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next()
                .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
    }

    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
            List<ServiceInstance> serviceInstances) {
        Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    /**
     * Custom load-balancing strategy: round robin over the instances at even positions.
     * @param serviceInstances all instances of the service
     * @return the chosen instance, or an empty response if there is none
     */
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            if (log.isWarnEnabled()) {
                log.warn("No servers available for service: " + serviceId);
            }
            return new EmptyResponse();
        }
        // Rebuild the list of even-positioned instances whenever the size of the instance list changes
        if (serviceSize == null || serviceSize.get() != serviceInstances.size()) {
            serviceSize = new AtomicInteger(serviceInstances.size());
            beforeInstances = new ArrayList<>(16);
            for (int i = 0; i < serviceInstances.size(); i = i + 2) {
                beforeInstances.add(i / 2, serviceInstances.get(i));
            }
        }
        // Mask the sign bit rather than use Math.abs, which stays negative for Integer.MIN_VALUE
        int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
        ServiceInstance serviceInstance = beforeInstances.get(pos % beforeInstances.size());
        return new DefaultResponse(serviceInstance);
    }
}

/** Configuration class. How it takes effect can be explored through the @LoadBalancerClients annotation; it is not explained here. */
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@LoadBalancerClients(defaultConfiguration = {ZxLoaderBalancerConfiguration.class})
public class ZxLoaderBalancerConfiguration {

    @Bean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
            LoadBalancerClientFactory loadBalancerClientFactory) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new ZhxsharkBalancer(
                loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name,
                RandomUtils.nextInt(1000));
    }
}

Using OpenFeign in practice
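Each Feign client can be given its own parameters. A fallback can also be set with @EnableFeignClients(defaultConfiguration = xxx); it applies to every client that has no configuration of its own.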
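How a per-client setting and a shared default might combine can be sketched without Spring. The sketch assumes the property-based convention in which an entry named default under feign.client.config serves as the fallback for clients with no entry of their own. ClientConfigSketch, Settings and resolve are hypothetical names, only two settings are modelled, and the merging of Java-based configuration classes that real Feign also performs is left out.

```java
import java.util.HashMap;
import java.util.Map;

public class ClientConfigSketch {

    /** Two per-client settings; null means "not set for this entry". */
    public static final class Settings {
        public final Integer connectTimeout;
        public final Integer readTimeout;

        public Settings(Integer connectTimeout, Integer readTimeout) {
            this.connectTimeout = connectTimeout;
            this.readTimeout = readTimeout;
        }
    }

    /** Field by field: the client's own value wins, otherwise the "default" entry's value. */
    public static Settings resolve(Map<String, Settings> config, String clientName) {
        Settings empty = new Settings(null, null);
        Settings fallback = config.getOrDefault("default", empty);
        Settings own = config.getOrDefault(clientName, empty);
        return new Settings(
                own.connectTimeout != null ? own.connectTimeout : fallback.connectTimeout,
                own.readTimeout != null ? own.readTimeout : fallback.readTimeout);
    }

    public static void main(String[] args) {
        Map<String, Settings> config = new HashMap<>();
        config.put("default", new Settings(1000, 2000));
        config.put("activity", new Settings(5000, null)); // overrides connectTimeout only

        Settings activity = resolve(config, "activity");
        Settings other = resolve(config, "order"); // "order" has no entry of its own
        // prints "5000 2000" then "1000 2000"
        System.out.println(activity.connectTimeout + " " + activity.readTimeout);
        System.out.println(other.connectTimeout + " " + other.readTimeout);
    }
}
```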
@ConfigurationProperties("feign.client")
public class FeignClientProperties {

    // Some parts omitted
    private Map<String, FeignClientConfiguration> config = new HashMap<>();

    /**
     * Feign client configuration.
     */
    public static class FeignClientConfiguration {
        // Log level
        private Logger.Level loggerLevel;
        // Connect timeout
        private Integer connectTimeout;
        // Read timeout
        private Integer readTimeout;
        // Retry policy
        private Class<Retryer> retryer;
        // Central place to translate this client's error responses into exceptions
        private Class<ErrorDecoder> errorDecoder;
        // Request interceptors
        private List<Class<RequestInterceptor>> requestInterceptors;
        // Default request headers
        private Map<String, Collection<String>> defaultRequestHeaders;
        // Default query parameters
        private Map<String, Collection<String>> defaultQueryParameters;
        // Whether a 404 response is decoded instead of raising an exception
        private Boolean decode404;
        // Decoder
        private Class<Decoder> decoder;
        // Encoder
        private Class<Encoder> encoder;
        // Defines which annotations and parameters are valid on the interface
        private Class<Contract> contract;
        private ExceptionPropagationPolicy exceptionPropagationPolicy;
        // Hooks for customizing core components
        private List<Class<Capability>> capabilities;
        // Metrics configuration
        private MetricsProperties metrics;
        private Boolean followRedirects;

        public static class MetricsProperties {
            private Boolean enabled = true;

            public Boolean getEnabled() {
                return enabled;
            }

            public void setEnabled(Boolean enabled) {
                this.enabled = enabled;
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) {
                    return true;
                }
                if (o == null || getClass() != o.getClass()) {
                    return false;
                }
                MetricsProperties that = (MetricsProperties) o;
                return Objects.equals(enabled, that.enabled);
            }

            @Override
            public int hashCode() {
                return Objects.hash(enabled);
            }
        }
    }
}

// A closer look at the default retry policy
public interface Retryer extends Cloneable {

    void continueOrPropagate(RetryableException e);

    Retryer clone();

    class Default implements Retryer {
        private final int maxAttempts;
        private final long period;
        private final long maxPeriod;
        int attempt;
        long sleptForMillis;

        // Defaults: 100 ms base period, 1 s maximum period, 5 attempts in total
        public Default() {
            this(100, SECONDS.toMillis(1), 5);
        }

        public Default(long period, long maxPeriod, int maxAttempts) {
            this.period = period;
            this.maxPeriod = maxPeriod;
            this.maxAttempts = maxAttempts;
            this.attempt = 1;
        }

        // visible for testing;
        protected long currentTimeMillis() {
            return System.currentTimeMillis();
        }

        public void continueOrPropagate(RetryableException e) {
            // Give up once the attempt budget is used
            if (attempt++ >= maxAttempts) {
                throw e;
            }
            long interval;
            if (e.retryAfter() != null) {
                // Honor the server's Retry-After, capped at maxPeriod
                interval = e.retryAfter().getTime() - currentTimeMillis();
                if (interval > maxPeriod) {
                    interval = maxPeriod;
                }
                if (interval < 0) {
                    return;
                }
            } else {
                interval = nextMaxInterval();
            }
            try {
                Thread.sleep(interval);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
                throw e;
            }
            sleptForMillis += interval;
        }

        // Exponential backoff: period * 1.5^(attempt - 1), capped at maxPeriod
        long nextMaxInterval() {
            long interval = (long) (period * Math.pow(1.5, attempt - 1));
            return interval > maxPeriod ? maxPeriod : interval;
        }

        @Override
        public Retryer clone() {
            return new Default(period, maxPeriod, maxAttempts);
        }
    }

    /**
     * Implementation that never retries request. It propagates the RetryableException.
     */
    Retryer NEVER_RETRY = new Retryer() {
        @Override
        public void continueOrPropagate(RetryableException e) {
            throw e;
        }

        @Override
        public Retryer clone() {
            return this;
        }
    };
}

// If no Retryer bean is defined, the default is to never retry.
// As an aside, many default settings in Spring Boot are implemented with this same @ConditionalOnMissingBean pattern.
@Bean
@ConditionalOnMissingBean
public Retryer feignRetryer() {
    return Retryer.NEVER_RETRY;
}

// The retry loop itself is implemented as follows
final class SynchronousMethodHandler implements MethodHandler {
    //...
    @Override
    public Object invoke(Object[] argv) throws Throwable {
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        Options options = findOptions(argv);
        // Clone the configured retryer so that every invocation gets its own attempt counter
        Retryer retryer = this.retryer.clone();
        while (true) {
            try {
                // Send (or resend) the request
                return executeAndDecode(template, options);
            } catch (RetryableException e) {
                try {
                    // Apply the retry policy; once the attempts are exhausted the exception is rethrown
                    retryer.continueOrPropagate(e);
                } catch (RetryableException th) {
                    Throwable cause = th.getCause();
                    if (propagationPolicy == UNWRAP && cause != null) {
                        throw cause;
                    } else {
                        throw th;
                    }
                }
                if (logLevel != Logger.Level.NONE) {
                    logger.logRetry(metadata.configKey(), logLevel);
                }
                continue;
            }
        }
    }
}

Example configuration
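The backoff arithmetic of Retryer.Default quoted above is easy to misread, because attempt starts at 1 and is incremented before the interval is computed. The sketch below replays just that arithmetic and returns the sleep intervals instead of sleeping. BackoffSketch and intervals are hypothetical names, and the Retry-After branch is deliberately left out, so this approximates only the else path of continueOrPropagate.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSketch {

    /**
     * Returns the sleep intervals (ms) between attempts, following the same
     * formula as the quoted source: period * 1.5^(attempt - 1), capped at maxPeriod.
     */
    public static List<Long> intervals(long period, long maxPeriod, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        int attempt = 1; // same starting value as Retryer.Default
        // Mirrors "if (attempt++ >= maxAttempts) throw e;": loop while another attempt is allowed
        while (attempt++ < maxAttempts) {
            long interval = (long) (period * Math.pow(1.5, attempt - 1));
            result.add(Math.min(interval, maxPeriod));
        }
        return result;
    }

    public static void main(String[] args) {
        // Defaults from the no-arg constructor: 100 ms period, 1000 ms cap, 5 attempts
        // prints [150, 225, 337, 506]
        System.out.println(intervals(100, 1000, 5));
        // With a lower cap the later intervals flatten out
        // prints [150, 200, 200, 200]
        System.out.println(intervals(100, 200, 5));
    }
}
```

So with the defaults, five attempts mean one initial call plus four retries, and the first sleep is 150 ms rather than 100 ms. A custom Retryer such as the ActivityRetryer stub in the configuration that follows would replace this policy entirely.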
/**
 * Custom capability: can wrap the decoder, encoder, logger and every other component configured above.
 * Each stub returns its argument unchanged; returning null would leave the client without that component.
 */
public class ActivityCapability implements Capability {
    @Override
    public Client enrich(Client client) {
        return client;
    }

    @Override
    public Retryer enrich(Retryer retryer) {
        return retryer;
    }

    @Override
    public RequestInterceptor enrich(RequestInterceptor requestInterceptor) {
        return requestInterceptor;
    }

    @Override
    public Logger enrich(Logger logger) {
        return logger;
    }

    @Override
    public Logger.Level enrich(Logger.Level level) {
        return level;
    }

    @Override
    public Contract enrich(Contract contract) {
        return contract;
    }

    @Override
    public Request.Options enrich(Request.Options options) {
        return options;
    }

    @Override
    public Encoder enrich(Encoder encoder) {
        return encoder;
    }

    @Override
    public Decoder enrich(Decoder decoder) {
        return decoder;
    }

    @Override
    public InvocationHandlerFactory enrich(InvocationHandlerFactory invocationHandlerFactory) {
        return invocationHandlerFactory;
    }

    @Override
    public QueryMapEncoder enrich(QueryMapEncoder queryMapEncoder) {
        return queryMapEncoder;
    }
}

/**
 * Custom contract (interface validation). Placeholder body.
 */
public class ActivityContract implements Contract {
    @Override
    public List<MethodMetadata> parseAndValidateMetadata(Class<?> targetType) {
        return null;
    }
}

/**
 * Custom decoder. Placeholder body.
 */
public class ActivityDecode implements Decoder {
    @Override
    public Object decode(Response response, Type type) throws IOException, DecodeException, FeignException {
        return null;
    }
}

/**
 * Custom encoder. Placeholder body.
 */
public class ActivityEncode implements Encoder {
    @Override
    public void encode(Object object, Type bodyType, RequestTemplate template) throws EncodeException {
    }
}

/**
 * Custom error decoder. Placeholder body.
 */
public class ActivityErrorDecoder implements ErrorDecoder {
    @Override
    public Exception decode(String methodKey, Response response) {
        return null;
    }
}

/**
 * Custom interceptor, for example to manipulate headers.
 */
public class ActivityRequestInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate template) {
    }
}

/**
 * Custom retry policy. This stub never retries; an empty continueOrPropagate would retry forever without any delay.
 */
public class ActivityRetryer implements Retryer {
    @Override
    public void continueOrPropagate(RetryableException e) {
        throw e;
    }

    @Override
    public Retryer clone() {
        return this;
    }
}

The corresponding configuration in application.yml:

feign:
  client:
    config:
      activity:
        connectTimeout: 5000
        readTimeout: 5000
        loggerLevel: full
        errorDecoder: com.zhxshark.user.ActivityErrorDecoder
        retryer: com.zhxshark.user.ActivityRetryer
        defaultQueryParameters:
          query: queryValue
        defaultRequestHeaders:
          header: headerValue
        requestInterceptors:
          - com.zhxshark.user.ActivityRequestInterceptor
        decode404: false
        encoder: com.zhxshark.user.ActivityEncode
        decoder: com.zhxshark.user.ActivityDecode
        contract: com.zhxshark.user.ActivityContract
        capabilities:
          - com.zhxshark.user.ActivityCapability
        metrics.enabled: false

Differences between OpenFeign versions (important); see the official documentation: https://spring.io/projects/spring-cloud-openfeign/#learn
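Everything above uses org.springframework.cloud:spring-cloud-starter-openfeign:3.0.2. That version no longer ships LoadBalancerFeignClient as the Feign client. Instead, load balancing goes through FeignBlockingLoadBalancerClient, from the org.springframework.cloud.openfeign.loadbalancer package, by default.
At the time of writing there are four versions:

Version | Status | spring-cloud-starter-loadbalancer | spring-cloud-starter-netflix-ribbon | Includes Hystrix
3.0.2 | GA (current) | supported | not supported | no
3.0.3-SNAPSHOT | SNAPSHOT | supported | not supported | no
2.2.9.BUILD-SNAPSHOT | SNAPSHOT | supported | supported | yes
2.2.8.RELEASE | GA | supported | supported | yes

In the versions that support spring-cloud-starter-netflix-ribbon, Ribbon-based load balancing is controlled by the property spring.cloud.loadbalancer.ribbon.enabled. It must be true for Ribbon to be used (true is the default in the 2.2.x line); setting it to false switches to spring-cloud-starter-loadbalancer.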
