
Hystrix and Feign: Principles and Source Code



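  • Hystrix

    • What is Hystrix

    • The avalanche effect

    • Solutions to the avalanche effect

    • Request caching: cache a request together with its result

    • Request collapsing: merge requests of the same kind into one batch request

    • Service isolation: partition services so that their resources do not affect one another

    • Thread-pool isolation: give each interface its own thread pool so that interfaces do not affect one another (suits about 99% of scenarios)

    • Semaphore isolation: each request is limited by a counting semaphore; when the number of concurrent requests exceeds maxConcurrentRequests, further requests are rejected and the fallback returns quickly

    • Semaphore vs. thread-pool isolation

    • Circuit breaking: sacrifice part of the service to keep the overall system stable

    • Service degradation: return a default value once the circuit is open

    • Hystrix source code walkthrough

  • OpenFeign

    • What is OpenFeign

    • Usage

    • How Feign works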


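What is Hystrix

Hystrix is an open-source component from Netflix whose goal is to improve the stability of the system as a whole.


The avalanche effect

When a service that other services call crashes, the failure can bring down the services that depend on it directly or indirectly. This chain reaction is called the avalanche effect.
Causes:

  • The service provider is unavailable (hardware failure, program bugs, cache breakdown, a flood of user requests, etc.)

  • Retries increase traffic (users retrying, retry logic in code)

  • The service consumer is unavailable (resources exhausted by synchronous waiting)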


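Solutions to the avalanche effect


Request caching: cache a request together with its result

Use Redis to cache the data.


<!-- Add the Redis dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Add the connection-pool dependency (commons-pool2 is an object/connection pool, not a thread pool) -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>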


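import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.cache.RedisCacheWriter;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 Redis configuration class
*/

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
        RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
        //serialize keys as plain strings
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        //serialize values as JSON
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        return redisTemplate;
    }

    //override the serialization used by the Spring cache abstraction
    @Bean
    public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate){
        RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());
        RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                //default expiry time for cache entries
                .entryTtl(Duration.ofMinutes(30))
                //reuse the template's key and value serializers
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getKeySerializer()))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getValueSerializer()));
        return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration);
    }
}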


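@Cacheable(cacheNames = "ActivityService:user:single", key = "#id")
public User selectByUserId(Long id){
    //returns the value itself rather than a Future so that the result can be serialized into Redis
    //placeholder body: load the user from the database or a remote service here
    return null;
}

Finally, add @EnableCaching to the application's startup class.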
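To make the caching behaviour concrete, here is a minimal sketch of what a cached lookup does: return the stored value on a hit, and call the real method only on a miss. It is not how Spring implements @Cacheable; the class CacheAsideSketch is a hypothetical name, and an in-memory map stands in for Redis (so there is no TTL and no serialization).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical illustration of a cached lookup; an in-memory map stands in for Redis. */
class CacheAsideSketch<K, V> {
    private final Map<K, V> store = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // the "real" method, e.g. a database query

    CacheAsideSketch(Function<K, V> loader) {
        this.loader = loader;
    }

    /** Returns the cached value, invoking the loader only when the key is missing. */
    V get(K key) {
        return store.computeIfAbsent(key, loader);
    }
}
```

Calling get twice with the same key invokes the loader once, which is the reduction in downstream traffic that request caching is meant to provide.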


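Request collapsing: merge requests of the same kind into one batch request

Advantage: several requests of the same kind are merged into a single call to a batch interface, which reduces the number of requests.
Disadvantage: a request that arrives early cannot return immediately; it has to wait according to the collapsing rules (for example, until the time window elapses).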


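@HystrixCollapser(
    batchMethod = "selectByUserIds", //the batch method that handles the collapsed requests
    scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL, //collapsing scope: GLOBAL collapses across all requests, REQUEST (the default) only within one request context
    collapserProperties = {
        //length of the time window after which the collected requests are sent as one batch
        @HystrixProperty(name = "timeDelayInMilliseconds", value = "20"),
        //maximum number of requests allowed in one batch before the batch is triggered
        @HystrixProperty(name = "maxRequestsInBatch", value = "200")
    }
)
public Future<User> selectByUserId(Long id){
    //this body is never executed; the call is routed to the batch method
    return null;
}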
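The trade-off described above (fewer calls, but early requests wait) can be seen in a small plain-Java sketch. This is not Hystrix code: RequestCollapserSketch is a hypothetical class, and its flush() method stands in for the timer that Hystrix fires every timeDelayInMilliseconds. It assumes the batch function returns results in the same order as the keys it receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical, simplified collapser: queues single requests and answers them with one batch call. */
class RequestCollapserSketch<K, V> {
    private final Function<List<K>, List<V>> batchFunction; // assumed to keep argument order
    private final List<K> pendingKeys = new ArrayList<>();
    private final List<CompletableFuture<V>> pendingResults = new ArrayList<>();
    private int batchCalls = 0;

    RequestCollapserSketch(Function<List<K>, List<V>> batchFunction) {
        this.batchFunction = batchFunction;
    }

    /** Queues one request; the returned future completes only when the batch is flushed. */
    synchronized CompletableFuture<V> submit(K key) {
        CompletableFuture<V> result = new CompletableFuture<>();
        pendingKeys.add(key);
        pendingResults.add(result);
        return result;
    }

    /** Stands in for the collapsing timer: sends all pending keys in one batch call. */
    synchronized void flush() {
        if (pendingKeys.isEmpty()) {
            return;
        }
        List<V> values = batchFunction.apply(new ArrayList<>(pendingKeys));
        batchCalls++;
        for (int i = 0; i < pendingResults.size(); i++) {
            pendingResults.get(i).complete(values.get(i));
        }
        pendingKeys.clear();
        pendingResults.clear();
    }

    synchronized int batchCalls() {
        return batchCalls;
    }
}
```

Three submit calls followed by one flush produce a single batch invocation, and none of the three futures is done before the flush, which is exactly the waiting cost mentioned as the disadvantage.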


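Service isolation: partition services so that their resources do not affect one another


Thread-pool isolation: give each interface its own thread pool so that interfaces do not affect one another (suits about 99% of scenarios)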


@HystrixCommand(
    groupKey = "activity-service-single", //command group (service) name; commands in the same group share a thread pool by default
    commandKey = "selectByUserId", //command (interface) name; defaults to the method name
    threadPoolKey = "activity-service-single", //thread pool name; commands with the same key share one thread pool
    commandProperties = {
        //execution timeout; the default is 1000 ms
        @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS, value = "5000")
    },
    threadPoolProperties = {
        //core size of the thread pool
        @HystrixProperty(name = HystrixPropertiesManager.CORE_SIZE, value = "6"),
        //maximum size of the waiting queue
        @HystrixProperty(name = HystrixPropertiesManager.MAX_QUEUE_SIZE, value = "100"),
        //keep-alive time of idle threads in minutes; the default is 1
        @HystrixProperty(name = HystrixPropertiesManager.KEEP_ALIVE_TIME_MINUTES, value = "2"),
        //queue length at which new requests are rejected
        @HystrixProperty(name = HystrixPropertiesManager.QUEUE_SIZE_REJECTION_THRESHOLD, value = "100")
    },
    fallbackMethod = "selectByUserIdFallBack"
)
public Future<User> selectByUserId(Long id){
    return null;
}
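The idea behind these properties can be sketched with nothing but java.util.concurrent: a dedicated, bounded pool per dependency, a timeout on the caller's side, and a fallback when the call is rejected, fails, or times out. ThreadPoolIsolationSketch below is a hypothetical class for illustration, not the way Hystrix is implemented internally; its constructor parameters only mirror the meaning of coreSize, maxQueueSize and the execution timeout.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical sketch: one bounded thread pool per dependency, with timeout and fallback. */
class ThreadPoolIsolationSketch {
    private final ThreadPoolExecutor pool;
    private final long timeoutMillis;

    ThreadPoolIsolationSketch(int coreSize, int maxQueueSize, long timeoutMillis) {
        this.pool = new ThreadPoolExecutor(
                coreSize, coreSize, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(maxQueueSize),
                runnable -> {
                    // daemon threads so that the sketch never keeps the JVM alive
                    Thread thread = new Thread(runnable, "isolated-pool");
                    thread.setDaemon(true);
                    return thread;
                });
        this.timeoutMillis = timeoutMillis;
    }

    /** Runs the action on the isolated pool; any rejection, failure or timeout yields the fallback. */
    <T> T execute(Callable<T> action, Supplier<T> fallback) {
        Future<T> future;
        try {
            future = pool.submit(action);
        } catch (RejectedExecutionException e) {
            return fallback.get(); // all threads busy and the queue is full
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker; the caller walks away
            return fallback.get();
        } catch (ExecutionException e) {
            return fallback.get(); // the action itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.get();
        }
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

Because the caller waits on a Future instead of running the work itself, a slow dependency can only tie up the threads of its own pool, and the caller gets its fallback after the timeout.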


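Semaphore isolation: each request is limited by a counting semaphore. When the number of concurrent requests exceeds maxConcurrentRequests, further requests are rejected and the fallback is called so that they return quickly.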


@HystrixCommand(
    commandProperties = {
        //execution timeout; the default is 1000 ms
        @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS, value = "5000"),
        //use semaphore isolation; the default strategy is THREAD
        @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_STRATEGY, value = "SEMAPHORE"),
        //maximum number of concurrent requests; kept small here to make high concurrency easy to simulate
        @HystrixProperty(name = HystrixPropertiesManager.EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, value = "6")
    },
    fallbackMethod = "selectByUserIdFallBack"
)
public Future<User> selectByUserId(Long id){
    return null;
}
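The counting behaviour can be reproduced with java.util.concurrent.Semaphore. The sketch below is a hypothetical class (SemaphoreIsolationSketch), not Hystrix's own implementation: it tries to take a permit without blocking, runs the action on the calling thread, and returns the fallback at once when no permit is free.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical sketch of semaphore isolation: the work runs on the caller's thread. */
class SemaphoreIsolationSketch {
    private final Semaphore permits;

    SemaphoreIsolationSketch(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    /** Runs the action if a permit is free, otherwise returns the fallback immediately. */
    <T> T execute(Supplier<T> action, Supplier<T> fallback) {
        if (!permits.tryAcquire()) {
            return fallback.get(); // too many concurrent requests: fail fast
        }
        try {
            return action.get();
        } catch (RuntimeException e) {
            return fallback.get();
        } finally {
            permits.release();
        }
    }
}
```

There is no extra thread and no queue here, which is why this strategy is cheap but can only be used for synchronous calls.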


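Semaphore vs. thread-pool isolation
Isolation type | Timeout support | Circuit breaking | Isolation mechanism | Sync/async calls | Resource cost
Thread pool | Supported | Supported | A separate thread pool per service | Synchronous and asynchronous | Higher (extra threads, queueing, context switches)
Semaphore | Not supported | Supported | A semaphore counter | Synchronous only | Lower (runs on the calling thread)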

Circuit breaking: sacrifice part of the service to keep the overall system stable


@HystrixCommand(
    commandProperties = {
        //minimum number of requests in the rolling window (10 s by default) before the breaker evaluates the error rate
        @HystrixProperty(name = HystrixPropertiesManager.CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD, value = "10"),
        //open the circuit once the error percentage reaches 50%; while it is open, calls go straight to the fallback method
        @HystrixProperty(name = HystrixPropertiesManager.CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE, value = "50"),
        //how long the circuit stays open before a trial request is let through; the default is 5000 ms
        @HystrixProperty(name = HystrixPropertiesManager.CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS, value = "5000")
    },
    fallbackMethod = "selectByUserIdFallBack"
)
public Future<User> selectByUserId(Long id){
    return null;
}
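How the three properties interact is easier to follow as a small state machine. The sketch below is a simplified illustration, not Hystrix's implementation: CircuitBreakerSketch is a hypothetical class, it counts requests since the circuit last closed instead of using a rolling window, and the clock is injected so the behaviour can be checked without sleeping.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical, simplified circuit breaker: CLOSED -> OPEN -> HALF_OPEN -> CLOSED. */
class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;
    private final LongSupplier clock; // current time in milliseconds

    private State state = State.CLOSED;
    private int requests = 0;
    private int failures = 0;
    private long openedAt = 0;

    CircuitBreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage,
                         long sleepWindowMillis, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    /** Runs the action unless the circuit is open; failures and short-circuits return the fallback. */
    <T> T execute(Supplier<T> action, Supplier<T> fallback) {
        if (!allowRequest()) {
            return fallback.get();
        }
        try {
            T result = action.get();
            record(true);
            return result;
        } catch (RuntimeException e) {
            record(false);
            return fallback.get();
        }
    }

    private synchronized boolean allowRequest() {
        switch (state) {
            case CLOSED:
                return true;
            case HALF_OPEN:
                return false; // a trial request is already in flight
            default: // OPEN
                if (clock.getAsLong() - openedAt < sleepWindowMillis) {
                    return false;
                }
                state = State.HALF_OPEN; // sleep window elapsed: allow one trial request
                return true;
        }
    }

    private synchronized void record(boolean success) {
        if (state == State.HALF_OPEN) {
            if (success) {
                state = State.CLOSED;
                requests = 0;
                failures = 0;
            } else {
                trip();
            }
            return;
        }
        requests++;
        if (!success) {
            failures++;
        }
        if (requests >= requestVolumeThreshold
                && failures * 100 >= errorThresholdPercentage * requests) {
            trip();
        }
    }

    private void trip() {
        state = State.OPEN;
        openedAt = clock.getAsLong();
    }

    synchronized State state() {
        return state;
    }
}
```

With the values from the annotation above (10, 50, 5000), ten failed calls open the circuit, calls during the next five seconds never reach the action, and the first successful call after that closes the circuit again.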


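Service degradation: return a default value once the circuit is open

In the examples above this is the job of the method named by fallbackMethod (selectByUserIdFallBack), which is invoked when the command fails, times out, is rejected, or the circuit is open.


Hystrix source code walkthrough

@HystrixCommand (the core component of Hystrix: initialization, execution, rate limiting, circuit breaking and so on all go through it) and @HystrixCollapser are both parsed by HystrixCommandAspect.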


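/**
 The @HystrixCommand annotation
*/

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface HystrixCommand {

    //group name; defaults to the class name
    String groupKey() default "";

    //command name; defaults to the name of the method annotated with @HystrixCommand
    String commandKey() default "";

    //thread pool name; defaults to the group key
    String threadPoolKey() default "";

    //name of the fallback method; it must be defined in the same class as the @HystrixCommand method,
    //with the same parameters and a compatible return type
    String fallbackMethod() default "";

    //command properties
    HystrixProperty[] commandProperties() default {};

    //thread pool properties
    HystrixProperty[] threadPoolProperties() default {};

    //exceptions that should be ignored (they do not trigger the fallback)
    Class<? extends Throwable>[] ignoreExceptions() default {};

    //for observable commands: EAGER uses observe(), LAZY uses toObservable()
    ObservableExecutionMode observableExecutionMode() default ObservableExecutionMode.EAGER;

    HystrixException[] raiseHystrixExceptions() default {};

    //default fallback method; if both fallbackMethod and defaultFallback are specified, fallbackMethod wins
    //the default fallback method must take no parameters and have a compatible return type
    String defaultFallback() default "";
}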

/**
 The @HystrixCollapser annotation
*/

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface HystrixCollapser {

    //collapser name; defaults to the name of the annotated method
    String collapserKey() default "";

    /* Name of the batch method.
       The batch method must have this signature:
       java.util.List method(java.util.List)
       and may take only one parameter.
    */

    String batchMethod();

    /**
     Scope; defaults to a single request and can be changed to GLOBAL
    */

    Scope scope() default Scope.REQUEST;

    /**
     * Collapser properties,
     * given as key-value pairs; several can be configured as an array
     */

    HystrixProperty[] collapserProperties() default {};
}


@Aspect
public class HystrixCommandAspect {

private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

static {
META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}

//pointcut for @HystrixCommand
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
//pointcut for @HystrixCollapser
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
//a method must not carry both @HystrixCommand and @HystrixCollapser
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
//pick the MetaHolderFactory that matches the annotation
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
//collect everything Hystrix needs into the MetaHolder; see (1) and (2) below
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
//create the HystrixInvokable, the base type of Hystrix commands
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
//determine the execution type
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

Object result;
try {
if (!metaHolder.isObservable()) {
//delegate execution to the CommandExecutor utility
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}

private Observable executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
return ((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
.onErrorResumeNext(new Func1<Throwable, Observable>() {
@Override
public Observable call(Throwable throwable) {
if (throwable instanceof HystrixBadRequestException) {
return Observable.error(throwable.getCause());
} else if (throwable instanceof HystrixRuntimeException) {
HystrixRuntimeException hystrixRuntimeException = (HystrixRuntimeException) throwable;
return Observable.error(hystrixRuntimeExceptionToThrowable(metaHolder, hystrixRuntimeException));
}
return Observable.error(throwable);
}
});
}

private Throwable hystrixRuntimeExceptionToThrowable(MetaHolder metaHolder, HystrixRuntimeException e) {
if (metaHolder.raiseHystrixExceptionsContains(HystrixException.RUNTIME_EXCEPTION)) {
return e;
}
return getCause(e);
}

private Throwable getCause(HystrixRuntimeException e) {
if (e.getFailureType() != HystrixRuntimeException.FailureType.COMMAND_EXCEPTION) {
return e;
}

Throwable cause = e.getCause();

// latest exception in flow should be propagated to end user
if (e.getFallbackException() instanceof FallbackInvocationException) {
cause = e.getFallbackException().getCause();
if (cause instanceof HystrixRuntimeException) {
cause = getCause((HystrixRuntimeException) cause);
}
} else if (cause instanceof CommandActionExecutionException) { // this situation is possible only if a callee throws an exception which type extends Throwable directly
CommandActionExecutionException commandActionExecutionException = (CommandActionExecutionException) cause;
cause = commandActionExecutionException.getCause();
}

return Optional.fromNullable(cause).or(e);
}

/**
* A factory to create MetaHolder depending on {@link HystrixPointcutType}.
*/

private static abstract class MetaHolderFactory {
public MetaHolder create(final ProceedingJoinPoint joinPoint) {
Method method = getMethodFromTarget(joinPoint);
Object obj = joinPoint.getTarget();
Object[] args = joinPoint.getArgs();
Object proxy = joinPoint.getThis();
return create(proxy, method, obj, args, joinPoint);
}

public abstract MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint);

MetaHolder.Builder metaHolderBuilder(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
MetaHolder.Builder builder = MetaHolder.builder()
.args(args).method(method).obj(obj).proxyObj(proxy)
.joinPoint(joinPoint);

setFallbackMethod(builder, obj.getClass(), method);
builder = setDefaultProperties(builder, obj.getClass(), joinPoint);
return builder;
}
}

// (1) ->
private static class CollapserMetaHolderFactory extends MetaHolderFactory {

@Override
public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
HystrixCollapser hystrixCollapser = collapserMethod.getAnnotation(HystrixCollapser.class);
if (collapserMethod.getParameterTypes().length > 1 || collapserMethod.getParameterTypes().length == 0) {
throw new IllegalStateException("Collapser method must have one argument: " + collapserMethod);
}

Method batchCommandMethod = getDeclaredMethod(obj.getClass(), hystrixCollapser.batchMethod(), List.class);

if (batchCommandMethod == null)
throw new IllegalStateException("batch method is absent: " + hystrixCollapser.batchMethod());

Class<?> batchReturnType = batchCommandMethod.getReturnType();
Class<?> collapserReturnType = collapserMethod.getReturnType();
boolean observable = collapserReturnType.equals(Observable.class);

if (!collapserMethod.getParameterTypes()[0]
.equals(getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]))) {
throw new IllegalStateException("required batch method for collapser is absent, wrong generic type: expected "
+ obj.getClass().getCanonicalName() + "." +
hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]));
}

final Class<?> collapserMethodReturnType = getFirstGenericParameter(
collapserMethod.getGenericReturnType(),
Future.class.isAssignableFrom(collapserReturnType) || Observable.class.isAssignableFrom(collapserReturnType) ? 1 : 0);

Class<?> batchCommandActualReturnType = getFirstGenericParameter(batchCommandMethod.getGenericReturnType());
if (!collapserMethodReturnType
.equals(batchCommandActualReturnType)) {
throw new IllegalStateException("Return type of batch method must be java.util.List parametrized with corresponding type: expected " +
"(java.util.List<" + collapserMethodReturnType + ">)" + obj.getClass().getCanonicalName() + "." +
hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
batchCommandActualReturnType);
}

HystrixCommand hystrixCommand = batchCommandMethod.getAnnotation(HystrixCommand.class);
if (hystrixCommand == null) {
throw new IllegalStateException("batch method must be annotated with HystrixCommand annotation");
}
// method of batch hystrix command must be passed to metaholder because basically collapser doesn't have any actions
// that should be invoked upon intercepted method, it's required only for underlying batch command

MetaHolder.Builder builder = metaHolderBuilder(proxy, batchCommandMethod, obj, args, joinPoint);

if (isCompileWeaving()) {
builder.ajcMethod(getAjcMethodAroundAdvice(obj.getClass(), batchCommandMethod.getName(), List.class));
}

builder.hystrixCollapser(hystrixCollapser);
builder.defaultCollapserKey(collapserMethod.getName());
builder.collapserExecutionType(ExecutionType.getExecutionType(collapserReturnType));

builder.defaultCommandKey(batchCommandMethod.getName());
builder.hystrixCommand(hystrixCommand);
builder.executionType(ExecutionType.getExecutionType(batchReturnType));
builder.observable(observable);
FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(obj.getClass(), batchCommandMethod);
if (fallbackMethod.isPresent()) {
fallbackMethod.validateReturnType(batchCommandMethod);
builder
.fallbackMethod(fallbackMethod.getMethod())
.fallbackExecutionType(ExecutionType.getExecutionType(fallbackMethod.getMethod().getReturnType()));
}
return builder.build();
}
}

// (2) ->
private static class CommandMetaHolderFactory extends MetaHolderFactory {
@Override
public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
//derive from the return type whether the call is synchronous, asynchronous or reactive (observable)
ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
//build the MetaHolder and put the parameters Hystrix needs into it
//metaHolderBuilder also sets the fallback via setFallbackMethod(builder, obj.getClass(), method)
MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
if (isCompileWeaving()) {
builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
}
return builder.defaultCommandKey(method.getName())
.hystrixCommand(hystrixCommand)
.observableExecutionMode(hystrixCommand.observableExecutionMode())
.executionType(executionType)
.observable(ExecutionType.OBSERVABLE == executionType)
.build();
}
}
//rest omitted
}

Now let's look at the core of method execution.


public class CommandExecutor {

/**
 Observable: the source being observed
 Observer: the consumer of its events
 subscribe: connects an observer to an observable
*/

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);

switch (executionType) {
//synchronous: block until the dependency returns its result
case SYNCHRONOUS: {
return castToExecutable(invokable, executionType).execute();
}
//asynchronous: obtain a Future
case ASYNCHRONOUS: {
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
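//observer pattern: the command runs asynchronously and delivers its result through callbacks
//observe(): starts executing the command immediately (EAGER)
//toObservable(): executes the command only after it has been subscribed to (LAZY)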
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
}
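The executionType used in the switch above is derived from the return type of the annotated method. The sketch below re-creates that decision in plain Java as I understand it; ExecutionModeSketch and fromReturnType are hypothetical names, and java.util.concurrent.Flow.Publisher is used only as a stand-in for rx.Observable so that the example needs no external library.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.Future;

/** Hypothetical re-creation of how a return type is mapped to an execution type. */
class ExecutionModeSketch {
    enum ExecutionType { SYNCHRONOUS, ASYNCHRONOUS, OBSERVABLE }

    static ExecutionType fromReturnType(Class<?> returnType) {
        if (Future.class.isAssignableFrom(returnType)) {
            return ExecutionType.ASYNCHRONOUS; // e.g. Future<User>
        }
        if (Flow.Publisher.class.isAssignableFrom(returnType)) {
            return ExecutionType.OBSERVABLE; // stand-in for rx.Observable
        }
        return ExecutionType.SYNCHRONOUS; // any other type, e.g. User
    }
}
```

This is why the earlier examples, which return Future<User>, end up in the ASYNCHRONOUS branch and are executed with queue().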


OpenFeign


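What is OpenFeign

Ribbon is a client-side load balancer for HTTP and TCP clients; it provides load balancing, retry on failure, ping (instance health checks) and similar features.
Feign is a declarative web service client with Ribbon built in. Unlike using Ribbon directly, with Feign you only define an interface bound to the service and declare the calls on it, which makes service invocation simple and elegant.
OpenFeign is an enhanced version of Feign that supports Spring MVC annotations such as @RequestMapping.


Usage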


/**
 Enable Feign
*/

@SpringBootApplication
@EnableFeignClients //enable Feign
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}
/**
 Example client
*/

@FeignClient(value = "activity")
public interface RegisterFeign {
    //@RequestParam names the query parameter; an unannotated argument would be treated as the request body
    @RequestMapping("/firstLoginActivity")
    String firstLogin(@RequestParam("userId") Long userId);
}


How Feign works


@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {

String[] value() default {};

String[] basePackages() default {};

Class<?>[] basePackageClasses() default {};

Class<?>[] defaultConfiguration() default {};

Class<?>[] clients() default {};
}

//FeignClientsRegistrar is what initializes the Feign clients
class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
//load the configuration attributes declared on @EnableFeignClients
registerDefaultConfiguration(metadata, registry);
//register every Feign client that is found with the bean definition registry
registerFeignClients(metadata, registry);
}

public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {

LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>();
Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients");
//if @EnableFeignClients lists no clients, scan the base packages on the classpath for types annotated with @FeignClient
if (clients == null || clients.length == 0) {
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.setResourceLoader(this.resourceLoader);
scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class));
Set<String> basePackages = getBasePackages(metadata);
for (String basePackage : basePackages) {
candidateComponents.addAll(scanner.findCandidateComponents(basePackage));
}
}
//if @EnableFeignClients lists the clients explicitly, use exactly those classes
else {
for (Class<?> clazz : clients) {
candidateComponents.add(new AnnotatedGenericBeanDefinition(clazz));
}
}
//go through the candidates collected above and register each Feign client definition
for (BeanDefinition candidateComponent : candidateComponents) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// verify annotated class is an interface
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface");

Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(FeignClient.class.getCanonicalName());
//register the configuration of this Feign client as a bean
String name = getClientName(attributes);
registerClientConfiguration(registry, name, attributes.get("configuration"));
//register the Feign client itself as a bean
registerFeignClient(registry, annotationMetadata, attributes);
}
}
}

//register the configuration attributes above as a bean
private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(FeignClientSpecification.class);
builder.addConstructorArgValue(name);
builder.addConstructorArgValue(configuration);
registry.registerBeanDefinition(name + "." + FeignClientSpecification.class.getSimpleName(),
builder.getBeanDefinition());
}

//register the Feign client itself as a bean
private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
Map<String, Object> attributes) {
String className = annotationMetadata.getClassName();
Class clazz = ClassUtils.resolveClassName(className, null);
ConfigurableBeanFactory beanFactory = registry instanceof ConfigurableBeanFactory
? (ConfigurableBeanFactory) registry : null;
String contextId = getContextId(beanFactory, attributes);
String name = getName(attributes);
FeignClientFactoryBean factoryBean = new FeignClientFactoryBean();
factoryBean.setBeanFactory(beanFactory);
factoryBean.setName(name);
factoryBean.setContextId(contextId);
factoryBean.setType(clazz);
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(clazz, () -> {
factoryBean.setUrl(getUrl(beanFactory, attributes));
factoryBean.setPath(getPath(beanFactory, attributes));
factoryBean.setDecode404(Boolean.parseBoolean(String.valueOf(attributes.get("decode404"))));
Object fallback = attributes.get("fallback");
if (fallback != null) {
factoryBean.setFallback(fallback instanceof Class ? (Class<?>) fallback
: ClassUtils.resolveClassName(fallback.toString(), null));
}
Object fallbackFactory = attributes.get("fallbackFactory");
if (fallbackFactory != null) {
factoryBean.setFallbackFactory(fallbackFactory instanceof Class ? (Class<?>) fallbackFactory
: ClassUtils.resolveClassName(fallbackFactory.toString(), null));
}
return factoryBean.getObject();
});
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
definition.setLazyInit(true);
validate(attributes);

AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, className);
beanDefinition.setAttribute("feignClientsRegistrarFactoryBean", factoryBean);

// has a default, won't be null
boolean primary = (Boolean) attributes.get("primary");

beanDefinition.setPrimary(primary);

String[] qualifiers = getQualifiers(attributes);
if (ObjectUtils.isEmpty(qualifiers)) {
qualifiers = new String[] { contextId + "FeignClient" };
}

BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, qualifiers);
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
}

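To summarize, FeignClientsRegistrar mainly does the following:

  1. Loads the configuration declared on @EnableFeignClients

  2. Registers the bean definition for the default Feign client configuration
    Every interface annotated with @FeignClient is registered in the container as a bean. When a Feign client is injected with @Autowired, the container uses the factory class FeignClientFactoryBean to create an instance for it.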
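The instance that the factory produces is a dynamic proxy of the annotated interface: Feign builds it with the JDK's java.lang.reflect.Proxy, and every method call is turned into an HTTP request by an invocation handler. The sketch below shows that mechanism in isolation. Everything in it is hypothetical (the @Client and @Path annotations stand in for @FeignClient and @RequestMapping), and instead of sending a request the handler only returns a description of the request it would send.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical sketch of a declarative client built on a JDK dynamic proxy. */
class DeclarativeClientSketch {

    /** Stand-in for @FeignClient: names the target service. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Client { String value(); }

    /** Stand-in for @RequestMapping: the request path. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Path { String value(); }

    @Client("activity")
    interface RegisterClient {
        @Path("/firstLoginActivity")
        String firstLogin(Long userId);
    }

    /** Creates a proxy whose methods describe the HTTP request they represent. */
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> type) {
        String service = type.getAnnotation(Client.class).value();
        InvocationHandler handler = (proxy, method, args) -> {
            Path path = method.getAnnotation(Path.class);
            if (path == null) {
                throw new UnsupportedOperationException(method.getName());
            }
            // a real client would send this request; the sketch only describes it
            return "GET http://" + service + path.value() + "?userId=" + args[0];
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }
}
```

Calling create(RegisterClient.class).firstLogin(42L) yields a request against http://activity/..., that is, a URL whose host is still the service name. Replacing that name with a real address is the load balancer's job.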


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface FeignClient {

@AliasFor("name")
String value() default "";

String contextId() default "";

@AliasFor("value")
String name() default "";

@Deprecated
String qualifier() default "";

String[] qualifiers() default {};

String url() default "";

boolean decode404() default false;

Class<?>[] configuration() default {};

Class<?> fallback() default void.class;

Class<?> fallbackFactory() default void.class;

String path() default "";

boolean primary() default true;

}

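How Feign load balancing works (in essence the same principle as Ribbon load balancing). The class shown below, FeignBlockingLoadBalancerClient, delegates to Spring Cloud LoadBalancer, which replaces Ribbon in newer Spring Cloud versions.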


//the proxied interface method ends up here (invoked through reflection)
public class FeignBlockingLoadBalancerClient implements Client {
@Override
public Response execute(Request request, Request.Options options) throws IOException {
//the request URL, for example http://activity/firstLoginActivity
final URI originalUri = URI.create(request.url());
//the name of the requested service, for example activity
String serviceId = originalUri.getHost();
//the service name must not be empty
Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
String hint = getHint(serviceId);
//build the load-balancer request
DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
new RequestDataContext(buildRequestData(request), hint));
//collect the lifecycle processors that run before and after load balancing
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
.getSupportedLifecycleProcessors(
loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
RequestDataContext.class, ResponseData.class, ServiceInstance.class);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
//core of the load balancing: choose an instance; further down, the service name in the URI is replaced with that instance's address
ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
instance);
if (instance == null) {
String message = "Load balancer does not contain an instance for the service " + serviceId;
if (LOG.isWarnEnabled()) {
LOG.warn(message);
}
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
.body(message, StandardCharsets.UTF_8).build();
}
String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
Request newRequest = buildRequest(request, reconstructedUrl);
return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
supportedLifecycleProcessors);
}
}

public class BlockingLoadBalancerClient implements LoadBalancerClient {
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}
}
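The two steps in the code above, choosing an instance and rebuilding the URI, can be illustrated without Spring. The sketch below assumes a round-robin strategy, which is the default in Spring Cloud LoadBalancer, but it is a simplified illustration rather than the library's code: RoundRobinSketch, choose and reconstructUri are hypothetical names, and instances are plain "host:port" strings.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin chooser plus URI reconstruction. */
class RoundRobinSketch {
    private final List<String> instances; // "host:port" entries of one service
    private final AtomicInteger position = new AtomicInteger();

    RoundRobinSketch(List<String> instances) {
        this.instances = new ArrayList<>(instances);
    }

    /** Returns the next instance in turn, or null when the service has no instances. */
    String choose() {
        if (instances.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(position.getAndIncrement(), instances.size());
        return instances.get(index);
    }

    /** Replaces the service name in the original URI with the chosen instance's address. */
    static URI reconstructUri(String instance, URI original) {
        String query = original.getRawQuery() == null ? "" : "?" + original.getRawQuery();
        return URI.create(original.getScheme() + "://" + instance + original.getRawPath() + query);
    }
}
```

With two instances, successive calls to choose alternate between them, and http://activity/firstLoginActivity becomes a URL that points at the chosen host and port while keeping its path and query.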

... to be updated