vlambda博客
学习文章列表

最新版Nacos 2.X服务端源码分析(一)


前言

从本篇开始我们进入到Nacos服务端相关源码的介绍,本系列基于Nacos2.x最新版相关内容,请大家持续关注。

知识准备

正式开始之前,先介绍下nacos相关知识点,以便阅读和理解源码。以下内容摘抄自《nacos架构与原理》

Nacos 服务发现模块

Nacos 注册中心服务数据模型

在上文 Nacos 注册中心的设计原理中,简要介绍了服务发现的背景、业界对动态服务发现的解决 方案及 Nacos 针对动态服务发现的总体设计思路,让读者对服务发现及 Nacos 的注册中心有了⼀ 个框架性的了解。从本文开始,本书将展开介绍 Nacos 注册中心中的各种技术概念、细节及设计, 帮助读者更好地理解 Nacos 注册中心。

本节将较为详尽的展开介绍 Nacos 注册中心中的服务数据模型内容。主要会为读者详细介绍 Nacos2.0 版本中注册中心所涉及到的数据模型、各个数据模型的含义及各个数据模型的生命周期, 并介绍 Nacos2.0 版本和 Nacos1.0 版本中,服务数据模型的差异点。

服务(Service)和服务实例(Instance)

在生活中,我们被各式各样的服务包围,例如:如果生病了会到医院找医生诊断、如果网购遇到了 问题会找客服咨询,医生提供了诊断服务,客服提供了咨询服务,这位为你诊断病症的医生和为你 解答问题的客服,都是该服务的具体提供者。

在程序世界中也存在类似的情形,例如:在使用支付宝进行付款的时候,或许会要求你先登陆,验 证你的身份信息,最后才能进行支付。而这其中,可能涉及到了支付服务,登陆服务,信息验证服 务等等。而这些,都离不开服务的发现。

在服务发现领域中,服务指的是由应用程序提供的⼀个或⼀组软件功能的⼀种抽象概念(例如上述 例子的登陆或支付)。它和应用有所不同,应用的范围更广,和服务属于包含关系,即⼀个应用可 能会提供多个服务。为了能够更细粒度地区分和控制服务,Nacos 选择服务作为注册中心的最基本 概念。

而服务实例(以下简称实例)是某个服务的具体提供能力的节点,⼀个实例仅从属于⼀个服务,而 ⼀个服务可以包含⼀个或多个实例。在许多场景下,实例又被称为服务提供者(Provider),而使用该服务的实例被称为服务消费者(Consumer)。

定义服务

在 Nacos 中,服务的定义包括以下几个内容:

  • 命名空间(Namespace):Nacos 数据模型中最顶层、也是包含范围最广的概念,用于在类似 环境或租户等需要强制隔离的场景中定义。Nacos 的服务也需要使用命名空间来进行隔离。
  • 分组(Group):Nacos 数据模型中次于命名空间的⼀种隔离概念,区别于命名空间的强制隔离 属性,分组属于⼀个弱隔离概念,主要用于逻辑区分⼀些服务使用场景或不同应用的同名服务, 最常用的情况主要是同⼀个服务的测试分组和生产分组、或者将应用名作为分组以防止不同应用 提供的服务重名。
  • 服务名(Name):该服务实际的名字,⼀般用于描述该服务提供了某种功能或能力。
定义服务

之所以 Nacos 将服务的定义拆分为命名空间、分组和服务名,除了方便隔离使用场景外,还有方 便用户发现唯⼀服务的优点。在注册中心的实际使用场景上,同个公司的不同开发者可能会开发出 类似作用的服务,如果仅仅使用服务名来做服务的定义和表示,容易在⼀些通用服务上出现冲突, 比如登陆服务等。

