Nacos(2)——Nacos服务端服务注册处理
上一节回顾:
Nacos客户端通过NacosDiscoveryAutoConfiguration实现了@EnableDiscoveryClient,完成了自动配置。
通过NacosNamingService完成服务的信息构建。
通过NacosServiceRegistry向Nacos服务端发起注册。
NamingProxy是发起HTTP请求的媒介。
客户端注册到Nacos服务端是通过HTTP请求注册的:
1post http://naocs-server/nacos/v1/ns/instance
接下来我们会对Nacos服务端收到服务注册请求后的原理进行分析。
1 InstanceController
1.1 服务注册入口
既然是来自NamingProxy发起的HTTP请求,则提供注册服务的是nacos-naming模块下的InstanceController,其中InstanceController#register()方法负责客户端实例的注册服务:
1public String register(HttpServletRequest request) throws Exception {
2 // 从请求中获取注册的服务名称
3 String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
4 // 从请求中获取注册的namespaceId
5 String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
6 // 进行服务注册
7 serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
8 return "ok";
9}
注册主要分为三个步骤:
获取能确定实例所属service的namespaceId和serviceName。
注册实例。
返回注册结果,成功注册则为"ok",注册出现异常则抛出异常。
1.2 处理服务注册
接下来进入服务注册的核心逻辑,ServiceManager#registerInstance()方法:
1public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
2 // 如果指定service不存在,则需要使用注册参数创建service
3 createEmptyService(namespaceId, serviceName, instance.isEphemeral());
4 // 从缓存获取指定service
5 Service service = getService(namespaceId, serviceName);
6
7 if (service == null) {
8 throw new NacosException(NacosException.INVALID_PARAM,
9 "service not found, namespace: " + namespaceId + ", service: " + serviceName);
10 }
11 // 向指定service中添加注册的实例
12 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
13}
获取或创建实例需要注册到的service。
向指定service注册实例。
1.2.1 获取service
我们继续看Nacos服务端创建service的过程,ServiceManager#createServiceIfAbsent()方法:
1public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
2 // 从缓存中获取service,当前service实例是否已经注册,维度是namespaceId+serviceName
3 Service service = getService(namespaceId, serviceName);
4 // 需要创建新的service
5 if (service == null) {
6
7 Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
8 service = new Service();
9 service.setName(serviceName);
10 service.setNamespaceId(namespaceId);
11 service.setGroupName(NamingUtils.getGroupName(serviceName));
12 service.setLastModifiedMillis(System.currentTimeMillis());
13 service.recalculateChecksum();
14 if (cluster != null) {
15 cluster.setService(service);
16 service.getClusterMap().put(cluster.getName(), cluster);
17 }
18 // 开始校验service,如果校验不通过,则抛出异常
19 service.validate();
20 // 添加新创建的service,并进行初始化
21 // 主要工作:开始service的心跳任务调度,初始化关联的每个cluster
22 putServiceAndInit(service);
23 // 如果是持久service,则还需要进行持久化,一般情况下会使用Raft一致性协议实现进行持久化
24 if (!local) {
25 addOrReplaceService(service);
26 }
27 }
28}
再次从缓存中获取service。
如果仍没有获取Service,则开始创建service,否则完成创建动作。
设置service创建需要用的到的信息:
serviceName。
service所属的namespace。
serviceName中包含了所属的group,从serviceName中提取groupName,默认是DEFAULT_GROUP。
初始化service的更新时间戳。
如果注册的实例需要由cluster进行管理,则还需要为service关联cluster。
对service进行校验,主要校验serviceName和关联的每个clusterName命名是否符合规定。
初始化service,主要工作是开启service的心跳检查任务,初始化每一个关联的cluster。
如果注册的是持久节点,则还需要将service进行持久化。
这里需要提醒一下,一致性协议将会在后续文章进行分析,本篇文章专注于服务注册的原理,但是我们需要了解基本信息:
我们的服务注册,注册节点是临时节点,使用的是alibaba自研的Distro一致性协议算法。
如果需要将节点进行持久化,则alibaba实现了Raft协议,使用Raft协议进行持久化。
最后,创建的service会放在以namespaceId为维度的serviceName=>service的缓存中。
1.3 服务注册
1.3.1 服务注册流程
创建完service后, 会重新从缓存中获取service,之后便会向service中注册服务,ServiceManager#addInstance()方法:
1public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
2 // 生成服务唯一的key:"com.alibaba.nacos.naming.iplist."+(是否为瞬时节点)"ephemeral"+"namespaceId"+"##"+"service:spring.application.name"
3 String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
4 // 获取服务列表,需要将实例放入到指定的服务下
5 Service service = getService(namespaceId, serviceName);
6 // 进行同步操作
7 synchronized (service) {
8 // 添加到service所在的实例列表下,返回的是更新后新的实例列表
9 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
10 // 创建新的Instances持久化对象
11 Instances instances = new Instances();
12 instances.setInstanceList(instanceList);
13 // 使用一致性协议,存储更新后的实例数据
14 consistencyService.put(key, instances);
15 }
16}
生成服务唯一的key,命名格式为"com.alibaba.nacos.naming.iplist."+"ephemeral."+namespaceId+"##"+"服务的spring.application.name",举一个例子可以是:"com.alibaba.nacos.naming.iplist.ephemeral.public##sunshine-taurus"。
获取需要将服务注册到的service。
注册服务时,需要对service进行同步操作。
将注册的服务实例添加到service的实例列表中,并返回最新的service服务列表。
使用Distro协议,存储更新后的实例数据。
注意,构建的key不是为了注册的服务实例而用,而是为了service的服务列表而用。
因为每个服务spring.application.name是相同的。
1.3.2 注册服务到服务列表
通过调用ServiceManager#addIpAddress()方法,将注册服务添加到当前service的服务列表中:
1public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
2 return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
3}
使用UPDATE_INSTANCE_ACTION_ADD添加操作,调用ServiceManager#updateIpAddresses()方法更新服务实例列表。
接下来会对ServiceManager#updateIpAddresses()方法进行逐步分析。
1.3.2.1 获取已有服务实例
首先,Nacos服务端会从一致性协议,对于临时节点,也就是从内存中获取指定service临时实例节点数据:
1// 从一致性协议中读取当前service的实例数据,如果是临时节点使用Distro协议,如果是持久节点使用Raft协议
2Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
3// 获取当前Nacos上所有的临时实例列表
4List<Instance> currentIPs = service.allIPs(ephemeral);
5Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
6Set<String> currentInstanceIds = Sets.newHashSet();
7// 遍历当前所有的服务列表,放入到"ip:port"=>instance字典表,和实例ID集合中
8for (Instance instance : currentIPs) {
9 currentInstances.put(instance.toIPAddr(), instance);
10 currentInstanceIds.add(instance.getInstanceId());
11}
首先,从缓存中获取指定serviceKey的实例信息。
接着,从service缓存中获取所有临时节点信息。
将service缓存中的实例信息,转换为"ip:port"=>instance集合,instanceId集合。
1.3.2.2 更新缓存信息
接着,Nacos会以一致性协议中取出的实例数据为主,缓存的中的数据为辅,对实例数据进行更新:
1Map<String, Instance> instanceMap;
2if (datum != null) {
3 // 当前存在指定service的数据,则对集群的数据进行更新后,使用此缓存
4 instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
5} else {
6 // 否则会创建一个新缓存
7 instanceMap = new HashMap<>(ips.length);
8}
接着调用ServiceManager#setValid()方法对数据进行更新:
1private Map<String, Instance> setValid(List<Instance> oldInstances, Map<String, Instance> map) {
2
3 Map<String, Instance> instanceMap = new HashMap<>(oldInstances.size());
4 // 迭代缓存中所有指定service一致性协议数据中所有的实例
5 for (Instance instance : oldInstances) {
6 // 从集群缓存中获取实例信息
7 Instance instance1 = map.get(instance.toIPAddr());
8 if (instance1 != null) {
9 // 使用集群中信息,更新数据中的实例信息
10 instance.setHealthy(instance1.isHealthy());
11 instance.setLastBeat(instance1.getLastBeat());
12 }
13 // 重新放入到集群实例缓存中
14 instanceMap.put(instance.getDatumKey(), instance);
15 }
16 return instanceMap;
17}
在ServiceManager#setValid()方法中,主要做了:
遍历一致性协议中存储的实例列表。
使用service缓存中存储的实例信息,更新一致性协议中存储的实例信息。
返回更新后的一致性协议中存储的实例列表。
此时,已有数据列表的信息已经是最新的了。
1.3.2.3 添加实例
接下来,需要遍历所有需要添加的实例,添加到已有的实例集合中:
1for (Instance instance : ips) {
2 // 如果需要更新的实例所在集群不存在,则需要创建一个新集群
3 if (!service.getClusterMap().containsKey(instance.getClusterName())) {
4 Cluster cluster = new Cluster(instance.getClusterName(), service);
5 // 初始化集群,开始进行集群的健康检查任务
6 cluster.init();
7 // 将新创建的集群添加到指定service中
8 service.getClusterMap().put(instance.getClusterName(), cluster);
9 Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
10 instance.getClusterName(), instance.toJSON());
11 }
12 if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
13 // 如果是移除操作,则移除当前的实例
14 instanceMap.remove(instance.getDatumKey());
15 } else {
16 // 其他操作,也就是添加或更新,则直接覆盖缓存的实例对象
17 instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
18 instanceMap.put(instance.getDatumKey(), instance);
19 }
20
21}
22// 如果在添加操作后,指定service中没有实例,证明出现了异常,需要排查问题
23if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
24 throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
25 + JSON.toJSONString(instanceMap.values()));
26}
如果需要注册服务实例所在的cluster不存在,则会创建一个新的cluster,并关联到service上。
如果此次更新操作类型是移除操作,则从已有实例集合中,移除指定实例。
除了移除操作,其他情况下只能是添加操作,会生成实例的新instanceId,紧接着添加或更新实例集合中的实例数据。
在ServiceManager#updateIpAddresses()方法中开始部分创建的instanceId集合,用于在生成新实例的instanceId时,为SnowFlake算法提供不重复的序号。
注册完服务实例后,则返回更新后实例列表。
1.3.3 更新一致性协议数据
在注册服务实例过程中,实例列表是从一致性协议缓存中获取的。
但是在注册完成之后,并没有在ServiceManager#updateIpAddresses()方法中并没有对一致性协议中的数据进行更新,我个人认为是方法职责问题。
所以在注册完服务实例后,紧接着就是对新的service服务列表的数据进行一致性协议的更新。
注册成功后,即完成服务的service注册。
2 service管理
从我们的角度来看,添加到service的服务列表中,服务就已经注册成功了。
但是Nacos也做了很多工作,来维护我们注册的服务列表。
2.1 service初始化
从上文得知,在创建service后,我们会对service进行初始化,ServiceManager#putServiceAndInit()方法:
1// 将service放入到指定缓存中,namespaceId=>HashMap(service)
2putService(service);
3// 初始化服务
4service.init();
5// 一致性协议实现监听service的数据变更,瞬时节点或者是持久节点
6consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
7consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
8Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
2.1.1 service内部管理
继续调用Service#init()方法,完成service的初始化:
1public void init() {
2 // 开始调度心跳检查任务
3 HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
4
5 for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
6 // 设定cluster和service的绑定关系
7 entry.getValue().setService(this);
8 // 初始化每个绑定的cluster
9 entry.getValue().init();
10 }
11}
使用健康检查调度器,开始调度service内部的健康检查。
遍历service中所有的cluster,并对关联的cluster进行初始化。
2.1.2 service的心跳任务
service的心跳任务在service初始化后5s开始调度,并在之后每5s调度一次,每次执行的任务是ClientBeatCheckTask。
我们来看ClientBeatCheckTask#run()方法:
1@Override
2public void run() {
3 try {
4 // 指定service不是当前Nacos服务节点负责,则不进行任务处理
5 if (!getDistroMapper().responsible(service.getName())) {
6 return;
7 }
8 // 如果关闭了健康检查功能,也不进行任务处理
9 if (!getSwitchDomain().isHealthCheckEnabled()) {
10 return;
11 }
12 // 获取当前service下,所有的实例信息,包括属于集群的部分
13 List<Instance> instances = service.allIPs(true);
14
15 // 首先设置所有实例节点的健康状态
16 for (Instance instance : instances) {
17 // 如果心跳响应已经超过了要求的心跳时间间隔
18 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
19 if (!instance.isMarked()) {
20 if (instance.isHealthy()) {
21 // 将当前节点的健康状态置为false
22 instance.setHealthy(false);
23 Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
24 instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
25 UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
26 // 发送给定service变动和实例变动事件
27 getPushService().serviceChanged(service);
28 SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
29 }
30 }
31 }
32 }
33
34 // 如果全局设置无需关心过期节点,则不对过期节点进行处理
35 if (!getGlobalConfig().isExpireInstance()) {
36 return;
37 }
38
39 // 移除过期节点
40 for (Instance instance : instances) {
41
42 if (instance.isMarked()) {
43 continue;
44 }
45 // 如果心跳时间间隔已经超出了可以移除的等待时间
46 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
47 Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
48 deleteIP(instance);
49 }
50 }
51
52 } catch (Exception e) {
53 Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
54 }
55
56}
判断当前Nacos服务节点是否负责此service的数据更新。
判断是否开启了全局健康检查功能。
在通过上面的短路校验之后,获取当前service中,每个cluster下所有的服务实例信息。
接下来会遍历每个服务实例,标记每个服务实例的健康状态,判断方式为实例的最后一次心跳间隔是否已经超出了全局配置中设定的最大心跳时间间隔。
如果有的服务实例过长时间没有上报心跳,则将此实例置为非健康状态,并发布ServiceChangeEvent和InstanceHeartbeatTimeoutEvent两个事件。
接下来还会遍历一次每个服务实例,移除已经太久没有上报心跳的服务实例,判断方式为实例的最后一次心跳间隔是否已经超出了可移除的等待时间。
2.1.3 cluster的初始化
在service的初始化过程中,也会将关联的cluster进行初始化,Cluster#init()方法:
1public void init() {
2 // 如果已经初始化,直接返回
3 if (inited) {
4 return;
5 }
6 // 构建健康检查任务
7 checkTask = new HealthCheckTask(this);
8 // 开始调度健康检查任务
9 HealthCheckReactor.scheduleCheck(checkTask);
10 inited = true;
11}
代码看起来和service类似,也是开启cluster的健康检查任务,我们继续看HealthCheckTask#run()方法:
1@Override
2public void run() {
3
4 try {
5 // 如果当前的distro负责本数据块的任务,并且开启了当前service支持健康检查
6 if (distroMapper.responsible(cluster.getService().getName()) &&
7 switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {
8 // 调度执行健康检查任务
9 healthCheckProcessor.process(this);
10 if (Loggers.EVT_LOG.isDebugEnabled()) {
11 Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}", cluster.getService().getName());
12 }
13 }
14 } catch (Throwable e) {
15 Loggers.SRV_LOG.error("[HEALTH-CHECK] error while process health check for {}:{}",
16 cluster.getService().getName(), cluster.getName(), e);
17 } finally {
18 // 如果任务没有取消
19 if (!cancelled) {
20 // 调度下一次健康检查任务
21 HealthCheckReactor.scheduleCheck(this);
22
23 // worst == 0意味着没有进行过健康检查
24 if (this.getCheckRTWorst() > 0
25 && switchDomain.isHealthCheckEnabled(cluster.getService().getName())
26 && distroMapper.responsible(cluster.getService().getName())) {
27 // 计算两次健康检查时间间隔
28 long diff = ((this.getCheckRTLast() - this.getCheckRTLastLast()) * 10000)
29 / this.getCheckRTLastLast();
30
31 this.setCheckRTLastLast(this.getCheckRTLast());
32
33 Cluster cluster = this.getCluster();
34
35 if (Loggers.CHECK_RT.isDebugEnabled()) {
36 Loggers.CHECK_RT.debug("{}:{}@{}->normalized: {}, worst: {}, best: {}, last: {}, diff: {}", cluster.getService().getName(), cluster.getName(), cluster.getHealthChecker().getType(), this.getCheckRTNormalized(), this.getCheckRTWorst(), this.getCheckRTBest(), this.getCheckRTLast(), diff);
37 }
38 }
39 }
40 }
41}
cluster的健康检查任务调度频率不是一个固定的值,是一个2s+(0~5)s之间的一个随机数。
判断当前Nacos服务节点是否负责此service的数据更新,以及当前service是否开启了健康检查。
如果可以进行健康检查,则执行心跳任务。
在进行完此次的心跳检查之后,会对调度下一次的心跳任务,其余的逻辑则是输出一些日志信息。
在执行cluster的心跳任务时,首先调用HealthCheckProcessorDelegate#process()方法,也就是心跳代理进行处理。
心跳代理包括:
HttpHealthCheckProcessor。
MysqlHealthCheckProcessor。
NoneHealthCheckProcessor。
TcpSuperSenseProcessor。
一般情况下,cluster使用的是TcpSuperSenseProcessor处理器进行心跳任务处理,TcpSuperSenseProcessor#process()方法:
1@Override
2public void process(HealthCheckTask task) {
3 // 获取当前cluster下所有持久实例
4 List<Instance> ips = task.getCluster().allIPs(false);
5
6 if (CollectionUtils.isEmpty(ips)) {
7 return;
8 }
9 // 遍历所有持久实例
10 for (Instance ip : ips) {
11
12 if (ip.isMarked()) {
13 if (SRV_LOG.isDebugEnabled()) {
14 SRV_LOG.debug("tcp check, ip is marked as to skip health check, ip:" + ip.getIp());
15 }
16 continue;
17 }
18 // 已经标记为正在检查,则不进行处理
19 if (!ip.markChecking()) {
20 SRV_LOG.warn("tcp check started before last one finished, service: "
21 + task.getCluster().getService().getName() + ":"
22 + task.getCluster().getName() + ":"
23 + ip.getIp() + ":"
24 + ip.getPort());
25
26 healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getTcpHealthParams());
27 continue;
28 }
29 // 构建心跳任务,并添加到心跳队列中
30 Beat beat = new Beat(ip, task);
31 taskQueue.add(beat);
32 // 递增TCP健康检查计数器
33 MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
34 }
35}
首先,获取cluster下的服务实例列表。
接着,遍历所有的服务实例,构建服务实例的心跳任务,并添加到任务队列中。
递增TCP处理器下的健康检查计数器。
到这里大家一定非常想看任务消费和如何进行心跳任务的,但是其实cluster的心跳任务处理的是持久节点,也就是不会对我们的服务实例进行心跳请求的。
这时候你肯定会有一个疑问,那么我们的服务实例是如何进行心跳上报的?
3 客户端心跳上报
回顾上一节的服务注册,我们在初始化NacosNamingService过程中,创建了一个BeatReactor对象,所以我们从BeatRector入手,继续分析。
3.1 BeanReactor的实例化
我们先来看BeatReactor的实例化过程:
1public BeatReactor(NamingProxy serverProxy, int threadCount) {
2 // 用于发起心跳请求的命名代理
3 this.serverProxy = serverProxy;
4 // 初始化调度器
5 executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
6 @Override
7 public Thread newThread(Runnable r) {
8 Thread thread = new Thread(r);
9 thread.setDaemon(true);
10 thread.setName("com.alibaba.nacos.naming.beat.sender");
11 return thread;
12 }
13 });
14}
使用在NacosNamingService初始化过程中创建的NamingProxy发起心跳请求,此NamingProxy对象同时也是发送服务注册请求的NamingProxy。
初始化心跳请求调度器,第一次调度时间默认为5s,不同的组件,客户端上报时间不同。
添加心跳任务是在上一篇关于服务注册文章中的NacosNamingService#registerInstance()中添加的。
3.2 心跳任务
我们继续来看心跳任务的执行,BeatReactor#BeatTask#run()方法:
1@Override
2public void run() {
3 if (beatInfo.isStopped()) {
4 return;
5 }
6 long nextTime = beatInfo.getPeriod();
7 try {
8 // 使用NamingProxy发送心跳请求
9 JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
10 // Nacos服务端会返回下一次心跳时间间隔
11 long interval = result.getIntValue("clientBeatInterval");
12 boolean lightBeatEnabled = false;
13 if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {
14 lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);
15 }
16 BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
17 // 设置下一次进行心跳的时间
18 if (interval > 0) {
19 nextTime = interval;
20 }
21 int code = NamingResponseCode.OK;
22 if (result.containsKey(CommonParams.CODE)) {
23 code = result.getIntValue(CommonParams.CODE);
24 }
25 // 如果返回没有找到对应的资源,则会重新进行注册
26 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
27 Instance instance = new Instance();
28 instance.setPort(beatInfo.getPort());
29 instance.setIp(beatInfo.getIp());
30 instance.setWeight(beatInfo.getWeight());
31 instance.setMetadata(beatInfo.getMetadata());
32 instance.setClusterName(beatInfo.getCluster());
33 instance.setServiceName(beatInfo.getServiceName());
34 instance.setInstanceId(instance.getInstanceId());
35 instance.setEphemeral(true);
36 try {
37 serverProxy.registerService(beatInfo.getServiceName(),
38 NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
39 } catch (Exception ignore) {
40 }
41 }
42 } catch (NacosException ne) {
43 NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
44 JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());
45
46 }
47 // 调度下一次心跳任务
48 executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
49}
首先,使用NamingProxy发送心跳请求。
接着,执行收到心跳响应之后的处理,心跳响应中包含下一次进行心跳的时间间隔。
如果Nacos服务端返回没有找到服务实例,则会进行重新注册。
最后,调度下一次的心跳任务,使用响应中返回的下一次进行心跳的时间间隔。
4 Nacos处理心跳请求
1put http://naocs-server/nacos/v1/ns/instance/beat
2
看起来和本篇文章开头的URL类型,没错,服务端心跳请求的处理也是在InstanceController中。
4.1 收到心跳请求
我们逐步来看InstanceController#beat()方法。
4.2 解析心跳请求
首先是对心跳请求的内容进行解析:
1JSONObject result = new JSONObject();
2// 先将下一次心跳请求的时间间隔写入到响应中
3result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
4// 获取心跳请求实例的serviceName,namespaceId和集群ID
5String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
6String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
7 Constants.DEFAULT_NAMESPACE_ID);
8String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME,
9 UtilsAndCommons.DEFAULT_CLUSTER_NAME);
10String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
11int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
12// 获取service实例注册信息
13String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
14
15RsInfo clientBeat = null;
16if (StringUtils.isNotBlank(beat)) {
17 // 解析为RsInfo对象
18 clientBeat = JSON.parseObject(beat, RsInfo.class);
19}
20
21if (clientBeat != null) {
22 if (StringUtils.isNotBlank(clientBeat.getCluster())) {
23 // 更新集群名称
24 clusterName = clientBeat.getCluster();
25 } else {
26 // 解决#2533问题,避免集群名称为null
27 clientBeat.setCluster(clusterName);
28 }
29 ip = clientBeat.getIp();
30 port = clientBeat.getPort();
31}
获取的信息有namespaceId,serviceName,clusterName,ip,port,心跳实例信息:
namespaceId+serviceName用于确定实例所属的service。
clusterName用于获取service下的服务实例列表。
ip+port用于锁定指定服务实例。
4.3 没有服务实例注册服务实例
如果当前service中没有此实例,而心跳请求时已经将所有的实例信息带过来了,直接注册即可:
1// 从service缓存中获取指定实例信息
2Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
3// 如果实例信息为null,则可能数据还没有更新,会注册实例
4if (instance == null) {
5 if (clientBeat == null) {
6 result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
7 return result;
8 }
9 instance = new Instance();
10 instance.setPort(clientBeat.getPort());
11 instance.setIp(clientBeat.getIp());
12 instance.setWeight(clientBeat.getWeight());
13 instance.setMetadata(clientBeat.getMetadata());
14 instance.setClusterName(clusterName);
15 instance.setServiceName(serviceName);
16 instance.setInstanceId(instance.getInstanceId());
17 instance.setEphemeral(clientBeat.isEphemeral());
18 // 向Nacos集群中注册实例
19 serviceManager.registerInstance(namespaceId, serviceName, instance);
20}
4.4 心跳请求处理
最后,则是正常的业务逻辑,对心跳请求进行处理:
1// 重新获取service
2Service service = serviceManager.getService(namespaceId, serviceName);
3
4if (service == null) {
5 throw new NacosException(NacosException.SERVER_ERROR,
6 "service not found: " + serviceName + "@" + namespaceId);
7}
8if (clientBeat == null) {
9 clientBeat = new RsInfo();
10 clientBeat.setIp(ip);
11 clientBeat.setPort(port);
12 clientBeat.setCluster(clusterName);
13}
14// 使用service来处理心跳请求
15service.processClientBeat(clientBeat);
16
17result.put(CommonParams.CODE, NamingResponseCode.OK);
18result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
19result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
20return result;
我们继续看Service#processClientBeat()方法:
1public void processClientBeat(final RsInfo rsInfo) {
2 // 创建客户端心跳请求任务
3 ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
4 clientBeatProcessor.setService(this);
5 clientBeatProcessor.setRsInfo(rsInfo);
6 // 使用健康检查调度模型调度客户端的心跳请求任务
7 HealthCheckReactor.scheduleNow(clientBeatProcessor);
8}
同样是异步处理,创建一个ClientBeatProcessor异步处理对象,并立即调度处理。
我们继续看ClientBeatProcessor#run()方法来分析具体的执行过程:
1@Override
2public void run() {
3 Service service = this.service;
4 if (Loggers.EVT_LOG.isDebugEnabled()) {
5 Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
6 }
7 // 获取ip,port,clusterName
8 String ip = rsInfo.getIp();
9 String clusterName = rsInfo.getCluster();
10 int port = rsInfo.getPort();
11 Cluster cluster = service.getClusterMap().get(clusterName);
12 // 获取指定service,指定cluster下的所有临时节点
13 List<Instance> instances = cluster.allIPs(true);
14 // 遍历所有的临时节点
15 for (Instance instance : instances) {
16 // 如果找到了心跳上报的客户端节点
17 if (instance.getIp().equals(ip) && instance.getPort() == port) {
18 if (Loggers.EVT_LOG.isDebugEnabled()) {
19 Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
20 }
21 // 设置实例的最新心跳信息
22 instance.setLastBeat(System.currentTimeMillis());
23 // 如果节点没有被标记,并且处于非健康状态,则更新节点状态为健康状态
24 if (!instance.isMarked()) {
25 if (!instance.isHealthy()) {
26 instance.setHealthy(true);
27 Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
28 cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
29 getPushService().serviceChanged(service);
30 }
31 }
32 }
33 }
34}
获取给定service下,指定cluster的临时服务实例列表。
遍历cluster下所有临时服务列表,根据ip+port确定心跳请求的服务实例对象。
更新服务实例的最新心跳时间。
如果服务实例未被标记,并且处于非健康状态,则发送ServiceChangeEvent事件。
未被标记的服务实例无需进行心跳更新和心跳检查。
总结
本篇主要对Nacos服务端对服务注册请求的处理进行分析:
Nacos服务端采用了service=>cluster=>instance的模型管理服务实例。
service和cluster均有心跳任务用于维护服务实例的正常运行,也就是我们经常说的AP模式。
在存储服务实例数据时,Nacos使用了阿里巴巴自研的Distro算法。
客户端的心跳上报不由Nacos服务端发起,而是由客户端自行调度心跳任务。