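Eureka Registry: Features and Source Code Analysis
Microservices have been widely adopted in recent years. As a company's business grows, so does the number of its services, and keeping track of which service instances are healthy and which have failed becomes hard to manage. The answer is to hand all of these instances over to a registry that manages them centrally. The most basic requirements of service governance are service registration and service discovery, and the governance layer itself must also be highly available. This article starts with Netflix's Eureka. There is a lot to cover, so rather than spelling out every call, a detailed call-flow diagram is attached at the end.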
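Service registration: the EurekaClient reports its own metadata to the EurekaServer through an HTTP request.
Service discovery: on startup, the EurekaClient fetches the registry from the EurekaServer over HTTP and saves a local copy. It also starts a local scheduled task that periodically pulls the latest registry from the EurekaServer. By default, both this client-side fetch and the registry view cached by the EurekaServer are refreshed every 30 seconds.
Service renewal: the EurekaClient renews its lease through periodic HTTP requests (heartbeats). This keeps it from being wrongly judged dead and evicted.
Service synchronization: used mainly when EurekaServers are deployed as a cluster. Each EurekaServer replicates to its peers over HTTP so that all of them hold consistent information.
Service invocation: once a consumer (an EurekaClient) has obtained the registry from the EurekaServer, it uses Ribbon's load-balancing algorithm to pick one instance and calls it over HTTP.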
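Registration, renewal and eviction all revolve around a lease that expires unless it is renewed. The class below is a simplified, hypothetical model of that idea; `LeaseSketch` is not Eureka's `Lease` class. It assumes Eureka's default 90-second lease duration, with a client that renews every 30 seconds. It also assumes a lease counts as expired once the current time passes the last renewal plus the duration. Time is passed in explicitly so the logic can be checked without waiting.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of a service lease; not Eureka's actual Lease class.
class LeaseSketch {
    private final long durationMs;   // how long the lease stays valid without a renewal
    private long lastRenewalMs;      // time of the last registration or renewal

    LeaseSketch(long durationSeconds, long nowMs) {
        this.durationMs = TimeUnit.SECONDS.toMillis(durationSeconds);
        this.lastRenewalMs = nowMs;  // registering counts as the first renewal
    }

    // Called whenever a heartbeat (renewal) arrives from the client.
    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    // Expired once no renewal has arrived within the lease duration.
    boolean isExpired(long nowMs) {
        return nowMs > lastRenewalMs + durationMs;
    }
}
```

A client that registers at t=0 and never renews is considered expired after 90 seconds. A renewal at t=60s pushes the expiry out to t=150s, which is why a healthy client renewing every 30 seconds is never evicted.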
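The source analysis below skips the Spring Boot internals and goes straight to the Spring Cloud Eureka source.
EurekaServer source analysis
The EurekaServer is essentially an HTTP server, much like the Spring MVC applications we write every day. The difference is that Netflix Eureka handles HTTP requests with Jersey instead of Spring MVC; the principle is the same. This analysis covers two main functions of the EurekaServer: copying the registry from its cluster peers, and evicting expired services with a scheduled task. A EurekaServer uses the spring-cloud-starter-netflix-eureka-server dependency, and its startup class must be annotated with @EnableEurekaServer.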
@EnableEurekaServer
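There is not much inside this annotation. It simply uses @Import to bring in EurekaServerMarkerConfiguration, which acts as a marker for Spring's @ConditionalOnBean. EurekaServerMarkerConfiguration registers a Marker bean, and EurekaServerAutoConfiguration is loaded and initialized only when that bean exists.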
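The marker mechanism can be shown in plain Java without Spring. In the simulation below, `MarkerSketch` and `ContainerSketch` are hypothetical stand-ins. A set of registered bean types plays the role of the application context. `enableServer()` stands in for what @EnableEurekaServer's @Import achieves, and the boolean check stands in for @ConditionalOnBean. Spring's real condition evaluation is considerably more involved.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java simulation of Spring's "marker bean" pattern; MarkerSketch and
// ContainerSketch are hypothetical stand-ins, not Spring or Eureka classes.
class MarkerSketch {
    // Plays the role of EurekaServerMarkerConfiguration.Marker: it has no behaviour,
    // its mere presence is the signal.
    static final class Marker { }
}

class ContainerSketch {
    // Plays the role of the application context: the set of registered bean types.
    private final Set<Class<?>> beanTypes = new HashSet<>();

    // Plays the role of @EnableEurekaServer, whose @Import ends up registering the marker.
    void enableServer() {
        beanTypes.add(MarkerSketch.Marker.class);
    }

    // Plays the role of @ConditionalOnBean(Marker.class) on EurekaServerAutoConfiguration.
    boolean shouldLoadServerAutoConfiguration() {
        return beanTypes.contains(MarkerSketch.Marker.class);
    }
}
```

The appeal of the pattern is that the annotation carries no logic of its own. Adding or removing one annotation switches the whole server auto-configuration on or off.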
EurekaServerAutoConfiguration
When Spring Boot starts, auto-configuration loads the classes listed in the META-INF/spring.factories file that comes with the spring-cloud-starter-netflix-eureka-server dependency. EurekaServerAutoConfiguration is one of them, and its main job is to instantiate the basic beans the server needs.
/**
 * @author Gunnar Hillert
 * @author Biju Kunjummen
 * @author Fahim Farook
 */
@Configuration
@Import(EurekaServerInitializerConfiguration.class) // Important: mainly used to initialize the EurekaServer
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {

    // (excerpt: fields such as applicationInfoManager and the other beans are omitted)

    // Backs the Eureka dashboard
    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }

    // EurekaServer context
    @Bean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }

    // EurekaServer bootstrap: this class is important
    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }

    /**
     * Used by Jersey.
     * Register the Jersey filter.
     * @param eurekaJerseyApp an {@link Application} for the filter to be registered
     * @return a jersey {@link FilterRegistrationBean}
     */
    @Bean
    public FilterRegistrationBean jerseyFilterRegistration(
            javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean bean = new FilterRegistrationBean();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(
                Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
        return bean;
    }
}
EurekaServerInitializerConfiguration
This class is imported by EurekaServerAutoConfiguration through @Import, and it implements Spring's SmartLifecycle interface. Once Spring has finished initializing the beans in the container, it calls the start() method of every SmartLifecycle bean whose isAutoStartup() returns true. Let's analyze this class.
public class EurekaServerInitializerConfiguration
        implements ServletContextAware, SmartLifecycle, Ordered {

    public void start() {
        new Thread(() -> {
            try {
                // TODO: is this class even needed now?
                // EurekaServer startup begins here
                eurekaServerBootstrap.contextInitialized(
                        EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");

                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
                // Help!
                log.error("Could not initialize Eureka servlet context", ex);
            }
        }).start();
    }
}
The two classes above are the main entry points into the EurekaServer source. EurekaServerBootstrap.contextInitialized() calls initEurekaServerContext(), which is where the EurekaServer's two main functions begin.
protected void initEurekaServerContext() throws Exception {
    // For backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);

    if (isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }

    EurekaServerContextHolder.initialize(this.serverContext);

    log.info("Initialized server context");

    // Copy registry from neighboring eureka node
    // i.e. sync the registry from the cluster peers
    int registryCount = this.registry.syncUp();
    // Open the registry for traffic; internally this also starts the eviction task
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);

    // Register all monitoring statistics.
    EurekaMonitors.registerAllStats();
}
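Syncing the registry from cluster peers
@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;

    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // All applications known to this server's embedded Eureka client (fetched from peers)
        Applications apps = eurekaClient.getApplications();
        // Iterate over every registered application...
        for (Application app : apps.getRegisteredApplications()) {
            // ...and over every instance of that application
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    // Check whether this instance may be registered here
                    if (isRegisterable(instance)) {
                        /*
                         * Registration: if a lease already exists, the copy with the newer
                         * timestamp is kept; otherwise a new lease is created.
                         * The instance's ActionType is set to ADDED, and the lease of an
                         * instance whose status is UP is marked as service-up.
                         */
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}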
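The retry structure of syncUp() can be isolated in a small sketch. `SyncUpSketch` and its parameters are hypothetical stand-ins:

- a `Supplier` plays the role of `eurekaClient.getApplications()`;
- a `Predicate` plays the role of `isRegisterable()`;
- a `Consumer` plays the role of `register()`.

Like the original, the loop retries, pausing between attempts, only while nothing has been copied (`count == 0`). The pause uses `LockSupport.parkNanos` instead of `Thread.sleep` so the sketch needs no checked-exception handling.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of the retry loop in syncUp(); not Eureka code.
class SyncUpSketch {
    static int syncUp(Supplier<List<String>> fetchFromPeers,
                      Predicate<String> isRegisterable,
                      Consumer<String> register,
                      int maxRetries,
                      long retryWaitMs) {
        int count = 0;
        // Keep trying only while nothing has been copied and retries remain.
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                // Pause (approximately) between attempts; the original uses Thread.sleep.
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(retryWaitMs));
            }
            for (String instanceId : fetchFromPeers.get()) {
                if (isRegisterable.test(instanceId)) {
                    register.accept(instanceId); // stands in for register(instance, ...)
                    count++;
                }
            }
        }
        return count;
    }
}
```

If the peers return nothing on the first two attempts and three instances on the third, the loop stops after the third attempt. It registers only the instances the predicate accepts.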
Evicting services with a scheduled task
protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    // The eviction task; the logic to study is in EvictionTask
    evictionTaskRef.set(new EvictionTask());
    // Schedule it on the timer: first run after the interval, then repeatedly at that interval
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
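evictionTimer is a java.util.Timer and EvictionTask is a TimerTask. postInit() therefore follows a cancel-and-replace pattern. A cancelled TimerTask can never be scheduled again, so a fresh EvictionTask is created each time instead of reusing the old one. The sketch below reproduces that pattern. `EvictionSchedulerSketch` is hypothetical, and its task only counts its runs where the real one calls evict(). It uses a daemon timer so it never keeps the JVM alive. In the real server the interval comes from `getEvictionIntervalTimerInMs()`, which defaults to 60 seconds as far as we know.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the cancel-and-replace scheduling done in postInit(); not Eureka code.
class EvictionSchedulerSketch {
    // Daemon timer so the sketch never keeps the JVM alive.
    private final Timer timer = new Timer("eviction-timer-sketch", true);
    private final AtomicReference<TimerTask> taskRef = new AtomicReference<>();
    private final AtomicInteger runs = new AtomicInteger();

    void postInit(long intervalMs) {
        TimerTask previous = taskRef.get();
        if (previous != null) {
            previous.cancel(); // a cancelled TimerTask can never be rescheduled...
        }
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                runs.incrementAndGet(); // the real task would call evict(...) here
            }
        };
        taskRef.set(task); // ...so a fresh task object is created every time
        // Initial delay and period are both intervalMs (fixed-delay execution).
        timer.schedule(task, intervalMs, intervalMs);
    }

    int runs() {
        return runs.get();
    }

    void shutdown() {
        timer.cancel();
    }
}
```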
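The eviction task
I think the design here is quite clever. Tier-one companies such as BATJ (Baidu, Alibaba, Tencent, JD) or Netflix easily run thousands of services, so expired services are not removed all at once. Instead, the server first collects every lease that has expired and caps how many may be evicted in one run. It then evicts that many in random order. Why random? The English comment at the top of evict() gives the reason. With a large eviction set, evicting in a fixed order could wipe out entire applications before self-preservation kicks in. Randomizing the order spreads the impact evenly across all applications.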
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    // First, collect every lease that needs to be evicted
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    // Size of the local registry
    int registrySize = (int) getLocalRegistrySize();
    // Registry size threshold: registrySize * renewalPercentThreshold
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    // How many this EurekaServer evicts in this run: the smaller of the expired count and the limit
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            // Swap elements i and next, so that position i now holds the randomly picked lease
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // Remove the instance from the registry and mark its ActionType as DELETED,
            // so that clients pulling delta updates learn that it has been removed
            internalCancel(appName, id, false);
        }
    }
}
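The eviction cap and the random selection in evict() can be reproduced in isolation. `EvictionSketch` is hypothetical and works on plain list elements rather than `Lease<InstanceInfo>`. Take the default renewalPercentThreshold of 0.85 and 100 registered instances: the threshold is 85, so at most 100 - 85 = 15 instances are evicted in one run, even if more leases have expired. A partial Knuth (Fisher-Yates) shuffle then picks which ones, uniformly at random.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical re-creation of the selection step in evict(); not Eureka code.
class EvictionSketch {
    // Upper bound on evictions per run: registry size minus the self-preservation threshold.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int threshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - threshold;
    }

    // Picks min(expired.size(), limit) items in random order with a partial Knuth shuffle.
    static <T> List<T> pickToEvict(List<T> expired, int registrySize,
                                   double renewalPercentThreshold, Random random) {
        List<T> pool = new ArrayList<>(expired); // copy so the caller's list is untouched
        int toEvict = Math.min(pool.size(), evictionLimit(registrySize, renewalPercentThreshold));
        List<T> chosen = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            // Choose uniformly among the not-yet-chosen elements at positions i..size-1.
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
            chosen.add(pool.get(i));
        }
        return chosen;
    }
}
```

Because each step only draws from the elements not yet chosen, no lease is picked twice. Stopping after `toEvict` swaps avoids shuffling the whole list.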
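EurekaClient source analysis
A EurekaClient must report its own metadata to the EurekaServer. It must also pull the full registry from the EurekaServer and cache it locally, so that Ribbon can pick a service with its load-balancing algorithm and call it. To declare a service as a EurekaClient, add the spring-cloud-starter-netflix-eureka-client dependency and annotate the application with @EnableEurekaClient or @EnableDiscoveryClient. At startup, Spring Boot's auto-configuration then loads the EurekaClientAutoConfiguration class listed in that dependency's META-INF/spring.factories file.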
The EurekaClientAutoConfiguration class
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
        "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
        "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {

    // (excerpt: fields such as optionalArgs and context, and the other beans, are omitted)

    /**
     * The most important job of this class is to instantiate Netflix's native EurekaClient
     * (CloudEurekaClient extends Netflix's DiscoveryClient).
     */
    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager,
            EurekaClientConfig config) {
        return new CloudEurekaClient(manager, config, this.optionalArgs,
                this.context);
    }

    /**
     * Spring Cloud's wrapper around the EurekaClient: the native EurekaClient is
     * injected into Spring Cloud's DiscoveryClient abstraction.
     */
    @Bean
    public DiscoveryClient discoveryClient(EurekaClient client,
            EurekaClientConfig clientConfig) {
        return new EurekaDiscoveryClient(client, clientConfig);
    }
}
The DiscoveryClient class
This class's main job is to initialize and run the scheduled tasks: service registration, service discovery (registry fetching) and service renewal.
public class DiscoveryClient implements EurekaClient {
    /**
     * Excerpt from the constructor: only the most important part is shown.
     */
    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        // Thread pool for service renewal (heartbeats)
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        // Thread pool for service discovery (registry cache refresh)
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    // Finally, initialize these scheduled tasks
    initScheduledTasks();
}
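Both executors are built on a SynchronousQueue, which the source comment calls "direct handoff". The queue has no capacity, so a submitted task is accepted only if a thread can take it immediately or the pool can still grow to its maximum size. Otherwise ThreadPoolExecutor's default AbortPolicy rejects it. The demo below (`DirectHandoffDemo` is hypothetical) shrinks the maximum pool size to 1 so that the rejection is easy to observe. As we read it, the consequence for Eureka is that heartbeats or registry refreshes cannot silently pile up in a queue behind a stuck call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Demonstrates direct handoff with a SynchronousQueue-backed ThreadPoolExecutor.
class DirectHandoffDemo {
    // Returns true if a second task is rejected while the only allowed thread is busy.
    static boolean secondSubmitRejected() {
        // maximumPoolSize = 1 purely to make the effect easy to observe.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task occupies the single worker until the latch is released.
            executor.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                executor.execute(() -> { });
                return false; // accepted: tasks could have queued up
            } catch (RejectedExecutionException expected) {
                return true;  // no idle thread and no queue capacity, so it was rejected
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }
}
```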
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread() // service discovery (registry fetch) task
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread() // service renewal task
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            public String getId() {
                return "statusChangeListener";
            }

            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        // Service registration task
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
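Both periodic tasks are wrapped in a TimedSupervisorTask together with an expBackOffBound. As far as we understand it, TimedSupervisorTask doubles the delay before the next run each time the wrapped task times out. It caps that delay at the base interval multiplied by expBackOffBound and resets it to the base interval after a successful run. The hypothetical `BackoffSketch` below models only that delay arithmetic and does not submit any tasks. Assuming a 30-second interval and a bound of 10, the delay grows 60, 120, 240 and then stays at 300 seconds.

```java
// Hypothetical sketch of a bounded exponential backoff for a periodic task.
// Assumes: on timeout the delay doubles up to interval * expBackOffBound; on success it resets.
class BackoffSketch {
    private final long intervalSeconds;
    private final long maxDelaySeconds;
    private long delaySeconds;

    BackoffSketch(long intervalSeconds, int expBackOffBound) {
        this.intervalSeconds = intervalSeconds;
        this.maxDelaySeconds = intervalSeconds * expBackOffBound;
        this.delaySeconds = intervalSeconds;
    }

    // The task timed out: back off, but never beyond the cap.
    long onTimeout() {
        delaySeconds = Math.min(maxDelaySeconds, delaySeconds * 2);
        return delaySeconds;
    }

    // The task completed in time: return to the normal schedule.
    long onSuccess() {
        delaySeconds = intervalSeconds;
        return delaySeconds;
    }
}
```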
Service discovery
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        // Fetch the full registry on first startup, when delta updates are disabled,
        // or when any of the other conditions below holds
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry(); // full fetch
        } else {
            getAndUpdateDelta(applications); // delta (incremental) fetch
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    return true;
}
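The long condition in fetchRegistry() decides between a full and a delta fetch, and it is easier to see when pulled out into a pure function. `FetchDecisionSketch.useFullFetch` is hypothetical. Its parameters stand in for the config getters and the local Applications object, and a null `localAppCount` models "no local registry yet".

```java
// Hypothetical distillation of the branch condition in fetchRegistry(); not an Eureka API.
class FetchDecisionSketch {
    static boolean useFullFetch(boolean disableDelta,
                                String singleVipAddress,
                                boolean forceFullRegistryFetch,
                                Integer localAppCount, // null: no local registry yet
                                long localVersion) {
        return disableDelta                                              // delta explicitly disabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty()) // single-VIP refresh mode
                || forceFullRegistryFetch                                // caller forces a full fetch
                || localAppCount == null                                 // nothing cached locally
                || localAppCount == 0                                    // empty cache, e.g. first start
                || localVersion == -1;                                   // client cannot use deltas
    }
}
```

In steady state, with a populated local cache, delta enabled and nothing forced, the function returns false. The client then only applies deltas, which keeps each 30-second refresh small.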
Detailed call-flow diagram
