vlambda博客
学习文章列表

Nacos源码(九)2.0注册中心

前言

本章学习Nacos2.0版本的注册中心,比较与1.4.1版本的区别。

历史相关文章:




一、模型变更

客户端

对于客户端来说,调用的API没有变化,命名服务实现类仍然是NacosNamingService,服务实例仍然是com.alibaba.nacos.api.naming.pojo.Instance。

public class Instance implements Serializable {
    private String instanceId;
    private String ip;
    private int port;
    private double weight = 1.0D;
    private boolean healthy = true;
    private boolean enabled = true;
    private boolean ephemeral = true;
    private String clusterName;
    private String serviceName;
    private Map<String, String> metadata = new HashMap<String, String>();
}

ServiceInfo也没有变,分组+服务+集群对应一组Instance。

public class ServiceInfo {
    // 服务名
    private String name;
    // 分组名
    private String groupName;
    // 集群,逗号分割
    private String clusters;
    // Instance实例
    private List<Instance> hosts = new ArrayList<Instance>();
}

存储服务注册表的容器由HostReactor改为了ServiceInfoHolder,主要是负责管理failover注册表和内存注册表。1.x中HostReactor调用远程接口的逻辑都放到了NamingClientProxy中。

public class ServiceInfoHolder implements Closeable {
    // key=groupName@@serviceName@@clusterName
    private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;

    private final FailoverReactor failoverReactor;
}

2.0ServiceInfoHolder逻辑简单。

// ServiceInfoHolder
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    String key = ServiceInfo.getKey(groupedServiceName, clusters);
    // 1. 如果failover开启,取failover注册表
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // 2. 内存注册表
    return serviceInfoMap.get(key);
}

服务端

模型

服务端不再使用Service、Cluster、Instance模型,改为Service、Instance、Client、Connection。

Nacos源码(九)2.0注册中心
Service&Instance&Client.png

2.0Service使用com.alibaba.nacos.naming.core.v2.pojo.Service。

public class Service {
    private final String namespace;
    private final String group;
    private final String name;
    private final boolean ephemeral;
}

2.0Service区别于1.x:

  • Instance的临时与持久概念上浮到Service,是否临时取决于首次注册的实例是否临时。

  • 1.x用Service管理下面的Cluster,2.0Service与Cluster无关了,Cluster下沉到Instance。

  • 1.x的Service.name=groupName@@serviceName,2.0将groupName和serviceName独立了。

2.0承载Service的容器仍然是ServiceManager,但是在com.alibaba.nacos.naming.core.v2包下,容器中Service都是单例。

public class ServiceManager {
    private static final ServiceManager INSTANCE = new ServiceManager();
    // 单例Service,见Service的equals和hasCode方法
    private final ConcurrentHashMap<Service, Service> singletonRepository;
    // namespace-下属所有Service
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
}

关注Service的equals和hasCode方法,namespace+group+name在服务端是一个单例Service。

@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (!(o instanceof Service)) {
        return false;
    }
    Service service = (Service) o;
    return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
}

@Override
public int hashCode() {
    return Objects.hash(namespace, group, name);
}

2.0没有再使用Cluster模型,cluster成为Instance里的属性。服务端Instance模型是InstancePublishInfo。

public class InstancePublishInfo implements Serializable {
    private String ip;
    private int port;
    private boolean healthy;
    private String cluster;
    // 元数据 metadata
    private Map<String, Object> extendDatum;
}

从Service和Instance来看,两者并没有建立关系。2.0新增新增Client模型。一个客户端gRPC长连接对应一个Client,每个Client有自己唯一的id(clientId)。Client负责管理一个客户端的服务实例注册Publish和服务订阅Subscribe。

/**
 * Nacos naming client.
 *
 * <p>The abstract concept of the client stored by on the server of Nacos naming module. It is used to store which
 * services the client has published and subscribed.
 */

public interface Client {
    // 客户端id/gRPC的connectionId
    String getClientId();

    // 是否临时客户端
    boolean isEphemeral();
    // 客户端更新时间
    void setLastUpdatedTime();
    long getLastUpdatedTime();

    // 服务实例注册/注销/查询
    boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
    InstancePublishInfo removeServiceInstance(Service service);
    InstancePublishInfo getInstancePublishInfo(Service service);
    Collection<Service> getAllPublishedService();