通常推荐使用由运行环境作为命名空间、应用名作为分组和服务功能作为服务名的组合来确保该服 务的天然唯⼀性,当然使用者可以忽略命名空间和分组,仅使用服务名作为服务唯⼀标示,这就需 要使用者在定义服务名时额外增加自己的规则来确保在使用中能够唯⼀定位到该服务而不会发现到 错误的服务上。

服务元数据

服务的定义只是为服务设置了⼀些基本的信息,用于描述服务以及方便快速的找到服务,而服务的 元数据是进⼀步定义了 Nacos 中服务的细节属性和描述信息。主要包含:

  • 健康保护阈值(ProtectThreshold):为了防止因过多实例故障,导致所有流量全部流入剩余实 例,继而造成流量压力将剩余实例被压垮形成的雪崩效应。应将健康保护阈值定义为⼀个 0 到 1 之间的浮点数。当域名健康实例数占总服务实例数的比例小于该值时,无论实例是否健康,都会 将这个实例返回给客户端。这样做虽然损失了⼀部分流量,但是保证了集群中剩余健康实例能正 常工作。
  • 实例选择器(Selector):用于在获取服务下的实例列表时,过滤和筛选实例。该选择器也被称 为路由器,目前 Nacos 支持通过将实例的部分信息存储在外部元数据管理 CMDB 中,并在发现 服务时使用 CMDB 中存储的元数据标签来进行筛选的能力。
  • 拓展数据(extendData):用于用户在注册实例时自定义扩展的元数服务中拓展服务的元数据信息,方便用户实现自己的自定义逻辑。
最新版Nacos 2.X服务端源码分析(一)
服务元数据

定义实例

由于服务实例是具体提供服务的节点,因此 Nacos 在设计实例的定义时,主要需要存储该实例的 ⼀些网络相关的基础信息,主要包含以下内容:

  • 网络 IP 地址:该实例的 IP 地址,在 Nacos2.0 版本后支持设置为域名。
  • 网络端口:该实例的端口信息。
  • 健康状态(Healthy):用于表示该实例是否为健康状态,会在 Nacos 中通过健康检查的手段进 行维护,具体内容将在 Nacos 健康检查机制章节中详细说明,读者目前只需要该内容的含义即 可。
  • 集群(Cluster):用于标示该实例归属于哪个逻辑集群,有关于集群的相关内容,将在后文详细 说明。
  • 拓展数据(extendData):用于用户自定义扩展的元数据内容,形式为 K-V。可以在实例中拓展该 实例的元数据信息,方便用户实现自己的自定义逻辑和标示该实例。

实例元数据

和服务元数据不同,实例的元数据主要作用于实例运维相关的数据信息。主要包含:

  • 权重(Weight):实例级别的配置。权重为浮点数,范围为 0-10000。权重越大,分配给该实例 的流量越大。
  • 上线状态(Enabled):标记该实例是否接受流量,优先级大于权重和健康状态。用于运维人员 在不变动实例本身的情况下,快速地手动将某个实例从服务中移除。
  • 拓展数据(extendData):不同于实例定义中的拓展数据,这个拓展数据是给予运维人员在不变动 实例本身的情况下,快速地修改和新增实例的扩展数据,从而达到运维实例的作用。

在 Nacos2.0 版本中,实例数据被拆分为实例定义和实例元数据,主要是因为这两类数据其实是同 ⼀个实例的两种不同场景:开发运行场景及运维场景。对于上下线及权重这种属性,⼀般认为在实 例已经在运行时,需要运维人员手动修改和维护的数据,而 IP,端口和集群等信息,⼀般情况下在实例启动并注册后,则不会在进行变更。将这两部分数据合并后,就能够得到实例的完整信息,也 是 Nacos1.0 版本中的实例数据结构。

同时在 Nacos2.0 版本中,定义实例的这部分数据,会受到持久化属性的的影响,而实例元数据部 分,则⼀定会进行持久化;这是因为运维操作需要保证操作的原子性,不能够因为外部环境的影响 而导致操作被重置,例如在 Nacos1.0 版本中,运维人员因为实例所处的网络存在问题,操作⼀个 实例下线以此摘除流量,但是同样因为网络问题,该实例与 Nacos 的通信也收到影响,导致实例 注销后重新注册,这可能导致上线状态被重新注册而覆盖,失去了运维人员操作的优先级。

