vlambda博客
学习文章列表

《微服务专题》SpringCloud-Eureka源码分析解读



         作者: 琅琊才子李自富
出版:琅琊府出版社
   类型:码畜的自我修养

在微服务专题中,我们首先对SpringCloud进行常用组件的学习,比如:服务发现——Eureka,客服端负载均衡——Ribbon,断路器——Hystrix,服务网关——GateWay,模块调用——Open Feign。

本篇我们主要来介绍Eureka。主要从Client和Server两个模块进行讲解,客户端包括服务注册、服务心跳续约,服务列表拉取。服务端主要介绍包括接收注册服务、续约请求等等。按照惯例,我们还是来介绍几个本篇将会使用到的接口和类。


第一章

本篇接口和类的介绍

1、DiscoveryClient 注册中心客户端实体类,主要包含服务注册、服务续约以及其他的操作。

2、InstanceInfo 是注册时候客户端给服务端的服务的元数据。

3、AbstractInstanceRegistry 注册中心服务端具体接收服务注册进行存储的抽象类。

4、ApplicationResource Eureka Server会接收Eureka Client发送的REST请求,进行服务的注册,续约,下线等操作,这部分代码的逻辑都在ApplicationResource这个类中。

5、ConcurrentHashMap<String, Map<String, Lease>>对象中,它是一个两层Map结构,第一层的key存储服务名:InstanceInfo中的appName属性,第二层的key存储实例名:InstanceInfo中的instanceId属性。


第二章

源码入口

1、获取ServiceUrls。 那么,我们就看看来详细看看DiscoveryClient类。先解读一下该类头部的注释有个总体的了解,注释的大致内容如下:

这个类用于帮助与Eureka Server互相协作。

Eureka Client负责了下面的任务:
- 向Eureka Server注册服务实例
- 向Eureka Server为租约续期
- 当服务关闭期间,向Eureka Server取消租约
- 查询Eureka Server中的服务实例列表

Eureka Client还需要配置一个Eureka Server的URL列表。

在具体研究Eureka Client具体负责的任务之前,我们先看看对Eureka Server的URL列表配置在哪里。根据我们配置的属性名:eureka.client.serviceUrl.defaultZone,通过serviceUrl我们找到该属性相关的加载属性,但是在SR5版本中它们都被@Deprecated标注了,并在注视中可以看到@link到了替代类com.netflix.discovery.endpoint.EndpointUtils,我们可以在该类中找到下面这个函数:

public static Map<String, List<String>> getServiceUrlsMapFromConfig(
  EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
   Map<String, List<String>> orderedUrls = new LinkedHashMap<>();
   String region = getRegion(clientConfig);
   String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
   if (availZones == null || availZones.length == 0) {
       availZones = new String[1];
       availZones[0] = DEFAULT_ZONE;
   }
……
   int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);

   String zone = availZones[myZoneOffset];
   List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
   if (serviceUrls != null) {
       orderedUrls.put(zone, serviceUrls);
   }
……
   return orderedUrls;
}

上面这部分代码,我们主要关注点放在获取ServiceUrls就可以。List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);

public List<String> getEurekaServerServiceUrls(String myZone) {
String serviceUrls = this.serviceUrl.get(myZone);
if (serviceUrls == null || serviceUrls.isEmpty()) {
 serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);
}
if (!StringUtils.isEmpty(serviceUrls)) {
 final String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);
 List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);
 for (String eurekaServiceUrl : serviceUrlsSplit) {
  if (!endsWithSlash(eurekaServiceUrl)) {
   eurekaServiceUrl += "/";
  }
  eurekaServiceUrls.add(eurekaServiceUrl);
 }
 return eurekaServiceUrls;
}
return new ArrayList<>();
}

具体获取serviceUrls的实现,我们可以详细查看getEurekaServerServiceUrls函数的具体实现类EurekaClientConfigBean,该类是EurekaClientConfig和EurekaConstants接口的实现,用来加载配置文件中的内容,这里有非常多有用的信息,这里我们先说一下此处我们关心的,关于defaultZone的信息。通过搜索defaultZone,我们可以很容易的找到下面这个函数,它具体实现了,如何解析该参数的过程,通过此内容,我们就可以知道,eureka.client.serviceUrl.defaultZone属性可以配置多个,并且需要通过逗号分隔。

