OpenFeign and Hystrix Source Code Walkthrough (Original)
Hystrix
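Contents
OpenFeign and Hystrix Source Code Walkthrough (Original)
  Hystrix
    What is Hystrix
    The avalanche effect
    Solutions to the avalanche effect
      Request caching: cache a request together with its result
      Request collapsing: merge identical requests into one batch request
      Service isolation: separate services so their resources do not affect one another
        Thread-pool isolation per interface, so interfaces do not affect one another (suitable for 99% of scenarios)
        Semaphore isolation: each request is limited by a counting semaphore; once concurrency exceeds maxConcurrentRequests, further requests are rejected and the fallback returns quickly
        Comparison of semaphore and thread-pool isolation
      Circuit breaking: sacrifice part of the service to keep the overall system stable
      Service degradation: after the circuit breaks, return a default value
    Hystrix source code analysis
  OpenFeign
    What is OpenFeign
    Using OpenFeign
    How OpenFeign works and source code analysis
    Custom load balancing with Feign (my OpenFeign version is 3.0.2, which no longer supports Ribbon, so the classes in spring-cloud-starter-loadbalancer are used instead)
    Notes on using OpenFeign in practice
    OpenFeign version differences (important)
    See the official documentation: https://spring.io/projects/spring-cloud-openfeign/#learn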
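What is Hystrix
Hystrix is an open-source component from Netflix whose goal is to improve the stability of the overall system.
The avalanche effect
When a service that other services call crashes, the services that depend on it directly or indirectly can crash as well. This cascading failure is known as the avalanche effect. Common causes:
Service provider unavailable (hardware failure, program bugs, cache breakdown, a surge of user requests, and so on)
Retries amplifying traffic (user retries, retry logic in code)
Service consumer unavailable (resources exhausted by synchronous waiting)
Solutions to the avalanche effect
Request caching: cache a request together with its result
Use Redis to cache the data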
<!-- Redis dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Connection pool dependency (commons-pool2) -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
/**
Redis configuration class
*/
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
// serializer for keys (plain strings)
redisTemplate.setKeySerializer(new StringRedisSerializer());
// serializer for values (JSON)
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setConnectionFactory(redisConnectionFactory);
return redisTemplate;
}
// customize the serialization used by the cache
@Bean
public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate){
RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());
RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
// default time-to-live for cache entries
.entryTtl(Duration.ofMinutes(30))
// serializers for cache keys and values
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getKeySerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getValueSerializer()));
return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration);
}
}
@Cacheable(cacheNames = "ActivityService:user:single", key = "#id")
public Future<User> selectByUserId(Long id){
return null;
}
Finally, add the following annotation to the application's main class:
@EnableCaching
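Request collapsing: merge identical requests into one batch request
Advantage: several identical requests can be merged into a single call to a batch interface, which reduces the number of requests
Disadvantage: the first request to arrive cannot return immediately; it has to wait according to the collapsing rules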
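@HystrixCollapser(
    batchMethod = "selectByUserIds", // method that handles the merged (batch) request
    scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL, // collapsing scope: GLOBAL merges across all requests; the default REQUEST merges within one request context
    collapserProperties = {
        // how long to wait (in milliseconds) before the collected requests are merged into one batch
        @HystrixProperty(name = "timerDelayInMilliseconds", value = "20"),
        // maximum number of requests allowed in a single batch
        @HystrixProperty(name = "maxRequestsInBatch", value = "200")
    }
)
public Future<User> selectByUserId(Long id) {
    // placeholder body: the collapser dispatches to the batch method instead
    return null;
}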
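The trade-off described above (fewer calls, but the first caller has to wait) can be illustrated without Hystrix. The following is a minimal plain-JDK sketch, not Hystrix's implementation: the class name `SimpleCollapser` and its methods are hypothetical, and the timer that Hystrix would run is replaced by an explicit `flush()` call so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical, simplified collapser: buffers single requests and resolves them with one batch call. */
final class SimpleCollapser<K, V> {
    // Assumption: the batch function returns results in the same order as its arguments.
    private final Function<List<K>, List<V>> batchFunction;
    private final List<K> pendingKeys = new ArrayList<>();
    private final List<CompletableFuture<V>> pendingResults = new ArrayList<>();

    SimpleCollapser(Function<List<K>, List<V>> batchFunction) {
        this.batchFunction = batchFunction;
    }

    /** Queues one request; the caller receives a future instead of an immediate result. */
    synchronized CompletableFuture<V> submit(K key) {
        CompletableFuture<V> result = new CompletableFuture<>();
        pendingKeys.add(key);
        pendingResults.add(result);
        return result;
    }

    /** A real collapser would call this from a timer every few milliseconds; here it is called by hand. */
    synchronized int flush() {
        if (pendingKeys.isEmpty()) {
            return 0;
        }
        // One batch call serves every request collected since the last flush.
        List<V> values = batchFunction.apply(new ArrayList<>(pendingKeys));
        for (int i = 0; i < pendingResults.size(); i++) {
            pendingResults.get(i).complete(values.get(i));
        }
        int size = pendingKeys.size();
        pendingKeys.clear();
        pendingResults.clear();
        return size;
    }
}
```

Callers that submit before a flush all share one invocation of the batch function, which is the saving; the futures stay incomplete until the flush happens, which is the added latency.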
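Service isolation: separate services so their resources do not affect one another
Use a thread pool to isolate each interface so that interfaces do not affect one another (suitable for 99% of scenarios)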
@HystrixCommand(
    groupKey = "activity-service-single", // group (service) name; also used as the thread pool name when threadPoolKey is not set
    commandKey = "selectByUserId", // command name; defaults to the method name
    threadPoolKey = "activity-service-single", // thread pool name; commands with the same name share one pool
    commandProperties = {
        // execution timeout; the default is 1000 ms
        @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS, value = "5000")
    },
    threadPoolProperties = {
        // core thread pool size
        @HystrixProperty(name = HystrixPropertiesManager.CORE_SIZE, value = "6"),
        // maximum queue size
        @HystrixProperty(name = HystrixPropertiesManager.MAX_QUEUE_SIZE, value = "100"),
        // thread keep-alive time in minutes; the default is 1
        @HystrixProperty(name = HystrixPropertiesManager.KEEP_ALIVE_TIME_MINUTES, value = "2"),
        // requests are rejected once the queue reaches this size
        @HystrixProperty(name = HystrixPropertiesManager.QUEUE_SIZE_REJECTION_THRESHOLD, value = "100")
    },
    fallbackMethod = "selectByUserIdFallBack"
)
public Future<User> selectByUserId(Long id) {
    return null;
}
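To make the idea behind thread-pool isolation concrete, here is a minimal plain-JDK sketch of a bulkhead: a bounded pool per dependency, a timeout on the caller side, and a fallback when the pool rejects work or the call is too slow. The class `ThreadPoolBulkhead` and its parameters are hypothetical and only loosely mirror the coreSize, maxQueueSize, and timeout settings above; Hystrix itself does considerably more.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical bulkhead: one bounded pool per dependency, with timeout and fallback. */
final class ThreadPoolBulkhead {
    private final ThreadPoolExecutor pool;
    private final long timeoutMillis;

    ThreadPoolBulkhead(int coreSize, int queueSize, long timeoutMillis) {
        // Bounded queue plus AbortPolicy: excess work is rejected instead of piling up.
        this.pool = new ThreadPoolExecutor(coreSize, coreSize, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(queueSize), new ThreadPoolExecutor.AbortPolicy());
        this.timeoutMillis = timeoutMillis;
    }

    <T> T call(Callable<T> task, Supplier<T> fallback) {
        Future<T> future;
        try {
            future = pool.submit(task);
        } catch (RejectedExecutionException rejected) {
            return fallback.get(); // pool and queue are full
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            future.cancel(true); // interrupt the worker so its slot is freed
            return fallback.get();
        } catch (ExecutionException failed) {
            return fallback.get(); // the task itself threw
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return fallback.get();
        }
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

Because the slow dependency only ever occupies threads from its own pool, a caller that uses a different `ThreadPoolBulkhead` instance for another dependency is unaffected, at the cost of extra threads and context switches.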
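Use semaphore isolation: each request is limited by a counting semaphore. Once the number of concurrent requests exceeds maxConcurrentRequests, further requests are rejected and the fallback is called so that they return quickly.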
@HystrixCommand(
    commandProperties = {
        // execution timeout; the default is 1000 ms
        @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS, value = "5000"),
        // use semaphore isolation; the default strategy is THREAD
        @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_STRATEGY, value = "SEMAPHORE"),
        // maximum concurrent requests; kept small here to make high concurrency easy to simulate
        @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, value = "6")
    },
    fallbackMethod = "selectByUserIdFallBack"
)
public Future<User> selectByUserId(Long id) {
    return null;
}
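The counting behavior can be sketched with `java.util.concurrent.Semaphore`. This is an illustration under assumptions, not Hystrix code: `SemaphoreBulkhead` is a hypothetical class, and the work runs on the caller's own thread, which is why this style of isolation is cheap but cannot run the call asynchronously.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical semaphore bulkhead: runs on the caller's thread and rejects when the limit is reached. */
final class SemaphoreBulkhead {
    private final Semaphore permits;

    SemaphoreBulkhead(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    <T> T call(Supplier<T> task, Supplier<T> fallback) {
        // tryAcquire() never blocks: no free permit means an immediate fallback.
        if (!permits.tryAcquire()) {
            return fallback.get();
        }
        try {
            return task.get();
        } catch (RuntimeException failed) {
            return fallback.get();
        } finally {
            permits.release(); // always hand the permit back
        }
    }
}
```

With a limit of 1, a call made while another call is still in progress is rejected straight away and receives the fallback value.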
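Comparison of semaphore and thread-pool isolation
| Isolation mode | Timeout support | Circuit breaking support | Isolation mechanism | Asynchronous calls | Resource cost |
| --- | --- | --- | --- | --- | --- |
| Thread-pool isolation | Supported | Supported | A separate thread pool per service | Both synchronous and asynchronous | High |
| Semaphore isolation | Not supported | Supported | Semaphore counter | Synchronous only | Low |
Circuit breaking: sacrifice part of the service to keep the overall system stable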
@HystrixCommand(
    commandProperties = {
        // minimum number of requests in the rolling window (10 s by default) before the breaker may trip; the default is 20
        @HystrixProperty(name = HystrixPropertiesManager.CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD, value = "10"),
        // once the error rate reaches 50%, the breaker opens and requests go straight to the fallback method
        @HystrixProperty(name = HystrixPropertiesManager.CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE, value = "50"),
        // how long the breaker stays open before a trial request is let through; the default is 5000 ms
        @HystrixProperty(name = HystrixPropertiesManager.CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS, value = "5000")
    },
    fallbackMethod = "selectByUserIdFallBack"
)
public Future<User> selectByUserId(Long id) {
    return null;
}
Service degradation: after the circuit breaks, return a default value
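Hystrix source code analysis
The @HystrixCommand annotation (the core of Hystrix: initialization, execution, rate limiting, and circuit breaking all go through the command it creates) and the @HystrixCollapser annotation are both processed by HystrixCommandAspect.
/**
The @HystrixCommand annotation
*/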
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface HystrixCommand {
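// group name; defaults to the class name
String groupKey() default "";
// command name; defaults to the name of the method annotated with @HystrixCommand
String commandKey() default "";
// thread pool name
String threadPoolKey() default "";
// fallback method name; the method must live in the same class as the @HystrixCommand method and have the same parameters and return type
String fallbackMethod() default "";
// command properties
HystrixProperty[] commandProperties() default {};
// thread pool properties
HystrixProperty[] threadPoolProperties() default {};
// exceptions to ignore
Class<? extends Throwable>[] ignoreExceptions() default {};
// execution mode used for observable commands
ObservableExecutionMode observableExecutionMode() default ObservableExecutionMode.EAGER;
HystrixException[] raiseHystrixExceptions() default {};
// default fallback method; if both fallbackMethod and defaultFallback are set, fallbackMethod takes precedence
// the default fallback method must take no parameters, and its return type must match the command's
String defaultFallback() default "";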
}
/**
The @HystrixCollapser annotation
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface HystrixCollapser {
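// collapser key; defaults to the name of the annotated method
String collapserKey() default "";
/* Name of the batch method.
The batch method must have the following signature:
java.util.List method(java.util.List)
It must take exactly one parameter.
*/
String batchMethod();
/**
Scope; defaults to a single request (REQUEST) and can be changed to GLOBAL
*/
Scope scope() default Scope.REQUEST;
/**
* Collapser properties,
* configured as an array of key-value pairs
*/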
HystrixProperty[] collapserProperties() default {};
}
@Aspect
public class HystrixCommandAspect {
private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
static {
META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}
// pointcut for @HystrixCommand
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
// pointcut for @HystrixCollapser
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
// a method must not carry both @HystrixCommand and @HystrixCollapser
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
// pick the MetaHolderFactory that matches the annotation
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
// gather everything needed to build the Hystrix command into the MetaHolder; (1) and (2) are explained later
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
// build the Hystrix command object (HystrixInvokable); (3) is explained later
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
// determine the execution type
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
// delegate execution to the CommandExecutor utility
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
private Observable executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
return ((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
.onErrorResumeNext(new Func1<Throwable, Observable>() {
@Override
public Observable call(Throwable throwable) {
if (throwable instanceof HystrixBadRequestException) {
return Observable.error(throwable.getCause());
} else if (throwable instanceof HystrixRuntimeException) {
HystrixRuntimeException hystrixRuntimeException = (HystrixRuntimeException) throwable;
return Observable.error(hystrixRuntimeExceptionToThrowable(metaHolder, hystrixRuntimeException));
}
return Observable.error(throwable);
}
});
}
// (1) -> not covered in detail here
private static class CollapserMetaHolderFactory extends MetaHolderFactory {
@Override
public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
HystrixCollapser hystrixCollapser = collapserMethod.getAnnotation(HystrixCollapser.class);
if (collapserMethod.getParameterTypes().length > 1 || collapserMethod.getParameterTypes().length == 0) {
throw new IllegalStateException("Collapser method must have one argument: " + collapserMethod);
}
Method batchCommandMethod = getDeclaredMethod(obj.getClass(), hystrixCollapser.batchMethod(), List.class);
if (batchCommandMethod == null)
throw new IllegalStateException("batch method is absent: " + hystrixCollapser.batchMethod());
Class<?> batchReturnType = batchCommandMethod.getReturnType();
Class<?> collapserReturnType = collapserMethod.getReturnType();
boolean observable = collapserReturnType.equals(Observable.class);
if (!collapserMethod.getParameterTypes()[0]
.equals(getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]))) {
throw new IllegalStateException("required batch method for collapser is absent, wrong generic type: expected "
+ obj.getClass().getCanonicalName() + "." +
hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]));
}
final Class<?> collapserMethodReturnType = getFirstGenericParameter(
collapserMethod.getGenericReturnType(),
Future.class.isAssignableFrom(collapserReturnType) || Observable.class.isAssignableFrom(collapserReturnType) ? 1 : 0);
Class<?> batchCommandActualReturnType = getFirstGenericParameter(batchCommandMethod.getGenericReturnType());
if (!collapserMethodReturnType
.equals(batchCommandActualReturnType)) {
throw new IllegalStateException("Return type of batch method must be java.util.List parametrized with corresponding type: expected " +
"(java.util.List<" + collapserMethodReturnType + ">)" + obj.getClass().getCanonicalName() + "." +
hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
batchCommandActualReturnType);
}
HystrixCommand hystrixCommand = batchCommandMethod.getAnnotation(HystrixCommand.class);
if (hystrixCommand == null) {
throw new IllegalStateException("batch method must be annotated with HystrixCommand annotation");
}
// method of batch hystrix command must be passed to metaholder because basically collapser doesn't have any actions
// that should be invoked upon intercepted method, it's required only for underlying batch command
MetaHolder.Builder builder = metaHolderBuilder(proxy, batchCommandMethod, obj, args, joinPoint);
if (isCompileWeaving()) {
builder.ajcMethod(getAjcMethodAroundAdvice(obj.getClass(), batchCommandMethod.getName(), List.class));
}
builder.hystrixCollapser(hystrixCollapser);
builder.defaultCollapserKey(collapserMethod.getName());
builder.collapserExecutionType(ExecutionType.getExecutionType(collapserReturnType));
builder.defaultCommandKey(batchCommandMethod.getName());
builder.hystrixCommand(hystrixCommand);
builder.executionType(ExecutionType.getExecutionType(batchReturnType));
builder.observable(observable);
FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(obj.getClass(), batchCommandMethod);
if (fallbackMethod.isPresent()) {
fallbackMethod.validateReturnType(batchCommandMethod);
builder
.fallbackMethod(fallbackMethod.getMethod())
.fallbackExecutionType(ExecutionType.getExecutionType(fallbackMethod.getMethod().getReturnType()));
}
return builder.build();
}
}
// (2) ->
private static class CommandMetaHolderFactory extends MetaHolderFactory {
@Override
public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
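// determine from the return type whether execution is asynchronous, synchronous, or reactive (observable)
ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
// build the MetaHolder, which carries the parameters Hystrix needs
// the fallback is set at the same time via setFallbackMethod(builder, obj.getClass(), method)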
MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
if (isCompileWeaving()) {
builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
}
return builder.defaultCommandKey(method.getName())
.hystrixCommand(hystrixCommand)
.observableExecutionMode(hystrixCommand.observableExecutionMode())
.executionType(executionType)
.observable(ExecutionType.OBSERVABLE == executionType)
.build();
}
}
// remainder omitted
}
Now for the core of method execution.
// note the package path
package com.netflix.hystrix.contrib.javanica.command;
public class CommandExecutor {
/**
Observable: the source being observed
Observer: the party that observes
Subscribe: connects the two
*/
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch (executionType) {
// synchronous: block until the dependency returns its result
case SYNCHRONOUS: {
// delegates to HystrixCommand#execute()
return castToExecutable(invokable, executionType).execute();
}
// asynchronous: return a Future
case ASYNCHRONOUS: {
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
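// reactive: the command is consumed through an Observable
// observe() starts the command immediately
// toObservable() starts the command only once it is subscribed to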
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
}
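// The code above is what runs before execution for the @HystrixCommand annotation.
// The code below is the corresponding entry point when OpenFeign integrates Hystrix for circuit breaking.
// Both end up calling HystrixCommand#execute().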
package org.springframework.cloud.netflix.hystrix;
public class HystrixCircuitBreaker implements CircuitBreaker {
private HystrixCommand.Setter setter;
public HystrixCircuitBreaker(HystrixCommand.Setter setter) {
this.setter = setter;
}
@Override
public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
HystrixCommand<T> command = new HystrixCommand<T>(setter) {
@Override
protected T run() throws Exception {
return toRun.get();
}
@Override
protected T getFallback() {
return fallback.apply(getExecutionException());
}
};
// calls HystrixCommand#execute(), which internally is queue().get()
// note: get() on that Future is backed by a CountDownLatch, so this call blocks until the command finishes
return command.execute();
}
}
package com.netflix.hystrix;
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R>{
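// used for asynchronous execution of the command
// queues the command on the thread pool and returns a Future for retrieving the result
// if the command is not configured to run on a thread pool, this behaves like execute() and blocks
public Future<R> queue() {
// The Future returned by Observable.toBlocking().toFuture() does not interrupt the execution thread when Future.cancel(boolean) is called with mayInterrupt set to true. To honor the Future contract, it has to be wrapped.
// the request is executed here
// key point: this Future is a wrapped one, as the BlockingOperatorToFuture code further down shows
// toObservable() is also important; see AbstractCommand#toObservable() later on
final Future<R> delegate = toObservable().toBlocking().toFuture();
// create a new Future that wraps the one returned above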
final Future<R> f = new Future<R>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// if the delegate has already been cancelled, report that this cancel failed
if (delegate.isCancelled()) {
return false;
}
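// decide whether the execution thread should be interrupted when mayInterruptIfRunning is true
if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
// record the interrupt request in the flag
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}
// the flag set above decides whether the delegate is cancelled with interruption
final boolean res = delegate.cancel(interruptOnFutureCancel.get());
// if the command has not completed and the interrupt flag is set,
// interrupt the thread that is executing the command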
if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}
// return the result of cancelling the delegate
return res;
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}
@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
};
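// handle errors that are already known at this point
// isDone() does not block; if the command has not finished yet, the Future is returned as-is at the end of the method and errors surface later from get()
if (f.isDone()) {
try {
// does not block here, because the Future is already done (completed, failed, or cancelled)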
f.get();
return f;
} catch (Exception e) {
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
// we don't throw these types from queue() only from queue().get() as they are execution errors
return f;
default:
// these are errors we throw from queue() as they as rejection type errors
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
}
// return the Future of the command that is still running
return f;
}
}
// the wrapped Future
public final class BlockingOperatorToFuture {
public static <T> Future<T> toFuture(Observable<? extends T> that) {
final CountDownLatch finished = new CountDownLatch(1); // latch that signals completion
final AtomicReference<T> value = new AtomicReference<T>();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
@SuppressWarnings("unchecked")
// subscribe, so the callbacks below record the outcome as soon as it arrives
final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
finished.countDown();
}
@Override
public void onError(Throwable e) {
error.compareAndSet(null, e);
finished.countDown();
}
@Override
public void onNext(T v) {
// "single" guarantees there is only one "onNext"
value.set(v);
}
});
return new Future<T>() {
private volatile boolean cancelled;
@Override
// if the task has not finished, cancel it and count the latch down to 0 to release waiting threads
public boolean cancel(boolean mayInterruptIfRunning) {
if (finished.getCount() > 0) {
cancelled = true;
s.unsubscribe();
finished.countDown();
return true;
} else {
// can't cancel
return false;
}
}
@Override
public boolean isCancelled() {
return cancelled;
}
@Override
public boolean isDone() {
return finished.getCount() == 0;
}
@Override
// get() blocks on the latch until the result is available
public T get() throws InterruptedException, ExecutionException {
finished.await();
return getValue();
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (finished.await(timeout, unit)) {
return getValue();
} else {
throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable.");
}
}
private T getValue() throws ExecutionException {
final Throwable throwable = error.get();
if (throwable != null) {
throw new ExecutionException("Observable onError", throwable);
} else if (cancelled) {
// Contract of Future.get() requires us to throw this:
throw new CancellationException("Subscription unsubscribed");
} else {
return value.get();
}
}
};
}
}
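The blocking behavior of get() above comes entirely from the CountDownLatch. A stripped-down, plain-JDK sketch of the same idea is shown below; `LatchResult` is a hypothetical name, and cancellation is left out to keep the example short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical one-value result holder whose get() blocks on a CountDownLatch. */
final class LatchResult<T> {
    private final CountDownLatch finished = new CountDownLatch(1);
    private final AtomicReference<T> value = new AtomicReference<>();
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /** Called by the producer on success; releases every waiting reader. */
    void complete(T v) {
        value.set(v);
        finished.countDown();
    }

    /** Called by the producer on failure; only the first error is kept. */
    void fail(Throwable t) {
        error.compareAndSet(null, t);
        finished.countDown();
    }

    /** Non-blocking check, like Future.isDone(). */
    boolean isDone() {
        return finished.getCount() == 0;
    }

    /** Blocks until a result or error arrives, or the timeout elapses. */
    T get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (!finished.await(timeout, unit)) {
            throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms");
        }
        Throwable t = error.get();
        if (t != null) {
            throw new ExecutionException(t);
        }
        return value.get();
    }
}
```

A reader that calls get() before the producer has finished simply parks on the latch, which is why HystrixCommand#execute() (queue().get()) behaves as a blocking call.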
/**
HystrixCommand is a subclass of AbstractCommand. A rough understanding of AbstractCommand makes it easier to follow how the circuit breaker is applied.
*/
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
protected final HystrixCircuitBreaker circuitBreaker; // circuit breaker
protected final HystrixThreadPool threadPool; // thread pool that runs HystrixCommand#run() on a separate thread for each request
protected final HystrixThreadPoolKey threadPoolKey; // thread pool key, used for monitoring, metrics publishing, and so on
protected final HystrixCommandProperties properties; // property configuration of the HystrixCommand
// not executed, completed, timed out
protected enum TimedOutStatus {
NOT_EXECUTED, COMPLETED, TIMED_OUT
}
// lifecycle states of the command
protected enum CommandState {
NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, UNSUBSCRIBED, TERMINAL
}
protected enum ThreadState {
NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL
}
protected final HystrixCommandMetrics metrics; // runtime metrics, statistics, and monitoring data
protected final HystrixCommandKey commandKey; // key of the HystrixCommand
protected final HystrixCommandGroupKey commandGroup; // group key of the HystrixCommand
protected final HystrixEventNotifier eventNotifier; // receives event notifications
protected final HystrixConcurrencyStrategy concurrencyStrategy; // concurrency-related behavior
protected final HystrixCommandExecutionHook executionHook; // hooks invoked during the HystrixCommand lifecycle
// fallback semaphore override
protected final TryableSemaphore fallbackSemaphoreOverride;
// one fallback semaphore per circuit, used to limit concurrent fallbacks
protected static final ConcurrentHashMap<String, TryableSemaphore> fallbackSemaphorePerCircuit = new ConcurrentHashMap<String, TryableSemaphore>();
// execution semaphore override
protected final TryableSemaphore executionSemaphoreOverride;
// one execution semaphore per circuit, used to limit concurrent executions
protected static final ConcurrentHashMap<String, TryableSemaphore> executionSemaphorePerCircuit = new ConcurrentHashMap<String, TryableSemaphore>();
protected final AtomicReference<Reference<TimerListener>> timeoutTimer = new AtomicReference<Reference<TimerListener>>();
protected AtomicReference<CommandState> commandState = new AtomicReference<CommandState>(CommandState.NOT_STARTED);
protected AtomicReference<ThreadState> threadState = new AtomicReference<ThreadState>(ThreadState.NOT_USING_THREAD);
// execution state of the command
protected volatile ExecutionResult executionResult = ExecutionResult.EMPTY; //state on shared execution
// whether the response came from the request cache; false by default
protected volatile boolean isResponseFromCache = false;
protected volatile ExecutionResult executionResultAtTimeOfCancellation;
protected volatile long commandStartTimestamp = -1L;
// timeout status of the command
protected final AtomicReference<TimedOutStatus> isCommandTimedOut = new AtomicReference<TimedOutStatus>(TimedOutStatus.NOT_EXECUTED);
protected volatile Action0 endCurrentThreadExecutingCommand;
protected final HystrixRequestCache requestCache;
protected final HystrixRequestLog currentRequestLog;
private static ConcurrentHashMap<Class<?>, String> defaultNameCache = new ConcurrentHashMap<Class<?>, String>();
protected static ConcurrentHashMap<HystrixCommandKey, Boolean> commandContainsFallback = new ConcurrentHashMap<HystrixCommandKey, Boolean>();
// executes the request
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
// cleanup performed when the command terminates
final Action0 terminateCommandCleanup = new Action0() {
@Override
public void call() {
// the observable chain was created, but the user code never ran
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
handleCommandEnd(false); //user code never ran
// the user code did run
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
handleCommandEnd(true); //user code did run
}
}
};
// cleanup performed on unsubscribe; the command is marked with the CANCELLED event
final Action0 unsubscribeCommandCleanup = new Action0() {
@Override
public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(true); //user code did run
}
}
};
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// if the command has been unsubscribed, stop the event stream
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
// apply the Hystrix semantics and run the command
return applyHystrixSemantics(_cmd);
}
};
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
@Override
public R call(R r) {
R afterFirstApplication = r;
try {
afterFirstApplication = executionHook.onComplete(_cmd, r);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
}
try {
return executionHook.onEmit(_cmd, afterFirstApplication);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
return afterFirstApplication;
}
}
};
final Action0 fireOnCompletedHook = new Action0() {
@Override
public void call() {
try {
executionHook.onSuccess(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
}
}
};
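// Observable.defer postpones the work: nothing inside the deferred function runs until the Observable is subscribed to.
// It is worth reading up on RxJava's Observable separately.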
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// move the state to OBSERVABLE_CHAIN_CREATED; a command instance can only be executed once
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
commandStartTimestamp = System.currentTimeMillis();
// whether request logging is enabled; requestLog.enabled defaults to true
if (properties.requestLogEnabled().get()) {
// record the command in the request log
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
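// request caching is enabled when requestCache.enabled is true and getCacheKey() of the implementation returns a non-null key
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
// if request caching is enabled, try to serve the result from the cache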
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// wrap applyHystrixSemantics in a deferred Observable
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// if request caching is enabled and a cache key is set, put the Observable into the request cache
if (requestCacheEnabled && cacheKey != null) {
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread cached its command first, so use the cached one
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
// attach the actions to run on termination, unsubscribe, and completion
return afterCache
.doOnTerminate(terminateCommandCleanup)
.doOnUnsubscribe(unsubscribeCommandCleanup)
.doOnCompleted(fireOnCompletedHook);
}
});
}
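// applies the Hystrix semantics and runs the command
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);
// ask the circuit breaker whether the request may proceed
if (circuitBreaker.allowRequest()) {
// get the execution semaphore; its size comes from execution.isolation.semaphore.maxConcurrentRequests, 10 by default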
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
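// try to acquire a permit
if (executionSemaphore.tryAcquire()) {
try {
// executionResult records the execution state of the command
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// run the command; success and failure are handled through the Observable callbacks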
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
}
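The "This instance can only be executed once" check in toObservable() relies on a single compareAndSet on the command state. The following plain-JDK sketch isolates that guard; `OneShotCommand` and its three states are hypothetical simplifications of the CommandState enum above.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical command that may be executed only once, guarded by a compare-and-set. */
final class OneShotCommand<T> {
    enum State { NOT_STARTED, RUNNING, TERMINAL }

    private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
    private final Supplier<T> body;

    OneShotCommand(Supplier<T> body) {
        this.body = body;
    }

    T execute() {
        // Only one caller can win this transition; every later caller is rejected.
        if (!state.compareAndSet(State.NOT_STARTED, State.RUNNING)) {
            throw new IllegalStateException(
                    "This instance can only be executed once. Please instantiate a new instance.");
        }
        try {
            return body.get();
        } finally {
            state.set(State.TERMINAL); // reached whether the body succeeded or threw
        }
    }

    State state() {
        return state.get();
    }
}
```

Because the transition is atomic, two threads racing to execute the same instance cannot both succeed, which is why a fresh command object is needed for every call.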
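Next, HystrixCircuitBreaker, the core interface that implements the circuit breaker.
It has two nested implementations, HystrixCircuitBreakerImpl and NoOpCircuitBreaker. HystrixCircuitBreakerImpl inspects the health of the HystrixCommand and decides whether a request should be executed.
public interface HystrixCircuitBreaker {
// every request asks whether it is allowed to execute
public boolean allowRequest();
// whether the circuit breaker is open
public boolean isOpen();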
/**
* Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
*/
/* package */void markSuccess();
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;
// state of the circuit breaker; closed by default
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
// time at which the circuit was opened or last tested
private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
}
public void markSuccess() {
if (circuitOpen.get()) {
if (circuitOpen.compareAndSet(true, false)) {
//win the thread race to reset metrics
//Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view,
//and all other metric consumers are unaffected by the reset
metrics.resetStream();
}
}
}
@Override
public boolean allowRequest() {
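// circuitBreaker.forceOpen: whether the breaker is forced open; false by default
if (properties.circuitBreakerForceOpen().get()) {
// when circuitBreaker.forceOpen is true, the breaker is open and no request is allowed through
return false;
}
// circuitBreaker.forceClosed: ignore errors and never open the breaker (all requests pass); false by default
if (properties.circuitBreakerForceClosed().get()) {
// even though all requests pass, isOpen() is still called to simulate normal behavior
isOpen();
return true;
}
// allow the request if the breaker is closed, or if the sleep window (5 s by default) has elapsed and a trial request is due
return !isOpen() || allowSingleTest();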
}
public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
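// 1) the breaker is open
// 2) the sleep window has elapsed: the current time is later than the time the circuit was opened or last tested plus circuitBreaker.sleepWindowInMilliseconds (default 5000), so one trial request is allowed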
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
return true;
}
}
return false;
}
@Override
public boolean isOpen() {
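//If the breaker is already open, report it immediately (it is closed by default)
if (circuitOpen.get()) {
return true;
}
// Fetch the total request count and the error percentage
HealthCounts health = metrics.getHealthCounts();
//circuitBreaker.requestVolumeThreshold (default 20): check whether the total number of requests is below the minimum required before the breaker may trip
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// Below the minimum request volume, so the breaker stays closed
return false;
}
//circuitBreaker.errorThresholdPercentage (default 50): check whether the error percentage has reached the threshold for opening the breaker
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// Flip the breaker to open
if (circuitOpen.compareAndSet(false, true)) {
//Record the time the circuit was opened (also used as the last-tested time)
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
//Losing the race means another thread has already opened the breaker, so the result is the same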
return true;
}
}
}
}
static class NoOpCircuitBreaker implements HystrixCircuitBreaker {
@Override
public boolean allowRequest() {
return true;
}
@Override
public boolean isOpen() {
return false;
}
@Override
public void markSuccess() {
}
}
}
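The state transitions in the source above (closed, open once the error threshold is reached, one trial request after the sleep window, closed again on success) can be tried out with a small standalone model. This is only a sketch and not the Hystrix class: SimpleCircuitBreaker, its record method, the plain counters that stand in for HystrixCommandMetrics, and the injectable clock are all assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Simplified standalone model of the breaker logic; NOT the Hystrix implementation.
final class SimpleCircuitBreaker {
    private final int requestVolumeThreshold;   // mirrors circuitBreaker.requestVolumeThreshold
    private final int errorThresholdPercentage; // mirrors circuitBreaker.errorThresholdPercentage
    private final long sleepWindowMillis;       // mirrors circuitBreaker.sleepWindowInMilliseconds
    private final LongSupplier clock;           // hypothetical injectable clock, handy for tests

    private final AtomicBoolean open = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTested = new AtomicLong();
    private int total;  // hypothetical counters standing in for HystrixCommandMetrics
    private int failed;

    SimpleCircuitBreaker(int requestVolumeThreshold, int errorThresholdPercentage,
                         long sleepWindowMillis, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    // Records the outcome of one call (hypothetical helper, not part of Hystrix).
    synchronized void record(boolean success) {
        total++;
        if (!success) {
            failed++;
        }
    }

    // A request passes when the breaker is closed or a single trial request is due.
    boolean allowRequest() {
        return !isOpen() || allowSingleTest();
    }

    boolean isOpen() {
        if (open.get()) {
            return true;
        }
        int t;
        int f;
        synchronized (this) {
            t = total;
            f = failed;
        }
        if (t < requestVolumeThreshold) {
            return false; // not enough traffic to judge yet
        }
        if (f * 100 / t < errorThresholdPercentage) {
            return false; // error rate still acceptable
        }
        if (open.compareAndSet(false, true)) {
            openedOrLastTested.set(clock.getAsLong());
        }
        return true;
    }

    // Only the thread that wins the compareAndSet gets to send the trial request.
    private boolean allowSingleTest() {
        long last = openedOrLastTested.get();
        long now = clock.getAsLong();
        return open.get() && now > last + sleepWindowMillis
                && openedOrLastTested.compareAndSet(last, now);
    }

    // A successful trial request closes the breaker and resets the counters.
    void markSuccess() {
        if (open.compareAndSet(true, false)) {
            synchronized (this) {
                total = 0;
                failed = 0;
            }
        }
    }
}
```

With a threshold of 4 requests and 50% errors, four recorded failures open the breaker; once the clock moves past the sleep window exactly one caller is let through, and markSuccess closes the breaker again.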
OpenFeign
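What is OpenFeign
ribbon is an HTTP- and TCP-based client-side library that provides load balancing, retry on failure, ping checks, and similar features.
feign is a declarative web service client with ribbon built in. Unlike ribbon, feign only requires you to define an interface bound to the service and annotate it declaratively, which makes service calls simple and elegant.
openFeign is an enhanced version of feign that supports Spring MVC annotations such as @RequestMapping.
Using openFeign
/**
Enable feign
*/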
@SpringBootApplication
@EnableFeignClients //enable feign
public class UserApplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class, args);
}
}
/**
Example usage
*/
@FeignClient(value = "activity")
public interface RegisterFeign {
@RequestMapping("/firstLoginActivity")
public String firstLogin(Long userId);
}
OpenFeign原理和源码解析
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {
String[] value() default {};
String[] basePackages() default {};
Class<?>[] basePackageClasses() default {};
Class<?>[] defaultConfiguration() default {};
Class<?>[] clients() default {};
}
//FeignClientsRegistrar is what initializes the feign clients
class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
//Register the default configuration declared on @EnableFeignClients
registerDefaultConfiguration(metadata, registry);
//Register every discovered feign client with the bean definition registry
registerFeignClients(metadata, registry);
}
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>();
Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients");
//If @EnableFeignClients does not list any clients, scan the base packages for classes annotated with @FeignClient
if (clients == null || clients.length == 0) {
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.setResourceLoader(this.resourceLoader);
scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class));
Set<String> basePackages = getBasePackages(metadata);
for (String basePackage : basePackages) {
candidateComponents.addAll(scanner.findCandidateComponents(basePackage));
}
}
//If @EnableFeignClients lists its clients explicitly, use those classes directly
else {
for (Class<?> clazz : clients) {
candidateComponents.add(new AnnotatedGenericBeanDefinition(clazz));
}
}
//Iterate over the candidate feign client definitions collected above
for (BeanDefinition candidateComponent : candidateComponents) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// verify annotated class is an interface
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface");
Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(FeignClient.class.getCanonicalName());
//Register this client's configuration attributes as a bean in the container
String name = getClientName(attributes);
registerClientConfiguration(registry, name, attributes.get("configuration"));
//Register the feign client itself as a bean in the container
registerFeignClient(registry, annotationMetadata, attributes);
}
}
}
//Registers the configuration attributes above as a bean in the container
private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(FeignClientSpecification.class);
builder.addConstructorArgValue(name);
builder.addConstructorArgValue(configuration);
registry.registerBeanDefinition(name + "." + FeignClientSpecification.class.getSimpleName(),
builder.getBeanDefinition());
}
//Registers the feign client itself as a bean in the container
private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
Map<String, Object> attributes) {
String className = annotationMetadata.getClassName();
Class clazz = ClassUtils.resolveClassName(className, null);
ConfigurableBeanFactory beanFactory = registry instanceof ConfigurableBeanFactory
? (ConfigurableBeanFactory) registry : null;
String contextId = getContextId(beanFactory, attributes);
String name = getName(attributes);
FeignClientFactoryBean factoryBean = new FeignClientFactoryBean();
factoryBean.setBeanFactory(beanFactory);
factoryBean.setName(name);
factoryBean.setContextId(contextId);
factoryBean.setType(clazz);
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(clazz, () -> {
factoryBean.setUrl(getUrl(beanFactory, attributes));
factoryBean.setPath(getPath(beanFactory, attributes));
factoryBean.setDecode404(Boolean.parseBoolean(String.valueOf(attributes.get("decode404"))));
Object fallback = attributes.get("fallback");
if (fallback != null) {
factoryBean.setFallback(fallback instanceof Class ? (Class<?>) fallback
: ClassUtils.resolveClassName(fallback.toString(), null));
}
Object fallbackFactory = attributes.get("fallbackFactory");
if (fallbackFactory != null) {
factoryBean.setFallbackFactory(fallbackFactory instanceof Class ? (Class<?>) fallbackFactory
: ClassUtils.resolveClassName(fallbackFactory.toString(), null));
}
return factoryBean.getObject();
});
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
definition.setLazyInit(true);
validate(attributes);
AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, className);
beanDefinition.setAttribute("feignClientsRegistrarFactoryBean", factoryBean);
// has a default, won't be null
boolean primary = (Boolean) attributes.get("primary");
beanDefinition.setPrimary(primary);
String[] qualifiers = getQualifiers(attributes);
if (ObjectUtils.isEmpty(qualifiers)) {
qualifiers = new String[] { contextId + "FeignClient" };
}
BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, qualifiers);
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
}
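To summarize, FeignClientsRegistrar does the following:
Loads the configuration declared on @EnableFeignClients
Registers the bean definition for the default feign client configuration
Registers each interface annotated with @FeignClient as a bean in the container; when a feign client is injected with @Autowired, the container uses the factory class FeignClientFactoryBean to create an instance for it.
@Target(ElementType.TYPE)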
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface FeignClient {
@AliasFor("name")
String value() default "";
String contextId() default "";
@AliasFor("value")
String name() default "";
@Deprecated
String qualifier() default "";
String[] qualifiers() default {};
String url() default "";
boolean decode404() default false;
Class<?>[] configuration() default {};
Class<?> fallback() default void.class;
Class<?> fallbackFactory() default void.class;
String path() default "";
boolean primary() default true;
}
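The instance that the factory bean produces for a client interface is a JDK dynamic proxy, which is why an interface with no implementation class can be injected and called. The following is a minimal sketch of that idea only: GreetingClient, ProxySketch, and the string the handler returns are hypothetical, and a real Feign handler would build and send an HTTP request instead of describing it.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical client interface, standing in for an interface annotated with @FeignClient.
interface GreetingClient {
    String firstLogin(Long userId);
}

final class ProxySketch {
    // Creates a proxy for the interface; the handler only describes the call it would make.
    static <T> T create(Class<T> type, String serviceName) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods inherited from Object are answered locally, not treated as remote calls.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return type.getSimpleName() + "@" + serviceName;
                }
            }
            // A real handler would turn the method metadata into an HTTP request here.
            String query = (args == null) ? "" : "?arg0=" + args[0];
            return "http://" + serviceName + "/" + method.getName() + query;
        };
        Object instance = Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler);
        return type.cast(instance);
    }

    public static void main(String[] args) {
        GreetingClient client = create(GreetingClient.class, "activity");
        System.out.println(client.firstLogin(42L));
    }
}
```

Every call on the proxy lands in the single handler, which is the hook where Feign can apply encoders, interceptors, retry, and load balancing.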
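How OpenFeign load balancing works (this is really how spring-cloud-loadbalancer works: ribbon is no longer being updated, and recent OpenFeign versions do not support ribbon by default)
//The dynamic proxy's reflective call eventually reaches the following step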
public class FeignBlockingLoadBalancerClient implements Client {
@Override
public Response execute(Request request, Request.Options options) throws IOException {
//Get the request URL, e.g. http://activity/firstLoginActivity
final URI originalUri = URI.create(request.url());
//Get the target service name, e.g. activity
String serviceId = originalUri.getHost();
//The service name must not be null
Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
String hint = getHint(serviceId);
//Build the load balancer request
DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
new RequestDataContext(buildRequestData(request), hint));
//Collect the lifecycle callbacks to run before and after load balancing
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
.getSupportedLifecycleProcessors(
loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
RequestDataContext.class, ResponseData.class, ServiceInstance.class);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
//Core of the load balancing: pick a service instance according to the load balancing rule
ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
instance);
//If no instance was found, return a 503 Service Unavailable response
if (instance == null) {
String message = "Load balancer does not contain an instance for the service " + serviceId;
if (LOG.isWarnEnabled()) {
LOG.warn(message);
}
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
.body(message, StandardCharsets.UTF_8).build();
}
//Replace the service name in originalUri with the instance's address to build the full request URL
String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
Request newRequest = buildRequest(request, reconstructedUrl);
//Send the request
return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
supportedLifecycleProcessors);
}
}
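The URL rewriting step in execute, where the logical service name is swapped for the chosen instance's address, can be illustrated with java.net.URI alone. This is a sketch of the idea and not the Spring Cloud implementation: UriRewriteSketch and the sample host and port are made up for the example.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class UriRewriteSketch {
    // Replaces the logical service name in the URI with a concrete host and port,
    // keeping the scheme, path, query, and fragment unchanged.
    static URI reconstruct(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + original, e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://activity/firstLoginActivity?userId=1");
        // The host of the original URI is the service name used to look up instances.
        System.out.println(original.getHost());
        System.out.println(reconstruct(original, "10.0.0.5", 8081));
    }
}
```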
public class BlockingLoadBalancerClient implements LoadBalancerClient {
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
//Get the load balancer for the service (RoundRobinLoadBalancer by default)
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
//Ask the load balancer to choose a server
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}
}
/**
Obtaining the RoundRobinLoadBalancer
*/
public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>
implements ReactiveLoadBalancer.Factory<ServiceInstance> {
//Note ReactorServiceInstanceLoadBalancer: it is the marker interface used to look up the load balancer
@Override
public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
}
}
//Next, the default implementation of ReactorServiceInstanceLoadBalancer is resolved from this configuration
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {
//RoundRobinLoadBalancer is registered as the default load balancer
@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RoundRobinLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}
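/**
The load balancer obtained above chooses which server to call.
By default this is RoundRobinLoadBalancer.choose; the core of a custom load balancer is overriding the choose method.
How it works: when RoundRobinLoadBalancer is created, new Random().nextInt(1000) seeds its position with a random number below 1000.
Each time an instance is needed, the position is incremented by 1: int pos = Math.abs(this.position.incrementAndGet());
The instance at that position modulo the list size is then returned: instances.get(pos % instances.size());
*/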
public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
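//Get the instance list supplier; NoopServiceInstanceListSupplier is the fallback ServiceInstanceListSupplier implementation. The supplier provides the instance list of a service
//ServiceInstance is the entity class that represents one service instance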
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
}
/**
How is the instance list of a service actually obtained? I have not fully worked this part out yet.
*/
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
//processInstanceResponse calls getInstanceResponse(serviceInstances) to pick the instance
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
// Increment the position on every selection
int pos = Math.abs(this.position.incrementAndGet());
//Pick the instance at the position modulo the size of the instance list
ServiceInstance instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
}
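The round robin selection above boils down to an atomic counter and a modulo. Below is a minimal standalone sketch of it with one deliberate change: it masks the sign bit instead of calling Math.abs, because Math.abs(Integer.MIN_VALUE) is still negative once the counter overflows. RoundRobinSketch is a hypothetical name and the class is not the Spring Cloud implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch<T> {
    private final AtomicInteger position;

    // The source seeds the position with a random number below 1000; any seed works.
    RoundRobinSketch(int seed) {
        this.position = new AtomicInteger(seed);
    }

    // Returns the next instance in rotation, or null when the list is empty.
    T choose(List<T> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // Masking with Integer.MAX_VALUE keeps the index non-negative after overflow.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> balancer = new RoundRobinSketch<>(0);
        List<String> instances = Arrays.asList("a", "b", "c");
        for (int i = 0; i < 4; i++) {
            System.out.println(balancer.choose(instances));
        }
    }
}
```

Because the counter is shared and atomic, concurrent callers each get a distinct position without locking.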
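Custom load balancing for feign (my openFeign version is 3.0.2, which no longer supports ribbon, so the customization uses classes from spring-cloud-starter-loadbalancer)
/**
* The business logic class
* A custom load balancing strategy: round robin over the instances at even positions
*/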
public class ZhxsharkBalancer implements ReactorServiceInstanceLoadBalancer {
private static final Log log = LogFactory.getLog(ZhxsharkBalancer.class);
final AtomicInteger position;
final String serviceId;
//Size of the instance list seen on the previous call
AtomicInteger serviceSize;
//Even-position instances cached from the previous instance list
List<ServiceInstance> beforeInstances;
ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
public ZhxsharkBalancer(ObjectProvider<ServiceInstanceListSupplier> lazyProvider, String name, int position) {
this.serviceId = name;
this.serviceInstanceListSupplierProvider = lazyProvider;
this.position = new AtomicInteger(position);
}
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
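/**
* Custom load balancing strategy: round robin over the instances at even positions
* @param serviceInstances all instances currently available for the service
* @return the chosen instance, or an EmptyResponse when the list is empty
*/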
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
//Cache the even-position instances in beforeInstances; the cache is rebuilt only when the list size changes
if (serviceSize == null || serviceSize.get() != serviceInstances.size()){
serviceSize = new AtomicInteger(serviceInstances.size());
beforeInstances = new ArrayList<>(16);
for (int i=0; i<serviceInstances.size(); i=i+2){
beforeInstances.add(i/2, serviceInstances.get(i));
}
}
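//Mask the sign bit instead of using Math.abs, which stays negative for Integer.MIN_VALUE
int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;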
ServiceInstance serviceInstance = beforeInstances.get(pos % beforeInstances.size());
return new DefaultResponse(serviceInstance);
}
}
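The class above caches the even-position instances and rebuilds the cache only when the list size changes, so a list whose members change while its size stays the same would keep serving the old instances. One possible alternative, sketched here without any Spring types, computes the even index directly on every call and keeps no cached list. EvenIndexSketch is a hypothetical name; this is a variation on the strategy above, not the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class EvenIndexSketch<T> {
    private final AtomicInteger position = new AtomicInteger();

    // Rotates over indexes 0, 2, 4, ... of the current list; null when the list is empty.
    T choose(List<T> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // Number of even indexes in the list: 1 for sizes 1-2, 2 for sizes 3-4, and so on.
        int evenCount = (instances.size() + 1) / 2;
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get((pos % evenCount) * 2);
    }

    public static void main(String[] args) {
        EvenIndexSketch<String> balancer = new EvenIndexSketch<>();
        List<String> instances = Arrays.asList("s0", "s1", "s2", "s3", "s4");
        for (int i = 0; i < 3; i++) {
            System.out.println(balancer.choose(instances));
        }
    }
}
```

The only state is the counter, so the selection always reflects the list passed in on that call.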
/**
Configuration class. How it takes effect can be explored through the @LoadBalancerClients annotation; it is not covered here.
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@LoadBalancerClients(defaultConfiguration = {ZxLoaderBalancerConfiguration.class})
public class ZxLoaderBalancerConfiguration {
@Bean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new ZhxsharkBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name, RandomUtils.nextInt(1000));
}
}
Using openFeign in practice
Each feignClient can be given its own settings; @EnableFeignClients(defaultConfiguration = xxx) supplies the defaults that apply to every client without its own configuration.
@ConfigurationProperties("feign.client")
public class FeignClientProperties {
//Some parts omitted
private Map<String, FeignClientConfiguration> config = new HashMap<>();
/**
* Feign client configuration.
*/
public static class FeignClientConfiguration {
//Log level
private Logger.Level loggerLevel;
//Connect timeout (milliseconds)
private Integer connectTimeout;
//Read timeout (milliseconds)
private Integer readTimeout;
//Retry policy
private Class<Retryer> retryer;
//Central handling for this client's error responses
private Class<ErrorDecoder> errorDecoder;
//Request interceptors
private List<Class<RequestInterceptor>> requestInterceptors;
//Default request headers
private Map<String, Collection<String>> defaultRequestHeaders;
//Default query parameters
private Map<String, Collection<String>> defaultQueryParameters;
//Whether 404 responses are decoded instead of raised as errors
private Boolean decode404;
//Decoder
private Class<Decoder> decoder;
//Encoder
private Class<Encoder> encoder;
//Defines which annotations and parameters are valid on the interface
private Class<Contract> contract;
private ExceptionPropagationPolicy exceptionPropagationPolicy;
//Hooks for customizing core components
private List<Class<Capability>> capabilities;
//Metrics settings
private MetricsProperties metrics;
private Boolean followRedirects;
public static class MetricsProperties {
private Boolean enabled = true;
public Boolean getEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MetricsProperties that = (MetricsProperties) o;
return Objects.equals(enabled, that.enabled);
}
@Override
public int hashCode() {
return Objects.hash(enabled);
}
}
}
}
//A look at the default retry mechanism
public interface Retryer extends Cloneable {
void continueOrPropagate(RetryableException e);
Retryer clone();
class Default implements Retryer {
private final int maxAttempts;
private final long period;
private final long maxPeriod;
int attempt;
long sleptForMillis;
public Default() {
this(100, SECONDS.toMillis(1), 5);
}
public Default(long period, long maxPeriod, int maxAttempts) {
this.period = period;
this.maxPeriod = maxPeriod;
this.maxAttempts = maxAttempts;
this.attempt = 1;
}
// visible for testing;
protected long currentTimeMillis() {
return System.currentTimeMillis();
}
public void continueOrPropagate(RetryableException e) {
if (attempt++ >= maxAttempts) {
throw e;
}
long interval;
if (e.retryAfter() != null) {
interval = e.retryAfter().getTime() - currentTimeMillis();
if (interval > maxPeriod) {
interval = maxPeriod;
}
if (interval < 0) {
return;
}
} else {
interval = nextMaxInterval();
}
try {
Thread.sleep(interval);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
throw e;
}
sleptForMillis += interval;
}
long nextMaxInterval() {
long interval = (long) (period * Math.pow(1.5, attempt - 1));
return interval > maxPeriod ? maxPeriod : interval;
}
@Override
public Retryer clone() {
return new Default(period, maxPeriod, maxAttempts);
}
}
/**
* Implementation that never retries request. It propagates the RetryableException.
*/
Retryer NEVER_RETRY = new Retryer() {
@Override
public void continueOrPropagate(RetryableException e) {
throw e;
}
@Override
public Retryer clone() {
return this;
}
};
}
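To see what Retryer.Default actually does with its default arguments (period 100 ms, maxPeriod 1 s, maxAttempts 5), the sleep intervals can be computed without Feign on the classpath. The sketch below reproduces only the attempt check and the arithmetic of nextMaxInterval and ignores the Retry-After branch; BackoffSketch and sleepIntervals are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSketch {
    // Lists the sleep intervals (ms) taken between attempts when no Retry-After date is present.
    static List<Long> sleepIntervals(long period, long maxPeriod, int maxAttempts) {
        List<Long> intervals = new ArrayList<>();
        int attempt = 1;
        // Same check as continueOrPropagate: stop once the attempt budget is used up.
        while (attempt++ < maxAttempts) {
            // Same formula as nextMaxInterval: grow by 1.5x per attempt, capped at maxPeriod.
            long interval = (long) (period * Math.pow(1.5, attempt - 1));
            intervals.add(Math.min(interval, maxPeriod));
        }
        return intervals;
    }

    public static void main(String[] args) {
        // Defaults of Retryer.Default: period 100 ms, maxPeriod 1000 ms, maxAttempts 5.
        System.out.println(sleepIntervals(100, 1000, 5)); // prints [150, 225, 337, 506]
    }
}
```

With the defaults, five attempts are made in total and the four pauses between them add up to a little over 1.2 seconds before the exception is propagated.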
//If no Retryer bean is defined, the default is to never retry
//As an aside, Spring Boot implements many of its default settings this way
@Bean
@ConditionalOnMissingBean
public Retryer feignRetryer() {
return Retryer.NEVER_RETRY;
}
//The retry itself is implemented by the following code
final class SynchronousMethodHandler implements MethodHandler {
//...
@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Options options = findOptions(argv);
//Clone the configured retryer so that each invocation gets its own retry state
Retryer retryer = this.retryer.clone();
while (true) {
try {
//Send the request (again, after a retry)
return executeAndDecode(template, options);
} catch (RetryableException e) {
try {
//Apply the retry policy; once the retries are exhausted the exception is rethrown
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
}
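For example, with the following classes:
/**
* Custom capability; it can wrap the decoder, encoder, logger, and every other component configured above.
* The stubs here return null only as placeholders; a real implementation should return the (possibly wrapped) argument, because Feign uses the returned object.
*/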
public class ActivityCapability implements Capability {
@Override
public Client enrich(Client client) {
return null;
}
@Override
public Retryer enrich(Retryer retryer) {
return null;
}
@Override
public RequestInterceptor enrich(RequestInterceptor requestInterceptor) {
return null;
}
@Override
public Logger enrich(Logger logger) {
return null;
}
@Override
public Logger.Level enrich(Logger.Level level) {
return null;
}
@Override
public Contract enrich(Contract contract) {
return null;
}
@Override
public Request.Options enrich(Request.Options options) {
return null;
}
@Override
public Encoder enrich(Encoder encoder) {
return null;
}
@Override
public Decoder enrich(Decoder decoder) {
return null;
}
@Override
public InvocationHandlerFactory enrich(InvocationHandlerFactory invocationHandlerFactory) {
return null;
}
@Override
public QueryMapEncoder enrich(QueryMapEncoder queryMapEncoder) {
return null;
}
}
/**
* Custom contract (validates and parses the client interface)
*/
public class ActivityContract implements Contract {
@Override
public List<MethodMetadata> parseAndValidateMetadata(Class<?> targetType) {
return null;
}
}
/**
* Custom decoder
*/
public class ActivityDecoder implements Decoder {
@Override
public Object decode(Response response, Type type) throws IOException, DecodeException, FeignException {
return null;
}
}
/**
* Custom encoder
*/
public class ActivityEncoder implements Encoder {
@Override
public void encode(Object object, Type bodyType, RequestTemplate template) throws EncodeException {
}
}
/**
* Custom error decoder
*/
public class ActivityErrorDecoder implements ErrorDecoder {
@Override
public Exception decode(String methodKey, Response response) {
return null;
}
}
/**
* Custom interceptor, for example to manipulate headers
*/
public class ActivityRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate template) {
}
}
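/**
* Custom retry policy. This stub never retries: it rethrows immediately, like Retryer.NEVER_RETRY.
* An empty continueOrPropagate would make SynchronousMethodHandler retry forever.
*/
public class ActivityRetryer implements Retryer {
@Override
public void continueOrPropagate(RetryableException e) {
throw e;
}
@Override
public Retryer clone() {
return new ActivityRetryer();
}
}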
The corresponding configuration in application.yml:
feign:
  client:
    config:
      activity:
        connectTimeout: 5000
        readTimeout: 5000
        loggerLevel: full
        errorDecoder: com.zhxshark.user.ActivityErrorDecoder
        retryer: com.zhxshark.user.ActivityRetryer
        defaultQueryParameters:
          query: queryValue
        defaultRequestHeaders:
          header: headerValue
        requestInterceptors:
          - com.zhxshark.user.ActivityRequestInterceptor
        decode404: false
        encoder: com.zhxshark.user.ActivityEncoder
        decoder: com.zhxshark.user.ActivityDecoder
        contract: com.zhxshark.user.ActivityContract
        capabilities:
          - com.zhxshark.user.ActivityCapability
        metrics.enabled: false
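Differences between openFeign versions (important); see the official documentation at https://spring.io/projects/spring-cloud-openfeign/#learn
The code above uses org.springframework.cloud:spring-cloud-starter-openfeign:3.0.2. This version no longer has LoadBalancerFeignClient as the feign client; instead it defaults to FeignBlockingLoadBalancerClient, in the org.springframework.cloud.openfeign.loadbalancer package, for load balancing.
At the time of writing there are four versions:
| Version | Status | Supports spring-cloud-starter-loadbalancer | Supports spring-cloud-starter-netflix-ribbon | Includes hystrix |
| --- | --- | --- | --- | --- |
| 3.0.2 | CURRENT GA | Yes | No | No |
| 3.0.3-SNAPSHOT | SNAPSHOT | Yes | No | No |
| 2.2.9.BUILD-SNAPSHOT | SNAPSHOT | Yes | Yes | Yes |
| 2.2.8.RELEASE | GA | Yes | Yes | Yes |
For the versions that support spring-cloud-starter-netflix-ribbon, ribbon-based load balancing has to be switched on in the configuration with spring.cloud.loadbalancer.ribbon.enabled=true.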