当然,这部分元数据也不应该无限制的存储下去,如果实例确实已经移除,元数据也应该移除,为 此,在 Nacos 2.0 版本后,通过该接口更新的元数据会在对应实例删除后,依旧存在⼀段时间,如 果在此期间实例重新注册,该元数据依旧生效;您可以通过 nacos.naming.clean.expired-metadat a.expired-time 及 nacos.naming.clean.expired-metadata.interval 对记忆时间进行修改。

最新版Nacos 2.X服务端源码分析(一)
实例

持久化属性

如 Nacos 注册中心的设计原理文中所述,Nacos 提供两种类型的服务:持久化服务和非持久化服 务,分别给类 DNS 的基础的服务组件场景和上层实际业务服务场景使用。为了标示该服务是哪种类 型的服务,需要在创建服务时选择服务的持久化属性。考虑到目前大多数使用动态服务发现的场景 为非持久化服务的类型(如 Spring Cloud,Dubbo,Service Mesh 等),Nacos 将缺醒值设置为 了非持久化服务。

在 Nacos2.0 版本后,持久化属性的定义被抽象到服务中,⼀个服务只能被定义成持久化服务或非 持久化服务,⼀旦定义完成,在服务生命周期结束之前,无法更改其持久化属性。

持久化属性将会影响服务及实例的数据是否会被 Nacos 进行持久化存储,设置为持久化之后,实 例将不会再被自动移除,需要使用者手动移除实例。

集群(Cluster)

集群是 Nacos 中⼀组服务实例的⼀个逻辑抽象的概念,它介于服务和实例之间,是⼀部分服务属 性的下沉和实例属性的抽象。

定义集群

在 Nacos 中,集群中主要保存了有关健康检查的⼀些信息和数据:

  • 健康检查类型(HealthCheckType):使用哪种类型的健康检查方式,MySQL;设置为 NONE 可以关闭健康检查。
  • 健康检查端口(HealthCheckPort):设置用于健康检查的端口。
  • 是否使用实例端口进行健康检查(UseInstancePort):如果使用实例使用实例定义中的网络端口进行健康检查,而不再使用上述设置的健康
  • 拓展数据(extendData):用于用户自定义扩展的元数据内容,形式为 K群的元数据信息,方便用户实现自己的自定义逻辑和标示该集群。
服务&集群&实例
生命周期

在注册中心中,实例数据都和服务实例的状态绑定,因此服务实例的状态直接决定了注册中心中实 例数据的生命周期。而服务作为实例的聚合抽象,生命周期也会由服务实例的状态来决定。

服务的⽣命周期

服务的生命周期相对比较简单,是从用户向注册中心发起服务注册的请求开始。在 Nacos 中,发起 服务注册有两种方式,⼀种是直接创建服务,⼀种是注册实例时自动创建服务;前者可以让发起者 在创建时期就制定⼀部分服务的元数据信息,而后者只会使用默认的元数据创建服务。

在生命周期期间,用户可以向服务中新增,删除服务实例,同时也能够对服务的元数据进行修改。

当用户主动发起删除服务的请求或⼀定时间内服务下没有实例(无论健康与否)后,服务才结束其 生命周期,等待下⼀次的创建。

实例的⽣命周期

实例的生命周期开始于注册实例的请求。但是根据不同的持久化属性,实例后续的生命周期有⼀定 的不同。

持久化的实例,会通过健康检查的状态维护健康状态,但是不会自动的终止该实例的生命周期;在 生命周期结束之前,持久化实例均可以被修改数据,甚至主动修改其健康状态。唯⼀终止持久化实 例生命周期的方式就是注销实例的请求。