2、服务注册和服务续约。 DiscoveryClient.initScheduledTasks(),下面这部分代码主要是服务注册和心跳续约的逻辑。

private void initScheduledTasks() {
   ...
   // 根据配置是否需要注册到server端
   if (clientConfig.shouldRegisterWithEureka()) {
         // Heartbeat timer   这里是定时任务进行心跳续约注册
           heartbeatTask = new TimedSupervisorTask(
                   "heartbeat",
                   scheduler,
                   heartbeatExecutor,
                   renewalIntervalInSecs,
                   TimeUnit.SECONDS,
                   expBackOffBound,
                   new HeartbeatThread()
           );
           scheduler.schedule(
                   heartbeatTask,
                   renewalIntervalInSecs, TimeUnit.SECONDS);

       // 建了一个InstanceInfoReplicator类的实例,它会执行一个定时任务
       instanceInfoReplicator = new InstanceInfoReplicator(
               this,
              instanceInfo,
               clientConfig.getInstanceInfoReplicationIntervalSeconds(),
               2); // burstSize
       ...
       instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
   } else {
       logger.info("Not registering with Eureka server per configuration");
   }
}

InstanceInfoReplicator是一个实现了Runnable接口的子类,我们直接看run()的逻辑里面的 discoveryClient.register();

boolean register() throws Throwable {
   logger.info(PREFIX + appPathIdentifier + ": registering service...");
   EurekaHttpResponse<Void> httpResponse;
   try {
       httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
   } catch (Exception e) {
       logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
       throw e;
   }
   if (logger.isInfoEnabled()) {
       logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
   }
   return httpResponse.getStatusCode() == 204;
}

通过属性命名,大家基本也能猜出来,注册操作也是通过REST请求的方式进行的。同时,这里我们也能看到发起注册请求的时候,传入了一个com.netflix.appinfo.InstanceInfo对象,该对象就是注册时候客户端给服务端的服务的元数据。 “服务续约”的实现较为简单,直接以REST请求的方式进行续约: HeartbeatThread也是一个实现了Runnable的子类。我们定时任务执行这个Runnable是,直接查看run()实现逻辑。

public void run() {
           if (renew()) {
               lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
           }
       }
// 发起REST请求进行服务续约。
boolean renew() {
   EurekaHttpResponse<InstanceInfo> httpResponse;
   try {
       httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
       logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
       if (httpResponse.getStatusCode() == 404) {
           REREGISTER_COUNTER.increment();
           logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
           return register();
       }
       return httpResponse.getStatusCode() == 200;
   } catch (Throwable e) {
       logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
       return false;
   }
}

3、服务拉取

if (clientConfig.shouldFetchRegistry()) {
           // registry cache refresh timer
           int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
           int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
           cacheRefreshTask = new TimedSupervisorTask(
                   "cacheRefresh",
                   scheduler,
                   cacheRefreshExecutor,
                   registryFetchIntervalSeconds,
                   TimeUnit.SECONDS,
                   expBackOffBound,
                   new CacheRefreshThread()
           );
           scheduler.schedule(
                   cacheRefreshTask,
                   registryFetchIntervalSeconds, TimeUnit.SECONDS);
       }

