vlambda博客
学习文章列表

用过Nacos,那就说说Nacos服务注册的原理吧!

做积极的人,而不是积极废人!

每天 14:00 更新文章,每天掉亿点点头发...

源码精品专栏

 




Nacos 是目前国内非常火的一个服务注册与发现的中间件,有不少公司都在采用 Nacos,因此面试中被问到的概率也是非常高的!

  • 服务提供者把自己的协议地址注册到Nacos server
  • 服务消费者需要从Nacos Server上去查询服务提供者的地址(根据服务名称)
  • Nacos Server需要感知到服务提供者的上下线的变化
  • 服务消费者需要动态感知到Nacos Server端服务地址的变化

作为注册中心所需要的能力大多如此,我们需要做的是理解各种注册中心的独有特性,总结他们的共性。

推荐下自己做的 Spring Boot 的实战项目:

https://github.com/YunaiV/ruoyi-vue-pro

下面我们先来了解一下 Nacos 注册中心的实现原理,通过下面这张图来说明。

Nacos 注册中心的实现原理

推荐下自己做的 Spring Cloud 的实战项目:

https://github.com/YunaiV/onemall

「服务注册的流程:」

在基于Dubbo服务发布的过程中, 自动装配是走的事件监听机制,在 DubboServiceRegistrationNonWebApplicationAutoConfiguration 这个类中,这个类会监听 ApplicationStartedEvent 事件,这个事件是spring boot在2.0新增的,就是当spring boot应用启动完成之后会发布这个时间。而此时监听到这个事件之后,会触发注册的动作。

@EventListener(ApplicationStartedEvent.class)
public void onApplicationStarted() 
{
    setServerPort();
    register();
}

private void register() {
    if (registered) {
        return;
    }
    serviceRegistry.register(registration);
    registered = true;
}

serviceRegistry是 spring-cloud 提供的接口实现(org.springframework.cloud.client.serviceregistry.ServiceRegistry),很显然注入的实例是:NacosServiceRegistry。

NacosServiceRegistry

然后进入到实现类的注册方法:

    @Override
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }
        //对应当前应用的application.name
        String serviceId = registration.getServiceId();
        //表示nacos上的分组配置
        String group = nacosDiscoveryProperties.getGroup();
        //表示服务实例信息
        Instance instance = getNacosInstanceFromRegistration(registration);

        try {
            //通过命名服务进行注册
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                    instance.getIp(), instance.getPort());
        } catch (Exception e) {
            log.error("nacos registry, {} register failed...{},", serviceId,
                    registration.toString(), e);
            // rethrow a RuntimeException if the registration is failed.
            // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
            rethrowRuntimeException(e);
        }
    }    

接下去就是开始注册实例,主要做两个动作

  1. 如果当前注册的是临时节点,则构建心跳信息,通过beat反应堆来构建心跳任务
  2. 调用registerService发起服务注册
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        ////是否是临时节点,如果是临时节点,则构建心跳信息
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);

            //beatReactor, 添加心跳信息进行处理
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
          //调用服务代理类进行注册   
          serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

然后调用 NamingProxy  的注册方法进行注册,代码逻辑很简单,构建请求参数,发起请求。

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));

        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

    }
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {

        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        }

        Exception exception = new Exception();
        //如果服务地址不为空
        if (servers != null && !servers.isEmpty()) {
            //随机获取一台服务器节点
            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());
            // 遍历服务列表
            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index);//获得索引位置的服务节点
                try {//调用指定服务
                    return callServer(api, params, server, method);
                } catch (NacosException e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.", server, e);
                } catch (Exception e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.", server, e);
                }
               //轮询
                index = (index + 1) % servers.size();
            }
       // ..........
}

最后通过 callServer(api, params, server, method) 发起调用,这里通过 JSK自带的 HttpURLConnection 进行发起调用。我们可以通过断点的方式来看到这里的请求参数:

HttpURLConnection 进行发起调用

「Nacos服务端的处理:」

服务端提供了一个InstanceController类,在这个类中提供了服务注册相关的API,而服务端发起注册时,调用的接口是:[post]: /nacos/v1/ns/instance ,serviceName: 代表客户端的项目名称 ,namespace: nacos 的namespace。

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.classaction = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
        
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        // 从请求中解析出instance实例
        final Instance instance = parseInstance(request);
        
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
}

然后调用 ServiceManager 进行服务的注册

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        //创建一个空服务,在Nacos控制台服务列表展示的服务信息,实际上是初始化一个serviceMap,它是一个ConcurrentHashMap集合
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        //从serviceMap中,根据namespaceId和serviceName得到一个服务对象
        Service service = getService(namespaceId, serviceName);
        
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        //调用addInstance创建一个服务实例
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

在创建空的服务实例的时候我们发现了存储实例的map:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException 
{
        //从serviceMap中获取服务对象
        Service service = getService(namespaceId, serviceName);
        if (service == null) {//如果为空。则初始化
      Loggers.SRV\_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
      service = new Service();
      service.setName(serviceName);
      service.setNamespaceId(namespaceId);
      service.setGroupName(NamingUtils.getGroupName(serviceName));
      // now validate the service. if failed, exception will be thrown
      service.setLastModifiedMillis(System.currentTimeMillis());
      service.recalculateChecksum();
      if (cluster != null) {
          cluster.setService(service);
          service.getClusterMap().put(cluster.getName(), cluster);
      }
      service.validate();
      putServiceAndInit(service);
      if (!local) {
          addOrReplaceService(service);
      }
}

在 getService 方法中我们发现了Map:

/*
* Map(namespace, Map(group::serviceName, Service)).
*/

private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

通过注释我们可以知道,Nacos是通过不同的 namespace 来维护服务的,而每个namespace下有不同的group,不同的group下才有对应的Service ,再通过这个 serviceName 来确定服务实例。

第一次进来则会进入初始化,初始化完会调用 putServiceAndInit

private void putServiceAndInit(Service service) throws NacosException {
        putService(service);//把服务信息保存到serviceMap集合
        service.init();//建立心跳检测机制
        //实现数据一致性监听,ephemeral(标识服务是否为临时服务,默认是持久化的,也就是true)=true表示采用raft协议,false表示采用Distro
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

获取到服务以后把服务实例添加到集合中,然后基于一致性协议进行数据的同步。然后调用 addInstance

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException 
{
        // 组装key
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // 获取刚刚组装的服务
        Service service = getService(namespaceId, serviceName);
        
        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // 也就是上一步实现监听的类里添加注册服务
            consistencyService.put(key, instances);
        }
    }

然后给服务注册方发送注册成功的响应,结束服务注册流程。以上内容,希望大家有一个大概的认识,收藏起来,后面慢慢多看几次,牢记心中,面试中肯定是加分项。



已在知识星球更新源码解析如下:

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 6W 行代码的电商微服务项目。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)