    // 服务订阅/取消订阅/查询订阅
    boolean addServiceSubscriber(Service service, Subscriber subscriber);
    boolean removeServiceSubscriber(Service service);
    Subscriber getSubscriber(Service service);
    Collection<Service> getAllSubscribeService();
    // 生成同步给其他节点的client数据
    ClientSyncData generateSyncData();
    // 是否过期
    boolean isExpire(long currentTime);
    // 释放资源
    void release();
}

ConnectionManager管理所有连接Connection。

@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent{
    // clientid -> connection
    Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
}

服务注册

如何注册Instance,建立三者关系?

从注册中心的传统概念来说,服务注册是把Instance注册到Service之下,服务发现是根据Service找到Instance,2.0nacos没有让Instance与Service直接发生关联。

从服务端服务注册处理来看看,Service与Instance的关联关系如何建立。

InstanceRequestHandler负责接收InstanceRequest,接入服务注册与注销请求。

@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequestInstanceResponse{
    private final EphemeralClientOperationServiceImpl clientOperationService;
    @Override
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            // 注册
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            // 注销
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM);
        }
    }
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
}

EphemeralClientOperationServiceImpl实际负责处理服务注册。

@Override
public void registerInstance(Service service, Instance instance, String clientId) {
    // 1. 确保Service单例存在,注意Service的equals和hasCode方法
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    // 2. 根据长连接id,找到客户端,这个关系在连接建立的时候存储
    Client client = clientManager.getClient(clientId);
    // 3. 客户端Instance模型,转换为服务端Instance模型
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    // 4. 将Instance存储到Client里
    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();
    // 5. 建立service->clientId的关系
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter
            .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}

第一步:ServiceManager负责管理Service单例

map存储单例Service,不同于1.x版本需要拼接key,这里Service重写equals和hasCode方法作为key。

// ServiceManager.java
// 单例Service,见Service的equals和hasCode方法
private final ConcurrentHashMap<Service, Service> singletonRepository;
// namespace-下属所有Service
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
public Service getSingleton(Service service) {
    singletonRepository.putIfAbsent(service, service);
    Service result = singletonRepository.get(service);
    namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
    namespaceSingletonMaps.get(result.getNamespace()).add(result);
    return result;
}

第二步:ConnectionBasedClientManager负责管理长连接clientId与Client模型的映射关系

Client实例(ConnectionBasedClient)是在gRPC长连接建立时保存到ConnectionBasedClientManager中的,这里省略代码。

@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
    private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();
    // 长连接建立,存储
    public boolean clientConnected(Client client) {
        if (!clients.containsKey(client.getClientId())) {
            clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
        }
        return true;
    }
      // 根据clientId查询Client
    public Client getClient(String clientId) {
        return clients.get(clientId);
    }
}

第三步:客户端Instance模型,转换为服务端Instance模型InstancePublishInfo

原来1.0的metadata->2.0的extendDataum。enable、weight等属性移动到元数据里。

// ClientOperationService
default InstancePublishInfo getPublishInfo(Instance instance) {
    InstancePublishInfo result = new InstancePublishInfo(instance.getIp(), instance.getPort());
    if (null != instance.getMetadata() && !instance.getMetadata().isEmpty()) {
        result.getExtendDatum().putAll(instance.getMetadata());
    }
    if (StringUtils.isNotEmpty(instance.getInstanceId())) {
        result.getExtendDatum().put(Constants.CUSTOM_INSTANCE_ID, instance.getInstanceId());
    }
    if (Constants.DEFAULT_INSTANCE_WEIGHT != instance.getWeight()) {
        result.getExtendDatum().put(Constants.PUBLISH_INSTANCE_WEIGHT, instance.getWeight());
    }
    if (!instance.isEnabled()) {
        result.getExtendDatum().put(Constants.PUBLISH_INSTANCE_ENABLE, instance.isEnabled());
    }
    String clusterName = StringUtils.isBlank(instance.getClusterName()) ? UtilsAndCommons.DEFAULT_CLUSTER_NAME
            : instance.getClusterName();
    result.setHealthy(instance.isHealthy());
    result.setCluster(clusterName);
    return result;
}

第四步:建立Client->Service->Instance的关系