在 if (clientConfig.shouldFetchRegistry()) {}逻辑里面。其判断依据就是我们之前所提到的eureka.client.fetch-registry=true参数,它默认是为true的,大部分情况下我们不需要关心。为了定期的更新客户端的服务清单,以保证服务访问的正确性,“服务获取”的请求不会只限于服务启动,而是一个定时执行的任务,从源码中我们可以看到任务运行中的registryFetchIntervalSeconds参数对应eureka.client.registry-fetch-interval-seconds=30配置参数,它默认为30秒。 4、server服务端。 以“服务注册”请求为例:

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                 @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
   logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
   // validate that the instanceinfo contains all the necessary required fields
   ...
   // handle cases where clients may be registering with bad DataCenterInfo with missing data
   DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
   if (dataCenterInfo instanceof UniqueIdentifier) {
       String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
       if (isBlank(dataCenterInfoId)) {
           boolean experimental = "true".equalsIgnoreCase(
    serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
           if (experimental) {
               String entity = "DataCenterInfo of type " + dataCenterInfo.getClass()
         + " must contain a valid id";
               return Response.status(400).entity(entity).build();
           } else if (dataCenterInfo instanceof AmazonInfo) {
               AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
               String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
               if (effectiveId == null) {
                   amazonInfo.getMetadata().put(
      AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
               }
           } else {
               logger.warn("Registering DataCenterInfo of type {} without an appropriate id",
     dataCenterInfo.getClass());
           }
       }
   }

   registry.register(info, "true".equals(isReplication));
   return Response.status(204).build();  // 204 to be backwards compatible
}

在对注册信息进行了一大堆校验之后,会调用org.springframework.cloud.netflix.eureka.server.InstanceRegistry对象中的register(InstanceInfo info, int leaseDuration, boolean isReplication)函数来进行服务注册:

public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
if (log.isDebugEnabled()) {
 log.debug("register " + info.getAppName() + ", vip " + info.getVIPAddress()
   + ", leaseDuration " + leaseDuration + ", isReplication "
   + isReplication);
}
this.ctxt.publishEvent(new EurekaInstanceRegisteredEvent(this, info,
  leaseDuration, isReplication));

super.register(info, leaseDuration, isReplication);
}

在注册函数中,先调用publishEvent函数,将该新服务注册的事件传播出去,然后调用com.netflix.eureka.registry.AbstractInstanceRegistry父类中的注册实现,将InstanceInfo中的元数据信息存储在一个ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>对象中,它是一个两层Map结构,第一层的key存储服务名:InstanceInfo中的appName属性,第二层的key存储实例名:InstanceInfo中的instanceId属性。更新完map信息后,还会更新缓存信息。 注册完信息后,调用了replicateToPeers方法,向其他Eureka Server转发该注册信息,以便实现信息的同步。进到这个方法里面看下:

private void replicateToPeers(Action action, String appName, String id,
                                 InstanceInfo info /* optional */,
                                 InstanceStatus newStatus /* optional */, boolean isReplication) {
       Stopwatch tracer = action.getTimer().start();
       try {
           if (isReplication) {
               numberOfReplicationsLastMin.increment();
           }
           // If it is a replication already, do not replicate again as this will create a poison replication
           if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
               return;
           }

           for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
               // If the url represents this host, do not replicate to yourself.
               if (peerEurekaNodes.isThisMe(node.getServiceUrl())) {
                   continue;
               }
               replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
           }
       } finally {
           tracer.stop();
       }
   }

方法中会通过for循环遍历所有的PeerEurekaNode,调用replicateInstanceActionsToPeers方法,把信息复制给其他的Eureka Server节点,下面是replicateInstanceActionsToPeers方法:

 private void replicateInstanceActionsToPeers(Action action, String appName,
                                                String id, InstanceInfo info, InstanceStatus newStatus,
                                                PeerEurekaNode node) {
       try {
           InstanceInfo infoFromRegistry = null;
           CurrentRequestVersion.set(Version.V2);
           switch (action) {
               case Cancel:
                   node.cancel(appName, id);
                   break;
               case Heartbeat:
                   InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                   infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                   node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                   break;
               case Register:
                   node.register(info);
                   break;
               case StatusUpdate:
                   infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                   node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                   break;
               case DeleteStatusOverride:
                   infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                   node.deleteStatusOverride(appName, id, infoFromRegistry);
                   break;
           }
       } catch (Throwable t) {
           logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
       }

方法中,会判断action具体的动作,如果是Register,就会调用node.register(info);

  public void register(final InstanceInfo info) throws Exception {
       long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
       batchingDispatcher.process(
               taskId("register", info),
               new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                   public EurekaHttpResponse<Void> execute() {
                       return replicationClient.register(info);
                   }
               },
               expiryTime
       );
   }

在该方法中,是通过启动了一个任务,来向其它节点同步信息的,不是实时同步的。


第三章

总结

Eureka的整体处理逻辑---主要从Client和Server两个模块进行处理,客户端包括服务注册、服务心跳续约,服务列表拉取。服务端的主要工作包括接收注册服务、续约请求等等。在底层源码中大量使用了定时任务和线程来进行任务逻辑的处理。里面还试用了读写锁进行效率优化问题,ReentrantReadWriteLock的readLock()和writeLock()。下一篇,我们讲解Open Feign。