而非持久化的实例,会根据版本的不同,采用不同的方式维持健康状态:如果是 Nacos1.0 的版本, 会通过定时的心跳请求来进行续约,当超过⼀定时间内没有心跳进行续约时,该非持久化实例则终 止生命周期;如果是 Nacos2.0 的版本,会通过 gRPC 的长连接来维持状态,当连接发生中断时, 该非持久化实例则终止生命周期。当然,非持久化实例也可以通过注销实例的请求,主动终止其生 命周期,但是由于长连接和心跳续约的存在,可能导致前⼀个实例数据的生命周期刚被终止移除, 立刻又因为心跳和长连接的补偿请求,再次开启实例的生命周期,给人⼀种注销失败的假象。

集群的⽣命周期

集群的生命周期则相对复杂,由于集群作为服务和实例的⼀个中间层,因此集群的生命周期与实例 和服务的生命周期均有关。

集群的生命周期开始与该集群第⼀个实例的生命周期同时开始,因为⼀个实例必定归属于⼀个集群, 哪怕是默认的集群,因此当第⼀个实例的生命周期开始时,也就是集群生命周期的开始;

当⼀个集群下不存在实例时,集群的生命周期也不会立刻结束,而是会等到这个服务的生命周期结 束时,才会⼀起结束生命周期。

元数据的⽣命周期

由于元数据的其对应的数据模型是紧密关联的,所以元数据的生命周期基本和对应的数据模型保持 ⼀致。但是也如前文所说,元数据通常为运维人员的主动操作的数据,会被 Nacos 进行⼀段时间 内的记忆,因此元数据的生命周期的终止相比对应的数据要滞后;若这滞后期间内,对应的数据又 重新开始生命周期,则该元数据的生命周期将被立刻重置,不再终止。

各数据的生命周期图

源码分析

服务注册

InstanceController

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.classaction = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    
    //从request中获取客户端传递的namespaceId
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    //从request中获取客户端传递的serviceName
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    //检查服务名是否合法
    NamingUtils.checkServiceNameFormat(serviceName);
    
    //从request中获取客户端传递的参数构造instance对象
    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
    
    //此处2.x版本与1.x版本有差异,增加了一个服务实例操作者的具体实现。由于我们是通过spring cloud alibaba nacos client的1.4.1版本发起的注册请求,所以此处仍使用1.x版本的实现InstanceOperatorServiceImpl
    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

InstanceOperatorServiceImpl

@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
 //将v2版本的instance转换为v1版本的,原因详见概述
    com.alibaba.nacos.naming.core.Instance coreInstance = parseInstance(instance);
    //委托给内部的serviceManager注册实例
    serviceManager.registerInstance(namespaceId, serviceName, coreInstance);
}
private com.alibaba.nacos.naming.core.Instance parseInstance(Instance apiInstance) throws NacosException {
    com.alibaba.nacos.naming.core.Instance result = instanceUpgradeHelper.toV1(apiInstance);
    result.setApp(apiInstance.getMetadata().getOrDefault("app""DEFAULT"));
    result.validate();
    return result;
}

ServiceManager

关于实例注册的逻辑都在这个方法中,下面将重点分析这个方法:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    
    //1.创建一个空的服务
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    
 //2.从缓存中获取服务
    Service service = getService(namespaceId, serviceName);
    
    //3.检查服务是否为null
    checkServiceIsNull(service, namespaceId, serviceName);
    
 //4.添加实例
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

在正式介绍之前,先看一下nacos内部存储服务的结构,以便于理解下面要介绍的内容。

private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

在第2节概述中也有介绍,nacos内部通过一个双层的map来缓存服务。外层使用namespaceId作为key,value为map;内层的map使用serviceName作为key,Service对象作为value存储。

createEmptyService