ConnectionBasedClient的抽象父类,负责存储当前客户端的服务注册表,即Service与Instance的关系。注意对于单个客户端来说,同一个服务只能注册一个实例。

public abstract class AbstractClient implements Client {

    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(160.75f1);

    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        return true;
    }
}

第五步:建立Service与Client的关系

为什么要有这一步?

其实有上面几步就可以保证服务注册表能被查询了(Service-Instance)。

比如遍历ConnectionBasedClientManager的所有Client,再通过AbstractClient拿到每个客户端注册的服务,过滤目标服务得到最终Instance列表。伪代码如下:

for (Client c : ConnectionBasedClientManager.clients.values()) { // 遍历所有Client
  for (Entry e : c.publishers.entrySet()) { // 遍历Client下注册的service-instance
    if (e.key == targetService) { // 过滤service
      result.add(e.value); // 得到instance
    } 
  } 
}

显然时间复杂度太高。此外,如果服务变更,对于服务订阅来说,需要直接获取Service与订阅Client也需要这样的关系,建立Service与Client的关系就是为了加速查询。

第五步发布ClientRegisterServiceEvent事件,ClientServiceIndexesManager监听,ClientServiceIndexesManager维护了两个索引:

  • Service与发布clientId

  • Service与订阅clientId

// ClientServiceIndexesManager.java
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
private void handleClientOperation(ClientOperationEvent event) {
    Service service = event.getService();
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        addPublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        removePublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        addSubscriberIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        removeSubscriberIndexes(service, clientId);
    }
}
// 建立Service与发布Client的关系
private void addPublisherIndexes(Service service, String clientId) {
  publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
  publisherIndexes.get(service).add(clientId);
  NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}

这个索引关系建立以后,还会触发ServiceChangedEvent,代表服务注册表变更。对于注册表变更紧接着还要做两个事情:1)通知订阅客户端 2)Nacos集群数据同步。这两点后续再说。

持久实例注册怎么说?

注册持久实例需要走一遍Raft流程。其他逻辑与临时实例完全一致,只是模型不同,比如Client实例是IpPortBasedClient。

PersistentClientOperationServiceImpl#onInstanceRegister方法,当Raft流程走完以后,应用log到当前节点。

// PersistentClientOperationServiceImpl
private void onInstanceRegister(Service service, Instance instance, String clientId) {
  // 1. 注册service
  Service singleton = ServiceManager.getInstance().getSingleton(service);
  // 2. 注册client
  Client client = clientManager.computeIfAbsent(clientId, () -> new IpPortBasedClient(clientId, false));
  // 3. 转换instance
  InstancePublishInfo instancePublishInfo = getPublishInfo(instance);
  // 4. 注册instance
  client.addServiceInstance(singleton, instancePublishInfo);
  client.setLastUpdatedTime();
  // 5. 构建索引
  NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
}

服务发现

如何根据service获取所有的instance?

对于服务监听来说,客户端最终肯定是将订阅服务Service与clientId存储在ClientServiceIndexesManager,可以参考上面一小节。

对于服务查询来说,虽然ClientServiceIndexesManager建立了Service与Client的关系,但是直接通过Service查Instance还是太慢了,伪代码如下:

// 1. 循环注册这个服务的所有客户端
for (String clientId : ClientServiceIndexesManager.publisherIndexes.get(targetService)) {
  Client client = ConnectionBasedClientManager.get(clientId);
    // 2. 获取客户端下这个服务的所有实例
  result.addAll(client.publishers.values());
}

能否从传统概念,通过Service直接获取Instance列表,跳过Client关联关系,也是可以的,再做一层索引。

ServiceQueryRequestHandler负责处理客户端查询Service下Instance。分为三步:

  • 根据service查询instance

  • 根据service查询service的元数据,2.0将各种元数据都分开存储了,也是根据service单例找到对应元数据NamingMetadataManager维护了这种mapping关系

  • instance过滤,内部包括可用过滤、健康过滤、保护模式等逻辑

public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequestQueryServiceResponse{
    private final ServiceStorage serviceStorage;
    private final NamingMetadataManager metadataManager;
    public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {
        String namespaceId = request.getNamespace();
        String groupName = request.getGroupName();
        String serviceName = request.getServiceName();
        Service service = Service.newService(namespaceId, groupName, serviceName);
        String cluster = null == request.getCluster() ? "" : request.getCluster();
        boolean healthyOnly = request.isHealthyOnly();
        // 1. 根据service查询Instance
        ServiceInfo result = serviceStorage.getData(service);
        // 2. 根据service查询service的元数据
        ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
        // 3. instance过滤 健康?可用?等等
        result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true);
        return QueryServiceResponse.buildSuccessResponse(result);
    }
}

