vlambda博客
学习文章列表

Eureka注册中心功能及源码分析

近年来微服务不断的被应用,然而随着公司业务的增加,服务也不断的增加,从而导致服务实例失败与成功也难以维护。所以此时将众多的服务实例交由注册中心统一进行管理。而服务治理的最基本需求就是服务注册于服务发现,同时还需要服务治理的高可用。然而今天就从Netfilx的Eureka说起。 真的是太多了,不想写,最后附上一个详细的调用流程图吧,心烦,衰。
Eureka的核心功能
  • 服务注册:EurekaClient会通过Http请求将自身的元信息上报给EurekaServer

  • 服务发现:EurekaClient在启动时会通过Http请求到EurekaServer中获取注册表的信息,同时会将其保存在本地,并且在本地会开启一个定时任务从EurekaServer中获取最新的注册信息,而EurekaServer中的注册信息会每隔30秒进行更新。

  • 服务续约:EurekaClient会通过Http请求进行服务续约,防止自身被误判从而导致剔除服务。

  • 服务同步:主要用于EurekaServer进行集群部署,每个EurekaServer都会进行Http请求同步,以保证各自信息一致性。

  • 服务调用:消费者(EurekaClient)从EurekaServer中获取到注册信息后,会通过Ribbon进行负载算法选择其中一个服务通过Http请求进行调用。

源码解析将略过SpringBoot源码,直接从SpringCloud Eureka源码进行讲解

EurekaServer源码解析

EurekaServer更像是一个Http服务器,说白了就是我们平时使用的SpringMVC,只不过说NetFilx的Eureka处理Http请求不是使用的SpringMVC而是使用Jersey,当然其原理都是一样的。 然而EurekaServer有两大功能,从集群节点获取注册表信息以及通过定时任务剔除服务。EurekaServer使用的 spring-cloud-starter-netflix-eureka-server 包,同时还需要在启动类上使用 @EnableEurekaServer 注解。

@EnableEurekaServer

然而这个注解里面其实并没有什么东西,也只是通过@Import注解导入了一个EurekaServerMarkerConfiguration,其主要所用是用于做一个标记配合Spring的@ConditionalOnBean注解使用的,当该类EurekaServerMarkerConfiguration存在就会对EurekaServerAutoConfiguration类进行加载以及初始化。

EurekaServerAutoConfiguration