ServiceManager

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
 //如果不存在就创建一个服务
    createServiceIfAbsent(namespaceId, serviceName, local, null);
}
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException 
{
    //从内部的serviceMap中获取service
    Service service = getService(namespaceId, serviceName);
    //假如我们是第一次注册服务,那么这里从缓存中得到的就是null
    if (service == null) {
        
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
  //实例化service
        service = new Service();
        //设置服务名
        service.setName(serviceName);
        //设置服务归属的命名空间
        service.setNamespaceId(namespaceId);
        //设置服务归属的组
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        //设置服务的最后修改时间为当前系统时间
        service.setLastModifiedMillis(System.currentTimeMillis());
        //重新计算服务的MD5值
        service.recalculateChecksum();
        //由于上一步传入进来的cluster等于null,所以此处不执行
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        //验证服务是否合法,比如服务名
        service.validate();
        
        //添加服务并且初始化服务
        putServiceAndInit(service);
        //local等于true,临时实例,以下代码不执行
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

进入putServiceAndInit(service)方法

private void putServiceAndInit(Service service) throws NacosException {
  //添加服务,就是往serviceMap中设置服务
     putService(service);
     //从缓存中获取服务
     service = getService(service.getNamespaceId(), service.getName());
     //初始化服务
     service.init();
     //添加监听,先有个印象,下文会用到,Service实现了RecordListener接口
     consistencyService
             .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
     consistencyService
             .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
     Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
 }
public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        serviceMap.putIfAbsent(service.getNamespaceId(), new ConcurrentSkipListMap<>());
    }
    serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}

由于Service实现了RecordListener接口,将当前service添加到DistroConsistencyServiceImpl实例内部的listeners成员变量中,下文会用到,先记住哦

@Override
public void listen(String key, RecordListener listener) throws NacosException {
    if (!listeners.containsKey(key)) {
        listeners.put(key, new ConcurrentLinkedQueue<>());
    }
    
    if (listeners.get(key).contains(listener)) {
        return;
    }
    
    listeners.get(key).add(listener);
}

进入service.init()方法

Service

public void init() {
 //添加一个心跳检查的定时任务
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    //遍历service内部的clusterMap并初始化集群,由于我们是第一次注册实例,所以此时clusterMap还是空,暂不分析
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

到这里createEmptyService方法就分析完了,这个方法就是创建一个空的service对象并添加到内部的serviceMap中缓存起来,以备下文使用。由于此篇我们的主线任务是服务注册,所以服务初始化时添加的心跳检查定时任务暂不分析,下回分解,此处留个印象☺。回到ServiceManager类的registerInstance方法继续向下分析,为了方便阅读,再把代码贴一下:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    
    Service service = getService(namespaceId, serviceName);
    
    checkServiceIsNull(service, namespaceId, serviceName);
    
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

中间2行代码很简单,请自行阅读,直接进入addInstance(namespaceId, serviceName, instance.isEphemeral(), instance)方法

addInstance

ServiceManager

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException 
{
    //根据namespaceId、serviceName以及是否是临时实例,创建一个key,由于此时我们注册的是临时实例,所以key为com.alibaba.nacos.naming.iplist.ephemeral.{namespaceId}.##.{serviceName}
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    
    //从缓存中获取上一步中创建的空service
    Service service = getService(namespaceId, serviceName);
    
    synchronized (service) {
     //更新实例并返回该服务下的所有实例列表
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        
        //创建一个instances对象,用于传递实例列表
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        
        //触发一个服务变更的通知以及向集群中的其他nacos节点同步实例数据
        consistencyService.put(key, instances);
    }
}

进入addIpAddresses(service, ephemeral, ips)方法

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
            throws NacosException 
{
    //从consistencyService内部的DataStore中获取datum,由于是第一次注册服务,先不管,此处得到的datum等于null
    Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
    
    //由于ephemeral等于true, 所以从service中获取的是所有的临时实例,又由于是第一次注册,currentIPs列表等于空
    List<Instance> currentIPs = service.allIPs(ephemeral);
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    Set<String> currentInstanceIds = CollectionUtils.set();
    
    //遍历currentIPs列表,由于currentIPs列表为空,此时不执行内部的循环
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }
    
    Map<String, Instance> instanceMap;
    //首次注册datum等于null
    if (datum != null && null != datum.value) {
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
     //创建instanceMap
        instanceMap = new HashMap<>(ips.length);
    }
    
    //ips.length等于1
    for (Instance instance : ips) {
     //首次注册,clusterMap中不包含当前实例的clusterName的key
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
         //实例化cluster,并通过构造函数传递集群的名字,以及当前集群归属于哪一个service
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            //集群初始化,内部还是开启了一个心跳检查的定时任务,先不管
            cluster.init();
            //向service内部的clusterMap中设置cluster
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
        }
        
        //此时action等于add,所以执行else逻辑
        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
         //此处instanceMap刚创建,oldInstance等于null
            Instance oldInstance = instanceMap.get(instance.getDatumKey());
            if (oldInstance != null) {
                instance.setInstanceId(oldInstance.getInstanceId());
            } else {
                instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            }
            //将当前实例设置到instanceMap中
            instanceMap.put(instance.getDatumKey(), instance);
        }
        
    }
    
    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                        .toJson(instanceMap.values()));
    }
    //返回所有实例
    return new ArrayList<>(instanceMap.values());
}