ServiceStorage会根据上述伪代码逻辑,查询到service下的instance(ServiceInfo),对查询结果做缓存。

// service-instance
private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;
// service-cluster
private final ConcurrentMap<Service, Set<String>> serviceClusterIndex;
public ServiceInfo getData(Service service) {
    // 缓存中存在,直接返回,否则查询后返回
    return serviceDataIndexes.containsKey(service)
      ? serviceDataIndexes.get(service) 
      : getPushData(service);
}

public ServiceInfo getPushData(Service service) {
    ServiceInfo result = emptyServiceInfo(service);
    if (!ServiceManager.getInstance().containSingleton(service)) {
        return result;
    }
    // 1. 查询instance放入结果集
    result.setHosts(getAllInstancesFromIndex(service));
    // 2. 本地缓存service->instance
    serviceDataIndexes.put(service, result);
    return result;
}

二、长连接

注册中心客户端2.0之后使用gRPC代替http,会与服务端建立长连接,但仍然保留了对旧http客户端的支持。

NamingClientProxy接口负责底层通讯,调用服务端接口。有三个实现类:

  • NamingClientProxyDelegate:代理类,对所有NacosNamingService中的方法进行代理,根据实际情况选择http或gRPC协议请求服务端。

  • NamingGrpcClientProxy:底层通讯基于gRPC长连接。

  • NamingHttpClientProxy:底层通讯基于http短连接。使用的都是老代码基本没改,原来1.0NamingProxy重命名过来的。

以客户端服务注册为例,NamingClientProxyDelegate代理了registerService方法。

// NacosNamingService.java
private NamingClientProxy clientProxy; // NamingClientProxyDelegate
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    clientProxy.registerService(serviceName, groupName, instance);
}

NamingClientProxyDelegate会根据instance实例是否是临时节点而选择不同的协议。

  • 临时instance:gRPC

  • 持久instance:http

public class NamingClientProxyDelegate implements NamingClientProxy {
   private final NamingHttpClientProxy httpClientProxy;
   private final NamingGrpcClientProxy grpcClientProxy;
   @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
      getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }
  // 临时节点,走grpc长连接;持久节点,走http短连接
  private NamingClientProxy getExecuteClientProxy(Instance instance) {
      return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
  }
}

服务查询和订阅全走的gRPC。

// NamingClientProxyDelegate
// 订阅
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
  String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
  String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
  // 1. 获取缓存中service对应serviceInfo
  ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
  // 2. 如果缓存中没有,订阅服务
  if (null == result) {
    result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
  }
  // 3. 开启定时任务,持续更新订阅服务注册表
  serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
  // 4. 更新缓存serviceInfo,通知服务变更
  serviceInfoHolder.processServiceInfo(result);
  return result;
}

// 查询
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
                                           boolean healthyOnly)
 throws NacosException 
{
  return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}

三、健康检查

回顾1.x版本。

临时实例走Distro协议内存存储,客户端向注册中心发送心跳来维持自身healthy状态;

持久实例走Raft协议持久化存储,服务端定时与客户端建立tcp连接做健康检查。

2.0版本对于持久实例还是老逻辑。持久实例用的少,不看了。

2.x临时实例不再使用心跳,而是通过长连接是否存活来判断实例是否健康。

服务端主动探活

ConnectionManager负责管理所有客户端的长连接。

每3s检测所有超过20s没发生过通讯的客户端,向客户端发起ClientDetectionRequest探测请求,如果客户端在1s内成功响应,则检测通过,否则执行unregister方法移除Connection。