EurekaServerAutoConfiguration类会在SpringBoot启动时会通过自动装配加载 spring-cloud-starter-netflix-eureka-server包中META-INF目录下的spring.factories文件中的类,那么也就是加载EurekaServerAutoConfiguration类 ‍,而此类主要为了实例化一些基本类
/** * @author Gunnar Hillert * @author Biju Kunjummen * @author Fahim Farook */@Configuration@Import(EurekaServerInitializerConfiguration.class//此类很重要主要y用于初始化EurekaServer配置@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })@PropertySource("classpath:/eureka/server.properties")public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {    //这个就是主要用于dashboard @Bean @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true) public EurekaController eurekaController() { return new EurekaController(this.applicationInfoManager); }    //EurekaServer上下文 @Bean public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) { return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager); }    //EurekaServer启动类-》此类很重要 @Bean public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) { return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext); }     /**     * 主要是Jersey使用 * Register the Jersey filter. * @param eurekaJerseyApp an {@link Application} for the filter to be registered * @return a jersey {@link FilterRegistrationBean} */ @Bean public FilterRegistrationBean jerseyFilterRegistration( javax.ws.rs.core.Application eurekaJerseyApp) { FilterRegistrationBean bean = new FilterRegistrationBean(); bean.setFilter(new ServletContainer(eurekaJerseyApp)); bean.setOrder(Ordered.LOWEST_PRECEDENCE); bean.setUrlPatterns(          Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*")); return bean; }}

EurekaServerInitializerConfiguration

这个类主要是由 EurekaServerAutoConfiguration类通过@Import进行导入的,然而此类实现了Spring的 SmartLifecycle生命周期类,也就是说会在Spring将容器中的Bean进行初始化完成后,就会调用SmartLifecycle类中start()方法。那么下面就来分析下这个类
@Configurationpublic class EurekaServerInitializerConfiguration    implements ServletContextAwareSmartLifecycleOrdered {    @Override public void start() { new Thread(() -> { try {          // TODO: is this class even needed now?           //这里英文很清晰,EurekaServer启动从这里开始 eurekaServerBootstrap.contextInitialized( EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server");  publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); EurekaServerInitializerConfiguration.this.running = true; publish(new EurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exception ex) { // Help! log.error("Could not initialize Eureka servlet context", ex); } }).start(); }}
上面介绍的两个类就是EurekaServer源码的主要入口类, 那么下面将直接从EurekaServer的两大功能说起
protected void initEurekaServerContext() throws Exception { // For backward compatibility JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) { this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager); this.awsBinder.start();    }    EurekaServerContextHolder.initialize(this.serverContext);    log.info("Initialized server context"); // Copy registry from neighboring eureka node    //从集群节点同步注册表信息 int registryCount = this.registry.syncUp();    //剔除服务    this.registry.openForTraffic(this.applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); }

从集群节点同步注册表信息

@Overridepublic int syncUp() { // Copy entire entry from neighboring DS node  int count = 0; for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { try { Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } }      //这里就是获取到所有的注册实例 Applications apps = eurekaClient.getApplications(); //拿到所有的注册实例 for (Application app : apps.getRegisteredApplications()) { //循环所有的注册实例 for (InstanceInfo instance : app.getInstances()) { try {                  //判断是否可注册 if (isRegisterable(instance)) {                      /**                       * 注册条件:根据是否有租约,如果有则保留最新时间                       * 否则更新为最初时间                       * 同时修改实例的状态为UP,以及Type为ADDED                       **/ register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; }

定时任务剔除服务

 protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); }      //剔除服务的任务,主要就是看定时任务EvictionTask evictionTaskRef.set(new EvictionTask());      //定时器schedule初始化方法 evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); }

剔除服务任务

其实我觉得这里设计的很巧妙,在一线大厂BATJ,或者说NetFilx这些公司肯定动辄就是上千的服务,所以并不会一次性直接把这些服务直接剔除,而是先找到这些需要剔除的服务,然后通过随机的方式剔除服务,为什么要这样做?
我觉得在大型公司EurekaServer肯定是集群方式,也就是说把需要剔除的服务通过随机的方式让每个EurekaServer都能够剔除服务,而不是说让其中一个EurekaServer去剔除服务。
public void evict(long additionalLeaseMs) { logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; }
// We collect first all expired items, to evict them in random order. For large eviction sets, // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it, // the impact should be evenly distributed across all applications.    //这里就是把所有需要剔除的服务找出来 List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } }
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. Without that we would wipe out full registry.    //本地注册表大小    int registrySize = (int) getLocalRegistrySize();    //注册表大小阈值 int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold;    //这里计算出当前EurekaServer需要剔除的服务数量,然后取最小值 int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); // Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) int next = i + random.nextInt(expiredLeases.size() - i);            //此方法用于交换位置,也就是说会将集合中的i和next元素进行换位 Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id);            // 然后就会将该实例从注册表中删除,并修改该实例的类型为DELETED,            // 其实就是为了告知其他节点,这个服务需要删除 internalCancel(appName, id, false); } }}

EurekaClient源码解析

EurekaClient需要将自身的元信息告知到EurekaServer,同时还需要到EurekaServer拉取所有的注册信息并缓存到本地然后Ribbon通过负载算法选择一个服务进行调用。如果需要将服务声明为EurekaClient就需要加上@EnableEurekaClient或者@EnableDiscoveryClient,以及添加spring-cloud-starter-netflix-eureka-client包。然后SpringBoot会在启动时通过自动装配加载器META-INF/spring.factories文件中的 EurekaClientAutoConfiguration类。

EurekaClientAutoConfiguration类

@Configuration@EnableConfigurationProperties@ConditionalOnClass(EurekaClientConfig.class)@Import(DiscoveryClientOptionalArgsConfiguration.class)@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)@ConditionalOnDiscoveryEnabled@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })@AutoConfigureAfter(name = { "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })public class EurekaClientAutoConfiguration {      /**    * 其实这个类最主要的地方就是实例话NetFilx原生的EurekaClient    **/   @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) { return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); }      /**     * 这个是SpringCloud对EurekaClient的封装,并将原生的EurekaClient注入到SpringCloud的EurekaClient     **/     @Bean public DiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) { return new EurekaDiscoveryClient(client, clientConfig);    }   }

DiscoveryClient类源码

这个类的主要作用就对定时任务的初始化以及任务执行,比如服务注册,服务发现,服务续约等.
public class DiscoveryClient implements EurekaClient {  /**   * 只截取最重要部分   **/ try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build());     //服务续约的线程池 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff     //服务发现的线程池 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build()             );  // use direct handoff } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e);  // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch //最终,初始化这些定时任务     initScheduledTasks();}
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound,                        new CacheRefreshThread() //服务发现任务 ), registryFetchIntervalSeconds, TimeUnit.SECONDS); }
if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound,                        new HeartbeatThread()//服务续约任务 ), renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; }
@Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } };
if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //服务注册任务 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); }}

服务发现

private boolean fetchRegistry(boolean forceFullRegistryFetch) {    Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications Applications applications = getApplications();        //如果是第一次启动或者禁用了增量跟新注册信息,就会全量拉取注册信息 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1));            getAndStoreFullRegistry();//全量拉取注册信息 } else {            getAndUpdateDelta(applications);//增量拉取注册信息 } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); }    } return true;}

详细调用流程图