然后回到addInstance方法,继续进入consistencyService.put(key, instances)方法,我们注册的是临时实例,所以进入的是DistroConsistencyServiceImpl实现。

DistroConsistencyServiceImpl

@Override
public void put(String key, Record value) throws NacosException {
 //设置一个新的记录并添加一个通知
    onPut(key, value);
    // If upgrade to 2.0.X, do not sync for v1.
    if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
        return;
    }
    //同步实例数据到其他节点,先不管
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            DistroConfig.getInstance().getSyncDelayMillis());
}

进入onPut(key, value)方法

public void onPut(String key, Record value) {
    
    //由于当前是临时实例,所以匹配
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
     //实例化datum
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        //设置到内部的dataStore对象中
        dataStore.put(key, datum);
    }
    
    //在createEmptyService方法中,添加了监听器,所以此处返回true
    if (!listeners.containsKey(key)) {
        return;
    }
    
    //向内部的notifier对象中添加一个任务
    notifier.addTask(key, DataOperation.CHANGE);
}

DistroConsistencyServiceImpl@Notifier

public void addTask(String datumKey, DataOperation action) {
   //如果Notifier内部的services中已经包含datumKey并且action等于DataOperation.CHANGE,什么也不做,说明已经添加过了
   if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
       return;
   }
   //如果action == DataOperation.CHANGE
   if (action == DataOperation.CHANGE) {
    //向services中添加数据
       services.put(datumKey, StringUtils.EMPTY);
   }
   //创建一个Pair并添加到内部的ArrayBlockingQueue中
   tasks.offer(Pair.with(datumKey, action));
}

因为DistroConsistencyServiceImpl内部的init方法被@PostConstruct注解标注,所以在初始化阶段,会执行init方法。

@PostConstruct
public void init() {
 //定时任务
    GlobalExecutor.submitDistroNotifyTask(notifier);
}

Notifier实现了Runnable接口,所以看一下它的run方法