// ConnectionManager
// clientid -> connection
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
@PostConstruct
public void start() {
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                Set<Map.Entry<String, Connection>> entries = connections.entrySet();
                // 统计过时(20s)连接
                Set<String> outDatedConnections = new HashSet<>();
                long now = System.currentTimeMillis();
                for (Map.Entry<String, Connection> entry : entries) {
                    Connection client = entry.getValue();
                    // ...
                     if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                        outDatedConnections.add(client.getMetaInfo().getConnectionId());
                    }

                }
                // ...
                // 异步请求所有需要检测的连接 ClientDetectionRequest
                if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                    Set<String> successConnections = new HashSet<>();
                    final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                    for (String outDateConnectionId : outDatedConnections) {
                        try {
                            Connection connection = getConnection(outDateConnectionId);
                            if (connection != null) {
                                ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                                connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                    @Override
                                    public Executor getExecutor() {
                                        return null;
                                    }

                                    @Override
                                    public long getTimeout() {
                                        return 1000L;
                                    }

                                    @Override
                                    public void onResponse(Response response) {
                                        latch.countDown();
                                        // 统计成功响应探测请求的客户端id,刷新activeTime
                                        if (response != null && response.isSuccess()) {
                                            connection.freshActiveTime();
                                            successConnections.add(outDateConnectionId);
                                        }
                                    }

                                    @Override
                                    public void onException(Throwable e) {
                                        latch.countDown();
                                    }
                                });

                            } else {
                                latch.countDown();
                            }

                        } catch (ConnectionAlreadyClosedException e) {
                            latch.countDown();
                        } catch (Exception e) {
                            latch.countDown();
                        }
                    }
                    latch.await(3000L, TimeUnit.MILLISECONDS);
                    // 对于没有成功响应的客户端,执行unregister方法
                    for (String outDateConnectionId : outDatedConnections) {
                        if (!successConnections.contains(outDateConnectionId)) {
                            unregister(outDateConnectionId);
                        }
                    }
                }
            } catch (Throwable e) {
                Loggers.REMOTE.error("Error occurs during connection check... ", e);
            }
        }
    }, 1000L3000L, TimeUnit.MILLISECONDS);

}
// 移除connection
public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        // ...
        remove.close();
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

移除connection后,继承ClientConnectionEventListener的ConnectionBasedClientManager会移除Client,发布ClientDisconnectEvent事件。

// clientId -> client
private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();
@Override
public boolean clientDisconnected(String clientId) {
  ConnectionBasedClient client = clients.remove(clientId);
  if (null == client) {
    return true;
  }
  client.release();
  NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
  return true;
}

ClientDisconnectEvent会触发几个点:

1)Distro协议:同步移除的client数据

2)清除两个索引缓存:ClientServiceIndexesManager中Service与发布Client的关系;ServiceStorage中Service与Instance的关系

3)服务订阅:ClientDisconnectEvent会间接触发ServiceChangedEvent事件,将服务变更通知客户端。

Connection是否需要检测?

Connection是否需要检测,关注的是他的元数据中的lastActiveTime属性,代表上次活跃时间。

public class ConnectionMeta {
    // 上次活跃时间
    long lastActiveTime;
}

如果客户端持续与服务端通讯,服务端是不需要主动探活的。见GrpcRequestAcceptor会调用ConnectionManager的refreshActiveTime刷新连接的lastActivetime。

// GrpcRequestAcceptor
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
    String type = grpcRequest.getMetadata().getType();
    // 请求处理器
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    // 解析请求参数
    Object parseObj = GrpcUtils.parse(grpcRequest);
      // ..
    Request request = (Request) parseObj;
    try {
        Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        // 刷新长连接active time
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());
        Response response = requestHandler.handleRequest(request, requestMeta);
        Payload payloadResponse = GrpcUtils.convert(response);
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    } catch (Throwable e) {
       //...
    }
}

客户端订阅服务,会定时更新服务注册表,在1.x是每10s调用服务端查一次注册表,这意味着不会触发服务端健康检查,因为小于20s。但是在2.x中,客户端的服务注册表更新任务UpdateTask,改为了60s。

// UpdateTask
public void run() {
    long delayTime = DEFAULT_DELAY;
    try {
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
            return;
        }
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (serviceObj == null) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0false);
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }

        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0false);
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            return;
        }
        // 服务端控制cacheMillis=10s 这里乘以6 所以60s更新一次
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
    } finally {
        // 60s后再次执行
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

这样服务端可能要对注册在当前节点的所有长连接,每分钟做2-3次健康检查。事实上也并非如此,接下去看。

客户端重连

不光服务端会主动探测客户端存活,客户端也会每5s主动探测超过5s空闲的长连接是否存活。

当长连接断开,客户端会主动向服务端发起重连。

public final void start() throws NacosException {
    // 5s发送一次HealthCheckRequest,发现响应失败则重连
    clientEventExecutor.submit(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    // 5s超时
                    ReconnectContext reconnectContext = reconnectionSignal.poll(keepAliveTime, TimeUnit.MILLISECONDS);
                    if (reconnectContext == null) {
                        // 超过5s空闲
                        if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                            // HealthCheckRequest
                            boolean isHealthy = healthCheck();
                            if (!isHealthy) {
                                if (currentConnection == null) {
                                    continue;
                                }
                                rpcClientStatus.set(RpcClientStatus.UNHEALTHY);
                                reconnectContext = new ReconnectContext(nullfalse);

                            } else {
                                lastActiveTimeStamp = System.currentTimeMillis();
                                continue;
                            }
                        } else {
                            continue;
                        }

                    }
                    // 发起重连,选择下一个nacos节点
                    // ...
                    reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
                } catch (Throwable throwable) {
                    //Do nothing
                }
            }
        }
    });
}

综上,2.x使用长连接,通过双向健康检查来判断对端存活。客户端5s空闲主动发起健康检查,服务端20s空闲主动发起健康检查。

客户端重连还需要解决重新注册和重新监听等问题。所以每次客户端发起这些操作都会记录在内存里,当重连时会重放所有注册和监听请求。

public class NamingGrpcConnectionEventListener implements ConnectionEventListener {
    // 当前客户端注册的service-instance
    private final ConcurrentMap<String, Set<Instance>> registeredInstanceCached = new ConcurrentHashMap<String, Set<Instance>>();
    // 当前客户端订阅的服务
    private final Set<String> subscribes = new ConcurrentHashSet<String>();

    @Override
    public void onConnected() {
        // 1. 重新订阅
        redoSubscribe();
        // 2. 重新注册
        redoRegisterEachService();
    }
}

四、客户端数据同步

客户端数据同步,仍然采用推拉结合的方式。

拉:客户端会每60s执行一个UpdateTask,查询服务端注册表,更新一个订阅Serivce,逻辑与1.x一致,只是时间由10s改为60s。

推:服务端的Service发生变更,通知所有监听这个Service的Client。这里不再使用1.x的UDP协议,走长连接推送。

服务端当Service发生变更,触发ServiceChangedEvent。NamingSubscriberServiceV2Impl监听ServiceChangedEvent事件,提交PushDelayTask。

// NamingSubscriberServiceV2Impl
public void onEvent(Event event) {
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
        Service service = serviceChangedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, 500L));
    } 
    // ...
}

PushExecuteTask执行客户端数据同步,首先通过ServiceStorage获取ServiceInfo,并更新ServiceStorage缓存。

// PushExecuteTask
public void run() {
    try {
        // 1. 更新ServiceStorage中service->instance缓存,查询ServiceInfo
        PushDataWrapper wrapper = generatePushData();
        // 2. 循环所有订阅客户端,将ServiceInfo发给订阅客户端
        for (String each : getTargetClientIds()) {
            Client client = delayTaskEngine.getClientManager().getClient(each);
            Subscriber subscriber = delayTaskEngine.getClientManager().getClient(each).getSubscriber(service);
            delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                    new NamingPushCallback(each, subscriber, wrapper.getOriginalData()));
        }
    } catch (Exception e) {
        delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
    }
}
private PushDataWrapper generatePushData() {
  // 查询ServiceInfo,并更新ServiceStorage缓存
  ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);
  ServiceMetadata serviceMetadata = delayTaskEngine.getMetadataManager().getServiceMetadata(service).orElse(null);
  serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceInfo, serviceMetadata, falsetrue);
  return new PushDataWrapper(serviceInfo);
}

五、集群数据同步

当一个客户端发布或注销服务,会在Client模型里存储发布Service对应的Instance信息。

public abstract class AbstractClient implements Client {
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(160.75f1);
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        return true;
    }

    @Override
    public InstancePublishInfo removeServiceInstance(Service service) {
        InstancePublishInfo result = publishers.remove(service);
        if (null != result) {
            NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        }
        return result;
    }
}

责任节点

完成本地写入注册信息后,触发ClientChangedEvent事件,DistroClientDataProcessor只会处理当前节点负责的client。