@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");
    
    //循环
    for (; ; ) {
        try {
         //如果ArrayBlockingQueue中没有数据,take方法会阻塞,直到从队列中取到数据
            Pair<String, DataOperation> pair = tasks.take();
            //调用handle方法
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

进入handle(pair)方法

private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();
        
        //从services中移除标识
        services.remove(datumKey);
        
        int count = 0;
        
        if (!listeners.containsKey(datumKey)) {
            return;
        }
        
        //遍历listeners
        for (RecordListener listener : listeners.get(datumKey)) {
            
            count++;
            
            try {
                if (action == DataOperation.CHANGE) {
                 //调用listener的onChange方法
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                
                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
        
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

createEmptyService方法中已经将service添加到了listeners中,所以进入serviceonChange方法

Service

@Override
public void onChange(String key, Instances value) throws Exception {
    
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    
    //从Instances实例中获取实例列表并遍历
    for (Instance instance : value.getInstanceList()) {
        
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    
    //更新所有实例
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    
    recalculateChecksum();
}

进入updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key))方法

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
 //创建一个空的map,用户存储实例数据
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    //遍历clusterMap,copyOnWrite,考虑并发读写问题
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    
    //遍历待添加的实例列表
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
            
            //如果实例归属的集群名等于空
            if (StringUtils.isEmpty(instance.getClusterName())) {
             //则使用默认的集群名DEFAULT
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            
            //如果服务的clusterMap中不包含clusterName key,由于在前几步中已经添加过了
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
                //实例化cluster
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                //初始化
                cluster.init();
                //添加到clusterMap中
                getClusterMap().put(instance.getClusterName(), cluster);
            }
            
            //首次注册,等于null
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
            
            //将实例添加到对应的cluster中
            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }
    
    //遍历ipMap,并将新注册的实例添加到clusterMap中
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        //请自行跟踪updateIps方法,下文的问题2的解答在此处
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }
    
    //更新service的最后修改时间为当前系统时间
    setLastModifiedMillis(System.currentTimeMillis());
    //底层通过UDP socket 向所有订阅了该服务的客户端推送服务,不在主线中,先不管
    getPushService().serviceChanged(this);
    
   //v2版本新增
    ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(thisephemeral);
    StringBuilder stringBuilder = new StringBuilder();
    
    //遍历所有实例,包含临时和持久化实例,输出日志
    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(',');
    }
    
    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());
    
}

可以看到这个方法,就是将新添加的实例合并到原先的clusterMap中。

至此关于服务注册的主线代码已经全部分析完毕。

总结

本篇分析了nacos2.x版本服务端服务注册相关源代码,相信小伙伴们阅读完,一定会对底层实现有一个初步的认识,希望大家跟着此篇的线索,自己花时间再跟读几遍,加深理解。本人也是花了几天的时间才有梳理了一个大概的脉络,nacos底层实现还是很复杂的,一定要花时间阅读哦!在跟踪代码的时候,服务端同时开启了心跳检查任务以及向nacos集群其他节点同步数据相关代码,不在本篇分析的范围内,以后单独开篇介绍。

最后感谢大家的阅读,如有错误,还请指正,谢谢!

补充

这里补充几个问题,希望引起大家的思考:

  1. 如何支持高并发注册(异步任务与内存队列设计原理及源码剖析)

    答案:采用内存队列的方式进行服务注册

    也就是说客户端在把自己的信息注册到Nacos Server的时候,并不是同步把信息写入到注册表中的,而且采取了先写入内存队列中,然后用独立的线程池来消费队列进行注册的。

    从源码可看出最终会执行listener.onChange()这个方法,并把Instances传入,然后进行真正的注册逻辑,这里的设计就是为了提高Nacos Server的并发注册量。这里再提一下,在进行队列消费的时候其实最终也是采用的JDK的线程池。

  2. 注册表如何防止多节点读写并发冲突(Copy On Write思想的实现)

    答案:Copy on write 思想

    updateIps方法中传入了一个List<Instance> ips,然后用ips跟之前注册表中的Instances进行比较,分别得出需要添加、更新、和删除的实例,然后做一些相关的操作,比如Instance的一些属性设置、启动心跳、删除心跳等等,最后把处理后的List<Instance> ips,直接替换内存注册表,这样如果同时有读的请求,其实读取是之前的老注册表的信息,这样就很好的控制了并发读写冲突问题,这个思想就是Copy On Write思想,在JDK源码中并发包里也有一些相关的实现,比如:CopyOnWriteArrayList