// DistroClientDataProcessor.java
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        // 客户端断开连接
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // 客户端新增/修改
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

DistroProtocol循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s(nacos.core.protocol.distro.data.sync_delay_ms)执行。

// DistroProtocol.java
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
}

对于DELETE操作,由DistroSyncDeleteTask处理;

对于CHANGE操作,由DistroSyncChangeTask处理。

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    private static final DataOperation OPERATION = DataOperation.CHANGE;
    // 无callback
    @Override
    protected boolean doExecute() {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            return true;
        }
        return getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer());
    }
    // 有callback
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    // 从DistroClientDataProcessor获取DistroData
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
}

从DistroClientDataProcessor获取DistroData,是从ClientManager实时获取Client。

// DistroClientDataProcessor
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}

AbstractClient给DistroClientDataProcessor提供Client的注册的所有信息,包括客户端注册了哪些namespace,哪些group,哪些service,哪些instance。

// AbstractClient
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(160.75f1);
public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        namespaces.add(entry.getKey().getNamespace());
        groupNames.add(entry.getKey().getGroup());
        serviceNames.add(entry.getKey().getName());
        instances.add(entry.getValue());
    }
    return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

最终DistroClientTransportAgent封装为DistroDataRequest调用其他Nacos节点。

// DistroClientTransportAgent
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    Member member = memberManager.find(targetServer);
    try {
        clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback));
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}

非责任节点

非责任节点处理责任节点同步过来的Client数据。

DistroClientDataProcessor处理责任节点同步过来的数据。

// DistroClientDataProcessor
public boolean processData(DistroData distroData) {
  switch (distroData.getType()) {
    case ADD:
    case CHANGE:
      ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
        .deserialize(distroData.getContent(), ClientSyncData.class);
      handlerClientSyncData(clientSyncData);
      return true;
    case DELETE:
      String deleteClientId = distroData.getDistroKey().getResourceKey();
      clientManager.clientDisconnected(deleteClientId);
      return true;
    default:
      return false;
  }
}

private void handlerClientSyncData(ClientSyncData clientSyncData) {
  // 1. 保存ConnectionBasedClient,这类ConnectionBasedClient的isNative=false
  clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
  Client client = clientManager.getClient(clientSyncData.getClientId());
  // 2. 更新Client信息
  upgradeClient(client, clientSyncData);
}

注意这里Client的实现类仍然是ConnectionBasedClient,只不过它的isNative属性为false,这是非责任节点与责任节点的主要区别。

DistroClientDataProcessor的upgradeClient方法,更新Client里的注册表信息,发布对应事件。

// DistroClientDataProcessor
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    Set<Service> syncedService = new HashSet<>();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

DistroFilter?

1.x版本,所有客户端的写请求都会经过DistroFilter。

Nacos源码(九)2.0注册中心
DistroFilter.png

如果hash(服务名)%nacos节点列表大小==当前节点所处下标,则当前节点是责任节点,处理客户端写请求。

// DistroMapper
public boolean responsible(String responsibleTag) {
    final List<String> servers = healthyList;
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }
    if (CollectionUtils.isEmpty(servers)) {
        return false;
    }
    int index = servers.indexOf(EnvUtil.getLocalAddress());
    int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
    if (lastIndex < 0 || index < 0) {
        return true;
    }

    int target = distroHash(responsibleTag) % servers.size();
    return target >= index && target <= lastIndex;
}

否则,1.x中需要将客户端请求交由责任节点处理,责任节点处理后,由当前节点返回客户端。

而在2.x中,DistroFilter对于客户端就没用了,因为客户端与服务端会建立长连接,当前nacos节点是否是责任节点,取决于Client身上的isNative属性。如果是客户端直接注册在这个nacos节点上的ConnectionBasedClient,它的isNative属性为true;如果是由Distro协议,同步到这个nacos节点上的ConnectionBasedClient,它的isNative属性为false。

public class ConnectionBasedClient extends AbstractClient {
    /**
     * {@code true} means this client is directly connect to current server. {@code false} means this client is synced
     * from other server.
     */

    private final boolean isNative;
}

综上,2.x减少了1.x当中写请求转发的步骤,通过长连接建立在哪个节点上,哪个节点就是责任节点,客户端也只会向这个责任节点发送请求。

Verify

Distro为了确保集群间数据一致,不仅仅依赖于数据发生改变时的实时同步,后台有定时任务做数据同步。

1.x版本中,责任节点每5s同步所有Service的Instance列表的摘要(md5)给非责任节点。

非责任节点用对端传来的服务md5比对本地服务的md5,如果发生改变,需要反查责任节点。

在2.x版本中,对这个流程做了改造,责任节点会发送Client全量数据,非责任节点定时检测同步过来的Client是否过期,减少1.x版本中的反查。

  • 责任节点每5s向其他节点发送DataOperation=VERIFY类型的DistroData,来维持非责任节点的Client数据不过期。

public class DistroVerifyTimedTask implements Runnable {
    @Override
    public void run() {
        try {
            // 1. 所有其他节点
            List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
            for (String each : distroComponentHolder.getDataStorageTypes()) {
                // 2. 向这些节点发送Client.isNative=true的DistroData,type = VERIFY
                verifyForDataStorage(each, targetServer);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
        }
    }
}
  • 非责任节点每5s扫描isNative=false的client,如果client30s内没有被VERIFY的DistroData更新过续租时间,会删除这个同步过来的Client数据。

private static class ExpiredClientCleaner implements Runnable {
    private final ConnectionBasedClientManager clientManager;
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && isExpireClient(currentTime, client)) {
                clientManager.clientDisconnected(each);
            }
        }
    }
    private boolean isExpireClient(long currentTime, ConnectionBasedClient client) {
        // 同步client 且 30s内没有续租 认为过期
        return !client.isNative() && currentTime - client.getLastRenewTime() > Constants.DEFAULT_IP_DELETE_TIMEOUT;
    }
}

总结

模型:2.x相对于1.x主要的优化点在于使用长连接代替短连接,基于长连接对注册中心的模型进行了调整。

  • 模型:

    • Service:服务,namespace+group+name=单例Service。Service与Instance不会直接发生关系,由ServiceManager管理

    • Instance:实例,InstancePublishInfo,由Client管理。

    • Client:一个客户端长连接对应一个Client,一个Client持有对应客户端注册和监听的的Service&Instance。Client使Service和Instance发生关联,由ClientManager管理。

    • Connection:连接(长连接),一个Connection对应一个Client,由ConnectionManager管理。

  • 模型索引:为了加速查询,提供了两个索引服务

    • ClientServiceIndexesManager:Service->Client,服务与发布这个服务&服务与监听这个服务的客户端的关联关系。

    • ServiceStorage:Service->Instance,服务与服务下实例的关联关系。

长连接:并非所有客户端请求,都走了gRPC,唯独持久实例的注册,走了http。

健康检查:

  • 1.x:临时实例客户端定时向服务端发送心跳,确保Instance存活。

  • 2.x:临时实例客户端与服务端建立长连接,通过双向健康检查确保Client存活。服务端检测20s空闲连接,向客户端发起探测请求,如果客户端1s内响应,认为健康检查通过;客户端检测5s空闲连接,向服务端发起健康检查请求,如果服务端3s内响应,认为健康检查通过。

  • 2.x服务端如果发现Client不健康,会从ClientManager移除Client,进而触发各种事件(集群/客户端数据同步)。

客户端数据同步:仍然采用推拉结合的方式。

  • 拉:客户端会每60s执行一个UpdateTask,查询服务端注册表,更新一个订阅Serivce,逻辑与1.x一致,只是时间由10s改为60s。

  • 推:服务端的Service发生变更,通知所有监听这个Service的Client。

集群数据同步:

  • DistroFilter:1.x使用DistroFilter来确保写请求只能由服务对应责任节点处理,通过哈希运算确认这个服务所属责任节点;2.x因为使用了长连接,只要ConnectionBasedClient.isNative=true,代表Client与这个节点直连,Client所在的节点就是责任节点,减少了写请求重定向其他节点的损耗。

  • 责任节点的Client数据发生变更后,会同步这个Client的全量数据给其他非责任节点。非责任节点更新ClientManager中的Client信息。

  • 为了避免非责任节点的isNative=false的Client数据不一致,责任节点每5s向非责任节点发送VERIFY数据,续租这些Client,包含了Client全量数据;非责任节点定时扫描isNative=false的Client数据,如果超过30s没有续租,移除这些非native的client。