Nacos Source Code (9): The 2.0 Registry
Preface
This chapter studies the registry in Nacos 2.0 and compares it with version 1.4.1.
I. Model Changes
Client
For the client, the public API is unchanged: the naming service implementation is still NacosNamingService, and a service instance is still com.alibaba.nacos.api.naming.pojo.Instance.
public class Instance implements Serializable {
private String instanceId;
private String ip;
private int port;
private double weight = 1.0D;
private boolean healthy = true;
private boolean enabled = true;
private boolean ephemeral = true;
private String clusterName;
private String serviceName;
private Map<String, String> metadata = new HashMap<String, String>();
}
ServiceInfo is also unchanged: group + service + clusters maps to a list of Instances.
public class ServiceInfo {
// service name
private String name;
// group name
private String groupName;
// clusters, comma-separated
private String clusters;
// Instance list
private List<Instance> hosts = new ArrayList<Instance>();
}
The container holding the local service registry changed from HostReactor to ServiceInfoHolder, which manages the failover registry and the in-memory registry. The remote-call logic that lived in HostReactor in 1.x moved into NamingClientProxy.
public class ServiceInfoHolder implements Closeable {
// key=groupName@@serviceName@@clusterName
private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
private final FailoverReactor failoverReactor;
}
The ServiceInfoHolder logic in 2.0 is simple.
// ServiceInfoHolder
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String key = ServiceInfo.getKey(groupedServiceName, clusters);
// 1. if failover is enabled, read from the failover registry
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 2. otherwise read from the in-memory registry
return serviceInfoMap.get(key);
}
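The lookup key concatenates group, service, and clusters with a `@@` separator, as the ServiceInfoHolder comment notes. A minimal sketch of that key scheme, assuming the `@@` separator and the empty-clusters behavior (the helper names here are illustrative, not Nacos APIs):

```java
public class ServiceKeyDemo {
    private static final String SEP = "@@"; // separator used in Nacos grouped names

    // groupName@@serviceName, mirroring what NamingUtils.getGroupedName produces
    static String groupedName(String serviceName, String groupName) {
        return groupName + SEP + serviceName;
    }

    // append clusters only when present, mirroring ServiceInfo.getKey
    static String key(String groupedServiceName, String clusters) {
        if (clusters == null || clusters.isEmpty()) {
            return groupedServiceName;
        }
        return groupedServiceName + SEP + clusters;
    }

    public static void main(String[] args) {
        System.out.println(key(groupedName("demo-service", "DEFAULT_GROUP"), ""));
        System.out.println(key(groupedName("demo-service", "DEFAULT_GROUP"), "c1,c2"));
    }
}
```

With empty clusters this yields `DEFAULT_GROUP@@demo-service`; with clusters it yields `DEFAULT_GROUP@@demo-service@@c1,c2`, matching the key format in the ServiceInfoHolder comment.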
Server
Model
The server no longer uses the Service/Cluster/Instance model; it uses Service, Instance, Client, and Connection instead.
The 2.0 Service model is com.alibaba.nacos.naming.core.v2.pojo.Service.
public class Service {
private final String namespace;
private final String group;
private final String name;
private final boolean ephemeral;
}
How the 2.0 Service differs from 1.x:
The ephemeral/persistent notion moved up from Instance to Service: whether a Service is ephemeral is decided by the first instance registered under it.
In 1.x a Service managed its Clusters; in 2.0 Service no longer knows about Cluster, which is pushed down into Instance.
In 1.x, Service.name was groupName@@serviceName; 2.0 keeps groupName and serviceName as separate fields.
The container holding Services is still called ServiceManager, but now lives in the com.alibaba.nacos.naming.core.v2 package, and every Service in it is a singleton.
public class ServiceManager {
private static final ServiceManager INSTANCE = new ServiceManager();
// singleton Services; see Service's equals and hashCode
private final ConcurrentHashMap<Service, Service> singletonRepository;
// namespace -> all Services under it
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
}
Note Service's equals and hashCode methods: namespace + group + name identifies one singleton Service on the server.
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Service)) {
return false;
}
Service service = (Service) o;
return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
}
@Override
public int hashCode() {
return Objects.hash(namespace, group, name);
}
2.0 drops the Cluster model; cluster becomes a property on the instance. The server-side instance model is InstancePublishInfo.
public class InstancePublishInfo implements Serializable {
private String ip;
private int port;
private boolean healthy;
private String cluster;
// metadata
private Map<String, Object> extendDatum;
}
Looking at Service and Instance alone, no relationship is established between them. 2.0 introduces the new Client model: one client gRPC long connection corresponds to one Client, each with a unique id (clientId). A Client tracks one client's published service instances (Publish) and its subscriptions (Subscribe).
/**
* Nacos naming client.
*
* <p>The abstract concept of the client stored by on the server of Nacos naming module. It is used to store which
* services the client has published and subscribed.
*/
public interface Client {
// client id / gRPC connectionId
String getClientId();
// whether this client is ephemeral
boolean isEphemeral();
// client update time
void setLastUpdatedTime();
long getLastUpdatedTime();
// register / deregister / query service instances
boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
InstancePublishInfo removeServiceInstance(Service service);
InstancePublishInfo getInstancePublishInfo(Service service);
Collection<Service> getAllPublishedService();
// subscribe / unsubscribe / query subscriptions
boolean addServiceSubscriber(Service service, Subscriber subscriber);
boolean removeServiceSubscriber(Service service);
Subscriber getSubscriber(Service service);
Collection<Service> getAllSubscribeService();
// generate client data to sync to other nodes
ClientSyncData generateSyncData();
// whether the client has expired
boolean isExpire(long currentTime);
// release resources
void release();
}
ConnectionManager manages all Connections.
@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {
// clientid -> connection
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
}
Service Registration
How is an Instance registered, and how are the relationships among these models established?
In traditional registry terms, registration attaches an Instance to a Service and discovery looks up Instances by Service, but Nacos 2.0 does not link Instance to Service directly.
Let's look at server-side registration handling to see how the Service-Instance relationship is built.
InstanceRequestHandler receives InstanceRequest and serves registration and deregistration requests.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
private final EphemeralClientOperationServiceImpl clientOperationService;
@Override
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
switch (request.getType()) {
// register
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta);
// deregister
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return deregisterInstance(service, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM);
}
}
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
}
}
EphemeralClientOperationServiceImpl does the actual registration work.
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
// 1. ensure the singleton Service exists; note Service's equals and hashCode
Service singleton = ServiceManager.getInstance().getSingleton(service);
// 2. look up the Client by long-connection id; this mapping was stored when the connection was established
Client client = clientManager.getClient(clientId);
// 3. convert the client-side Instance model into the server-side model
InstancePublishInfo instanceInfo = getPublishInfo(instance);
// 4. store the Instance inside the Client
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
// 5. build the service -> clientId relationship
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
Step 1: ServiceManager manages Service singletons.
The map stores singleton Services. Unlike 1.x, no string key needs to be concatenated: Service overrides equals and hashCode and serves as the key itself.
// ServiceManager.java
// singleton Services; see Service's equals and hashCode
private final ConcurrentHashMap<Service, Service> singletonRepository;
// namespace -> all Services under it
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
public Service getSingleton(Service service) {
singletonRepository.putIfAbsent(service, service);
Service result = singletonRepository.get(service);
namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
namespaceSingletonMaps.get(result.getNamespace()).add(result);
return result;
}
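The singleton trick rests entirely on Service's value-based equals/hashCode: two Service objects with the same namespace/group/name hit the same map entry, so the first object stored is what every later caller gets back. A minimal sketch of the same idea with a stripped-down stand-in for Service:

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class SingletonDemo {
    // stripped-down Service: identity is namespace + group + name
    static final class Svc {
        final String namespace, group, name;
        Svc(String namespace, String group, String name) {
            this.namespace = namespace; this.group = group; this.name = name;
        }
        @Override public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Svc)) return false;
            Svc s = (Svc) o;
            return namespace.equals(s.namespace) && group.equals(s.group) && name.equals(s.name);
        }
        @Override public int hashCode() { return Objects.hash(namespace, group, name); }
    }

    static final ConcurrentHashMap<Svc, Svc> repo = new ConcurrentHashMap<>();

    // mirrors ServiceManager.getSingleton: the first writer wins, later callers get its object
    static Svc getSingleton(Svc svc) {
        Svc prev = repo.putIfAbsent(svc, svc);
        return prev != null ? prev : svc;
    }

    public static void main(String[] args) {
        Svc a = getSingleton(new Svc("public", "DEFAULT_GROUP", "demo"));
        Svc b = getSingleton(new Svc("public", "DEFAULT_GROUP", "demo"));
        System.out.println(a == b); // same object reference
    }
}
```

Because every caller receives the same object, the server can safely key other maps (indexes, metadata, caches) directly by the Service reference.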
Step 2: ConnectionBasedClientManager maps long-connection clientIds to Client models.
The Client instance (ConnectionBasedClient) is stored into ConnectionBasedClientManager when the gRPC long connection is established; that code is omitted here.
@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();
// store on connection establishment
public boolean clientConnected(Client client) {
if (!clients.containsKey(client.getClientId())) {
clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
}
return true;
}
// look up the Client by clientId
public Client getClient(String clientId) {
return clients.get(clientId);
}
}
Step 3: the client-side Instance model is converted into the server-side model InstancePublishInfo.
The 1.x metadata becomes extendDatum in 2.0, and properties such as enabled and weight move into that metadata map.
// ClientOperationService
default InstancePublishInfo getPublishInfo(Instance instance) {
InstancePublishInfo result = new InstancePublishInfo(instance.getIp(), instance.getPort());
if (null != instance.getMetadata() && !instance.getMetadata().isEmpty()) {
result.getExtendDatum().putAll(instance.getMetadata());
}
if (StringUtils.isNotEmpty(instance.getInstanceId())) {
result.getExtendDatum().put(Constants.CUSTOM_INSTANCE_ID, instance.getInstanceId());
}
if (Constants.DEFAULT_INSTANCE_WEIGHT != instance.getWeight()) {
result.getExtendDatum().put(Constants.PUBLISH_INSTANCE_WEIGHT, instance.getWeight());
}
if (!instance.isEnabled()) {
result.getExtendDatum().put(Constants.PUBLISH_INSTANCE_ENABLE, instance.isEnabled());
}
String clusterName = StringUtils.isBlank(instance.getClusterName()) ? UtilsAndCommons.DEFAULT_CLUSTER_NAME
: instance.getClusterName();
result.setHealthy(instance.isHealthy());
result.setCluster(clusterName);
return result;
}
Step 4: build the Client -> Service -> Instance relationship.
AbstractClient, the abstract parent of ConnectionBasedClient, stores this client's own service registry, i.e. its Service-to-Instance mapping. Note that a single client can register only one instance per service.
public abstract class AbstractClient implements Client {
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
return true;
}
}
Step 5: build the Service -> Client relationship.
Why is this step needed?
The steps above are already enough to answer registry queries (Service -> Instance).
For example, iterate over every Client in ConnectionBasedClientManager, read each client's published services via AbstractClient, and filter for the target service to collect the final Instance list. In pseudocode:
for (Client c : ConnectionBasedClientManager.clients.values()) { // every Client
for (Entry e : c.publishers.entrySet()) { // every service-instance the Client registered
if (e.key == targetService) { // filter by service
result.add(e.value); // collect the instance
}
}
}
Obviously the time complexity is too high. Moreover, when a service changes, push-based subscription needs to go straight from a Service to its subscribing Clients, which requires the same kind of relationship. The Service -> Client index exists precisely to speed up these lookups.
Step 5 publishes a ClientRegisterServiceEvent, which ClientServiceIndexesManager listens for. ClientServiceIndexesManager maintains two indexes:
Service -> publishing clientIds
Service -> subscribing clientIds
// ClientServiceIndexesManager.java
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
removeSubscriberIndexes(service, clientId);
}
}
// build the Service -> publishing Client relationship
private void addPublisherIndexes(Service service, String clientId) {
publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
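With this index in place, finding all publishers of a service becomes a single map lookup rather than a scan over every client. A minimal sketch of the inverted index (illustrative names, not the Nacos classes; the single-argument computeIfAbsent call here is the atomic equivalent of the two-step computeIfAbsent-then-get shown above):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class PublisherIndexDemo {
    // service key -> clientIds that published it (mirrors publisherIndexes)
    static final Map<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

    // one atomic computeIfAbsent creates the set on first use, then adds the clientId
    static void addPublisher(String serviceKey, String clientId) {
        publisherIndexes.computeIfAbsent(serviceKey, k -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    // a service's publishers are now one O(1) map lookup, not a scan over all clients
    static Set<String> publishersOf(String serviceKey) {
        return publisherIndexes.getOrDefault(serviceKey, Set.of());
    }

    public static void main(String[] args) {
        addPublisher("DEFAULT_GROUP@@demo", "conn-1");
        addPublisher("DEFAULT_GROUP@@demo", "conn-2");
        System.out.println(publishersOf("DEFAULT_GROUP@@demo").size()); // 2
    }
}
```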
Once this index is built, a ServiceChangedEvent is also fired, signalling a registry change. A registry change then triggers two follow-ups: 1) notify subscribing clients, 2) sync data across the Nacos cluster. Both are covered later.
What about persistent instance registration?
Registering a persistent instance first goes through the Raft flow. Everything else matches the ephemeral path; only the models differ, e.g. the Client implementation is IpPortBasedClient.
PersistentClientOperationServiceImpl#onInstanceRegister applies the log to the current node once the Raft round completes.
// PersistentClientOperationServiceImpl
private void onInstanceRegister(Service service, Instance instance, String clientId) {
// 1. register the service
Service singleton = ServiceManager.getInstance().getSingleton(service);
// 2. register the client
Client client = clientManager.computeIfAbsent(clientId, () -> new IpPortBasedClient(clientId, false));
// 3. convert the instance
InstancePublishInfo instancePublishInfo = getPublishInfo(instance);
// 4. register the instance
client.addServiceInstance(singleton, instancePublishInfo);
client.setLastUpdatedTime();
// 5. build the indexes
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
}
Service Discovery
How do we get all Instances for a given Service?
For subscriptions, the client's subscribed Service and clientId end up stored in ClientServiceIndexesManager, as described in the previous section.
For queries, even though ClientServiceIndexesManager links Service to Client, resolving Instances through Clients is still slow. In pseudocode:
// 1. loop over every client that registered this service
for (String clientId : ClientServiceIndexesManager.publisherIndexes.get(targetService)) {
Client client = ConnectionBasedClientManager.get(clientId);
// 2. collect this service's instances from the client
result.addAll(client.publishers.values());
}
Can we return to the traditional shape and resolve the Instance list directly from the Service, skipping the Client hop? Yes, by adding one more index layer.
ServiceQueryRequestHandler handles client queries for the Instances under a Service, in three steps:
query the instances by service;
query the service's metadata. 2.0 stores each kind of metadata separately, also keyed by the singleton Service; NamingMetadataManager maintains this mapping;
filter the instances, applying availability, health, protection-mode and similar rules.
public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequest, QueryServiceResponse> {
private final ServiceStorage serviceStorage;
private final NamingMetadataManager metadataManager;
public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String groupName = request.getGroupName();
String serviceName = request.getServiceName();
Service service = Service.newService(namespaceId, groupName, serviceName);
String cluster = null == request.getCluster() ? "" : request.getCluster();
boolean healthyOnly = request.isHealthyOnly();
// 1. query Instances by service
ServiceInfo result = serviceStorage.getData(service);
// 2. query the service's metadata
ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
// 3. filter instances: healthy? enabled? etc.
result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true);
return QueryServiceResponse.buildSuccessResponse(result);
}
}
ServiceStorage implements the pseudocode above: it resolves the Instances (ServiceInfo) under a service and caches the query result.
// service-instance
private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;
// service-cluster
private final ConcurrentMap<Service, Set<String>> serviceClusterIndex;
public ServiceInfo getData(Service service) {
// return from cache if present, otherwise query and return
return serviceDataIndexes.containsKey(service)
? serviceDataIndexes.get(service)
: getPushData(service);
}
public ServiceInfo getPushData(Service service) {
ServiceInfo result = emptyServiceInfo(service);
if (!ServiceManager.getInstance().containSingleton(service)) {
return result;
}
// 1. query instances into the result
result.setHosts(getAllInstancesFromIndex(service));
// 2. cache service -> instances locally
serviceDataIndexes.put(service, result);
return result;
}
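ServiceStorage behaves like a read-through cache: serve from the map on a hit, otherwise compute the value, store it, and return it. A minimal sketch of that pattern, with a pluggable loader standing in for getPushData (names are illustrative, not the Nacos API):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class ReadThroughCacheDemo {
    static final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
    static final AtomicInteger loads = new AtomicInteger(); // counts expensive lookups

    // mirrors getData: a cache hit returns directly, a miss falls through to the loader
    static String getData(String service, Function<String, String> loader) {
        String cached = cache.get(service);
        if (cached != null) {
            return cached;
        }
        String loaded = loader.apply(service); // like getPushData: query the indexes...
        cache.put(service, loaded);            // ...then populate the cache
        return loaded;
    }

    public static void main(String[] args) {
        Function<String, String> loader = s -> {
            loads.incrementAndGet();
            return "instances-of-" + s;
        };
        getData("demo", loader);
        getData("demo", loader); // second call is a cache hit
        System.out.println(loads.get()); // 1
    }
}
```

The trade-off, as with ServiceStorage itself, is staleness: cached entries must be refreshed or evicted when the underlying registry changes, which is why push tasks and disconnect events both touch this cache.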
II. Long Connections
Since 2.0 the registry client uses gRPC instead of HTTP and keeps a long connection to the server, while still supporting the old HTTP client.
The NamingClientProxy interface handles the low-level communication with the server. It has three implementations:
NamingClientProxyDelegate: a delegate wrapping every NacosNamingService call, choosing HTTP or gRPC per request.
NamingGrpcClientProxy: gRPC long-connection transport.
NamingHttpClientProxy: HTTP short-connection transport; essentially the old 1.x NamingProxy renamed, with the code mostly untouched.
Take client-side service registration as an example: NamingClientProxyDelegate proxies the registerService method.
// NacosNamingService.java
private NamingClientProxy clientProxy; // NamingClientProxyDelegate
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
clientProxy.registerService(serviceName, groupName, instance);
}
NamingClientProxyDelegate picks the protocol based on whether the instance is ephemeral:
ephemeral instance: gRPC
persistent instance: HTTP
public class NamingClientProxyDelegate implements NamingClientProxy {
private final NamingHttpClientProxy httpClientProxy;
private final NamingGrpcClientProxy grpcClientProxy;
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
// ephemeral: gRPC long connection; persistent: HTTP short connection
private NamingClientProxy getExecuteClientProxy(Instance instance) {
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
}
Service queries and subscriptions always go over gRPC.
// NamingClientProxyDelegate
// subscribe
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
// 1. read this service's ServiceInfo from the cache
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
// 2. on a cache miss, subscribe to the service
if (null == result) {
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
// 3. schedule a task that keeps the subscribed registry up to date
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
// 4. update the cached ServiceInfo and notify on service change
serviceInfoHolder.processServiceInfo(result);
return result;
}
// query
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
boolean healthyOnly) throws NacosException {
return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}
III. Health Checks
Recall 1.x:
Ephemeral instances use the Distro protocol with in-memory storage; the client keeps itself healthy by sending heartbeats to the registry.
Persistent instances use the Raft protocol with persistent storage; the server periodically opens a TCP connection to the client for health checks.
2.0 keeps the old logic for persistent instances; they see little use, so we skip them.
For ephemeral instances, 2.x no longer uses heartbeats: whether the long connection is alive decides whether the instance is healthy.
Server-Initiated Probing
ConnectionManager manages all client long connections.
Every 3s it scans for clients with no traffic in the last 20s and sends each a ClientDetectionRequest probe. If the client responds within 1s the check passes; otherwise unregister removes the Connection.
// ConnectionManager
// clientid -> connection
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
@PostConstruct
public void start() {
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
Set<Map.Entry<String, Connection>> entries = connections.entrySet();
// collect connections idle for over 20s
Set<String> outDatedConnections = new HashSet<>();
long now = System.currentTimeMillis();
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
// ...
if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
outDatedConnections.add(client.getMetaInfo().getConnectionId());
}
}
// ...
// asynchronously probe all flagged connections with ClientDetectionRequest
if (CollectionUtils.isNotEmpty(outDatedConnections)) {
Set<String> successConnections = new HashSet<>();
final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
for (String outDateConnectionId : outDatedConnections) {
try {
Connection connection = getConnection(outDateConnectionId);
if (connection != null) {
ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public long getTimeout() {
return 1000L;
}
@Override
public void onResponse(Response response) {
latch.countDown();
// record clients that answered the probe and refresh their activeTime
if (response != null && response.isSuccess()) {
connection.freshActiveTime();
successConnections.add(outDateConnectionId);
}
}
@Override
public void onException(Throwable e) {
latch.countDown();
}
});
} else {
latch.countDown();
}
} catch (ConnectionAlreadyClosedException e) {
latch.countDown();
} catch (Exception e) {
latch.countDown();
}
}
latch.await(3000L, TimeUnit.MILLISECONDS);
// unregister clients that failed to respond
for (String outDateConnectionId : outDatedConnections) {
if (!successConnections.contains(outDateConnectionId)) {
unregister(outDateConnectionId);
}
}
}
} catch (Throwable e) {
Loggers.REMOTE.error("Error occurs during connection check... ", e);
}
}
}, 1000L, 3000L, TimeUnit.MILLISECONDS);
}
// remove the connection
public synchronized void unregister(String connectionId) {
Connection remove = this.connections.remove(connectionId);
if (remove != null) {
// ...
remove.close();
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}
}
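The probe loop above is a fan-out/fan-in pattern: fire one async request per stale connection, count every outcome down on a latch, wait with a bounded timeout, then reap whoever never answered. A minimal sketch of that pattern, with plain threads standing in for the async RPC and a `dead` set simulating clients that will not respond:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ProbeFanOutDemo {
    // Returns the connections that failed the probe and should be unregistered.
    static Set<String> probe(List<String> staleConnections, Set<String> dead) {
        Set<String> alive = ConcurrentHashMap.newKeySet();
        CountDownLatch latch = new CountDownLatch(staleConnections.size());
        for (String id : staleConnections) {
            // stand-in for connection.asyncRequest(new ClientDetectionRequest(), callback)
            new Thread(() -> {
                try {
                    if (!dead.contains(id)) {
                        alive.add(id); // "onResponse": probe succeeded
                    }
                } finally {
                    latch.countDown(); // counted down on success and failure paths alike
                }
            }).start();
        }
        try {
            latch.await(3000L, TimeUnit.MILLISECONDS); // bounded wait, like ConnectionManager's 3s
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Set<String> toUnregister = ConcurrentHashMap.newKeySet();
        for (String id : staleConnections) {
            if (!alive.contains(id)) {
                toUnregister.add(id); // these would be unregister()-ed
            }
        }
        return toUnregister;
    }

    public static void main(String[] args) {
        System.out.println(probe(List.of("conn-1", "conn-2", "conn-3"), Set.of("conn-2")));
    }
}
```

Counting the latch down in a finally block on every path (success, exception, missing connection) is the detail that keeps the 3s await from blocking longer than necessary, and the same discipline appears in the real ConnectionManager code above.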
After a connection is removed, ConnectionBasedClientManager (which extends ClientConnectionEventListener) removes the Client and publishes a ClientDisconnectEvent.
// clientId -> client
private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();
@Override
public boolean clientDisconnected(String clientId) {
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
ClientDisconnectEvent triggers several things:
1) Distro: sync the removed client's data to peers;
2) evict two index caches: the Service -> publishing Client index in ClientServiceIndexesManager, and the Service -> Instance cache in ServiceStorage;
3) subscriptions: ClientDisconnectEvent indirectly fires ServiceChangedEvent, pushing the change to subscribing clients.
Which Connections need probing?
Whether a Connection needs probing depends on the lastActiveTime field in its metadata, the time of its last activity.
public class ConnectionMeta {
// time of last activity
long lastActiveTime;
}
If a client keeps talking to the server, the server never needs to probe it: GrpcRequestAcceptor calls ConnectionManager's refreshActiveTime to refresh the connection's lastActiveTime on every request.
// GrpcRequestAcceptor
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
String type = grpcRequest.getMetadata().getType();
// request handler
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
// parse the request payload
Object parseObj = GrpcUtils.parse(grpcRequest);
// ..
Request request = (Request) parseObj;
try {
Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// refresh the long connection's active time
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
} catch (Throwable e) {
//...
}
}
A subscribing client periodically refreshes its service registry. In 1.x it queried the server every 10s, which (being under 20s) meant the server-side health check never fired. In 2.x the client's registry refresh task, UpdateTask, runs every 60s instead.
// UpdateTask
public void run() {
long delayTime = DEFAULT_DELAY;
try {
if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
return;
}
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (serviceObj == null) {
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
serviceInfoHolder.processServiceInfo(serviceObj);
lastRefTime = serviceObj.getLastRefTime();
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
serviceInfoHolder.processServiceInfo(serviceObj);
}
lastRefTime = serviceObj.getLastRefTime();
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
return;
}
// the server sets cacheMillis=10s; multiplied by 6, so refresh every 60s
delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
resetFailCount();
} catch (Throwable e) {
incFailCount();
} finally {
// run again after 60s
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}
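The finally block implements exponential backoff: delayTime << failCount doubles the wait per consecutive failure, capped at DEFAULT_DELAY * 60. A sketch of just that arithmetic, assuming DEFAULT_DELAY = 1000 ms (the 1s base UpdateTask uses):

```java
public class BackoffDemo {
    static final long DEFAULT_DELAY = 1000L; // 1s base delay

    // mirrors executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), ...)
    static long nextDelayMs(long delayTime, int failCount) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }

    public static void main(String[] args) {
        // healthy path: server says cacheMillis=10s, client multiplies by 6 -> 60s
        System.out.println(nextDelayMs(10_000L * 6, 0)); // 60000
        // failure path: delayTime resets to the 1s base, doubling per failure up to the 60s cap
        for (int fails = 0; fails <= 7; fails++) {
            System.out.println(nextDelayMs(DEFAULT_DELAY, fails));
        }
    }
}
```

So a healthy subscriber settles at one query per minute, while a failing one backs off 1s, 2s, 4s, ... until it also hits the 60s ceiling.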
This suggests the server might run 2-3 health checks per minute against every long connection registered on it. In practice it does not quite work out that way, as we'll see next.
Client Reconnection
The server is not the only one probing: every 5s the client checks whether a connection that has been idle for more than 5s is still alive.
When the long connection drops, the client actively reconnects to the server.
public final void start() throws NacosException {
// send a HealthCheckRequest every 5s; reconnect if the response fails
clientEventExecutor.submit(new Runnable() {
@Override
public void run() {
while (true) {
try {
// 5s timeout
ReconnectContext reconnectContext = reconnectionSignal.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
// idle for more than 5s
if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
// HealthCheckRequest
boolean isHealthy = healthCheck();
if (!isHealthy) {
if (currentConnection == null) {
continue;
}
rpcClientStatus.set(RpcClientStatus.UNHEALTHY);
reconnectContext = new ReconnectContext(null, false);
} else {
lastActiveTimeStamp = System.currentTimeMillis();
continue;
}
} else {
continue;
}
}
// reconnect, choosing the next nacos node
// ...
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
//Do nothing
}
}
}
});
}
In short, 2.x relies on long connections with bidirectional health checks to judge peer liveness: the client probes after 5s of idleness, the server after 20s.
Reconnection also has to replay registrations and subscriptions. The client therefore records every such operation in memory and, on reconnect, replays all register and subscribe requests.
public class NamingGrpcConnectionEventListener implements ConnectionEventListener {
// service-instance pairs registered by this client
private final ConcurrentMap<String, Set<Instance>> registeredInstanceCached = new ConcurrentHashMap<String, Set<Instance>>();
// services subscribed by this client
private final Set<String> subscribes = new ConcurrentHashSet<String>();
@Override
public void onConnected() {
// 1. re-subscribe
redoSubscribe();
// 2. re-register
redoRegisterEachService();
}
}
IV. Client Data Synchronization
Client data sync still combines pull and push.
Pull: every 60s the client runs an UpdateTask that queries the server registry and refreshes one subscribed Service; the logic matches 1.x, only the interval moved from 10s to 60s.
Push: when a Service changes on the server, all Clients subscribed to it are notified, no longer over 1.x's UDP but over the long connection.
When a Service changes, the server fires ServiceChangedEvent; NamingSubscriberServiceV2Impl listens for it and submits a PushDelayTask.
// NamingSubscriberServiceV2Impl
public void onEvent(Event event) {
if (event instanceof ServiceEvent.ServiceChangedEvent) {
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, 500L));
}
// ...
}
PushExecuteTask performs the push: it first fetches the ServiceInfo via ServiceStorage, refreshing the ServiceStorage cache along the way.
// PushExecuteTask
public void run() {
try {
// 1. refresh the service -> instance cache in ServiceStorage and fetch the ServiceInfo
PushDataWrapper wrapper = generatePushData();
// 2. push the ServiceInfo to every subscribing client
for (String each : getTargetClientIds()) {
Client client = delayTaskEngine.getClientManager().getClient(each);
Subscriber subscriber = delayTaskEngine.getClientManager().getClient(each).getSubscriber(service);
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new NamingPushCallback(each, subscriber, wrapper.getOriginalData()));
}
} catch (Exception e) {
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
}
}
private PushDataWrapper generatePushData() {
// query the ServiceInfo and refresh the ServiceStorage cache
ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);
ServiceMetadata serviceMetadata = delayTaskEngine.getMetadataManager().getServiceMetadata(service).orElse(null);
serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceInfo, serviceMetadata, false, true);
return new PushDataWrapper(serviceInfo);
}
V. Cluster Data Synchronization
When a client publishes or deregisters a service, the Client model stores the Instance info for the published Service.
public abstract class AbstractClient implements Client {
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
return true;
}
@Override
public InstancePublishInfo removeServiceInstance(Service service) {
InstancePublishInfo result = publishers.remove(service);
if (null != result) {
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
}
return result;
}
}
Responsible Node
After the local registration write completes, a ClientChangedEvent fires. DistroClientDataProcessor only handles clients the current node is responsible for.
// DistroClientDataProcessor.java
private void syncToAllServer(ClientEvent event) {
Client client = event.getClient();
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
return;
}
if (event instanceof ClientEvent.ClientDisconnectEvent) {
// client disconnected
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
} else if (event instanceof ClientEvent.ClientChangedEvent) {
// client added/changed
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);
}
}
DistroProtocol loops over all other Nacos nodes and submits an async task for each, delayed by 1s (nacos.core.protocol.distro.data.sync_delay_ms).
// DistroProtocol.java
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
syncToTarget(distroKey, action, each.getAddress(), delay);
}
}
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
targetServer);
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
}
DELETE operations are handled by DistroSyncDeleteTask;
CHANGE operations are handled by DistroSyncChangeTask.
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
private static final DataOperation OPERATION = DataOperation.CHANGE;
// without callback
@Override
protected boolean doExecute() {
String type = getDistroKey().getResourceType();
DistroData distroData = getDistroData(type);
if (null == distroData) {
return true;
}
return getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer());
}
// with callback
@Override
protected void doExecuteWithCallback(DistroCallback callback) {
String type = getDistroKey().getResourceType();
DistroData distroData = getDistroData(type);
getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer(), callback);
}
// fetch DistroData from DistroClientDataProcessor
private DistroData getDistroData(String type) {
DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
if (null != result) {
result.setType(OPERATION);
}
return result;
}
}
Fetching DistroData from DistroClientDataProcessor reads the Client from the ClientManager in real time.
// DistroClientDataProcessor
public DistroData getDistroData(DistroKey distroKey) {
Client client = clientManager.getClient(distroKey.getResourceKey());
if (null == client) {
return null;
}
byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
return new DistroData(distroKey, data);
}
AbstractClient supplies DistroClientDataProcessor with everything the Client registered: which namespaces, which groups, which services, which instances.
// AbstractClient
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
public ClientSyncData generateSyncData() {
List<String> namespaces = new LinkedList<>();
List<String> groupNames = new LinkedList<>();
List<String> serviceNames = new LinkedList<>();
List<InstancePublishInfo> instances = new LinkedList<>();
for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
namespaces.add(entry.getKey().getNamespace());
groupNames.add(entry.getKey().getGroup());
serviceNames.add(entry.getKey().getName());
instances.add(entry.getValue());
}
return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}
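ClientSyncData flattens the publishers map into four parallel lists: index i across namespaces/groupNames/serviceNames/instances describes one registration, and the receiving node's upgradeClient walks the lists back by that shared index. A simplified sketch of the round trip (plain strings here; real Nacos carries InstancePublishInfo objects and serializes the whole structure):

```java
import java.util.ArrayList;
import java.util.List;

public class SyncDataDemo {
    // upgradeClient side: rebuild one registration per shared index i
    static List<String> rebuild(List<String> namespaces, List<String> groups,
                                List<String> services, List<String> instances) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < namespaces.size(); i++) {
            // index i ties the four lists together, exactly as in upgradeClient
            out.add(namespaces.get(i) + "/" + groups.get(i) + "/" + services.get(i)
                    + " -> " + instances.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // generateSyncData side: the responsible node flattens its publishers map...
        List<String> namespaces = List.of("public", "public");
        List<String> groups = List.of("DEFAULT_GROUP", "DEFAULT_GROUP");
        List<String> services = List.of("order-service", "user-service");
        List<String> instances = List.of("10.0.0.1:8080", "10.0.0.2:8080");
        // ...and the peer walks the lists back by index
        rebuild(namespaces, groups, services, instances).forEach(System.out::println);
    }
}
```

The parallel-list encoding keeps the wire format compact, but it only works because all four lists are built in one pass over the same map, so their indexes stay aligned.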
Finally, DistroClientTransportAgent wraps it in a DistroDataRequest and calls the other Nacos nodes.
// DistroClientTransportAgent
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
}
DistroDataRequest request = new DistroDataRequest(data, data.getType());
Member member = memberManager.find(targetServer);
try {
clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback));
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
Non-Responsible Nodes
A non-responsible node processes the Client data synced over from the responsible node.
DistroClientDataProcessor handles the synced data.
// DistroClientDataProcessor
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    // 1. 保存ConnectionBasedClient,这类ConnectionBasedClient的isNative=false
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // 2. 更新Client信息
    upgradeClient(client, clientSyncData);
}
注意这里Client的实现类仍然是ConnectionBasedClient,只不过它的isNative属性为false,这是非责任节点与责任节点的主要区别。
DistroClientDataProcessor的upgradeClient方法,更新Client里的注册表信息,发布对应事件。
// DistroClientDataProcessor
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    Set<Service> syncedService = new HashSet<>();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}
DistroFilter?
1.x版本,所有客户端的写请求都会经过DistroFilter。
如果hash(服务名)%nacos节点列表大小==当前节点所处下标,则当前节点是责任节点,处理客户端写请求。
// DistroMapper
public boolean responsible(String responsibleTag) {
    final List<String> servers = healthyList;
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }
    if (CollectionUtils.isEmpty(servers)) {
        return false;
    }
    int index = servers.indexOf(EnvUtil.getLocalAddress());
    int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
    if (lastIndex < 0 || index < 0) {
        return true;
    }
    int target = distroHash(responsibleTag) % servers.size();
    return target >= index && target <= lastIndex;
}
否则,1.x中需要将客户端请求交由责任节点处理,责任节点处理后,由当前节点返回客户端。
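上述1.x的哈希路由可以用一个可运行的简化示例来理解。这里用Math.floorMod代替Nacos的distroHash取模细节,节点地址均为假设值,类名为示意:

```java
import java.util.List;

// 简化示意:1.x的DistroMapper按"hash(服务名) % 节点数 == 本节点下标"决定责任节点
class DistroRouteSketch {
    static boolean responsible(String serviceName, List<String> servers, String self) {
        int index = servers.indexOf(self);
        if (index < 0) {
            return true; // 本节点不在列表中时退化为自己处理(对应源码lastIndex<0的分支)
        }
        // floorMod保证结果非负,对任意hashCode都成立
        int target = Math.floorMod(serviceName.hashCode(), servers.size());
        return target == index;
    }
}
```

对同一个服务名,集群中恰好只有一个节点会判定自己是责任节点,这就是1.x写请求需要转发的原因。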
而在2.x中,DistroFilter对客户端请求就没用了,因为客户端与服务端建立了长连接,当前nacos节点是否是责任节点,取决于Client的isNative属性:如果客户端直接注册在这个nacos节点上,ConnectionBasedClient的isNative属性为true;如果是由Distro协议同步到这个nacos节点上的,isNative属性为false。
public class ConnectionBasedClient extends AbstractClient {

    /**
     * {@code true} means this client is directly connect to current server. {@code false} means this client is synced
     * from other server.
     */
    private final boolean isNative;
}
综上,2.x减少了1.x当中写请求转发的步骤,通过长连接建立在哪个节点上,哪个节点就是责任节点,客户端也只会向这个责任节点发送请求。
Verify
Distro为了确保集群间数据一致,不仅仅依赖于数据发生改变时的实时同步,后台有定时任务做数据同步。
在1.x版本中,责任节点每5s同步所有Service的Instance列表的摘要(md5)给非责任节点。
非责任节点用对端传来的服务md5比对本地服务的md5,如果发生改变,需要反查责任节点。
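1.x的摘要比对思路可以用下面的简化示例表示。md5入参的拼接方式为示意,与Nacos实际的Instance序列化格式无关,类与方法名也是假设的:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// 简化示意:1.x非责任节点用md5摘要比对判断是否需要反查责任节点
class DigestVerifySketch {
    static String md5(String content) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(content.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : d) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // 对端传来的摘要与本地实例列表摘要不一致 => 需要反查责任节点拉取全量数据
    static boolean needReQuery(String remoteDigest, String localInstances) {
        return !remoteDigest.equals(md5(localInstances));
    }
}
```

只传摘要虽然节省带宽,但摘要不一致时要多一次反查;2.x直接传Client全量数据,省去了这一步。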
在2.x版本中,对这个流程做了改造,责任节点会发送Client全量数据,非责任节点定时检测同步过来的Client是否过期,减少1.x版本中的反查。
责任节点每5s向其他节点发送DataOperation=VERIFY类型的DistroData,来维持非责任节点的Client数据不过期。
public class DistroVerifyTimedTask implements Runnable {

    @Override
    public void run() {
        try {
            // 1. 所有其他节点
            List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
            for (String each : distroComponentHolder.getDataStorageTypes()) {
                // 2. 向这些节点发送Client.isNative=true的DistroData,type = VERIFY
                verifyForDataStorage(each, targetServer);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
        }
    }
}
非责任节点每5s扫描isNative=false的Client,如果Client超过30s没有被VERIFY类型的DistroData续租过,就会删除这个同步过来的Client数据。
private static class ExpiredClientCleaner implements Runnable {

    private final ConnectionBasedClientManager clientManager;

    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && isExpireClient(currentTime, client)) {
                clientManager.clientDisconnected(each);
            }
        }
    }

    private boolean isExpireClient(long currentTime, ConnectionBasedClient client) {
        // 同步client 且 30s内没有续租 认为过期
        return !client.isNative() && currentTime - client.getLastRenewTime() > Constants.DEFAULT_IP_DELETE_TIMEOUT;
    }
}
总结
模型:2.x相对于1.x主要的优化点在于使用长连接代替短连接,基于长连接对注册中心的模型进行了调整。
模型:
Service:服务,namespace+group+name定位一个单例Service。Service与Instance不会直接发生关系,由ServiceManager管理。
Instance:实例,InstancePublishInfo,由Client管理。
Client:一个客户端长连接对应一个Client,一个Client持有对应客户端注册和监听的Service&Instance。Client使Service和Instance发生关联,由ClientManager管理。
Connection:连接(长连接),一个Connection对应一个Client,由ConnectionManager管理。
模型索引:为了加速查询,提供了两个索引服务
ClientServiceIndexesManager:Service->Client,服务与发布这个服务&服务与监听这个服务的客户端的关联关系。
ServiceStorage:Service->Instance,服务与服务下实例的关联关系。
长连接:并非所有客户端请求都走了gRPC,唯独持久实例的注册走了HTTP。
健康检查:
1.x:临时实例客户端定时向服务端发送心跳,确保Instance存活。
2.x:临时实例客户端与服务端建立长连接,通过双向健康检查确保Client存活。服务端检测20s空闲连接,向客户端发起探测请求,如果客户端1s内响应,认为健康检查通过;客户端检测5s空闲连接,向服务端发起健康检查请求,如果服务端3s内响应,认为健康检查通过。
2.x服务端如果发现Client不健康,会从ClientManager移除Client,进而触发各种事件(集群/客户端数据同步)。
客户端数据同步:仍然采用推拉结合的方式。
拉:客户端会每60s执行一次UpdateTask,查询服务端注册表,更新订阅的Service,逻辑与1.x一致,只是间隔由10s改为60s。
推:服务端的Service发生变更,通知所有监听这个Service的Client。
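其中"拉"的定时逻辑可以用ScheduledExecutorService做一个最小示意。queryServer代表查询服务端注册表的动作,这里是假设的占位实现,类名也是示意:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// 简化示意:模拟客户端UpdateTask的定时拉取(2.x为60s一次)
class UpdateTaskSketch {
    static ScheduledFuture<?> schedule(ScheduledExecutorService executor, Runnable queryServer, long intervalSec) {
        // 首次立即执行,之后每隔intervalSec秒查询一次服务端注册表
        return executor.scheduleWithFixedDelay(queryServer, 0, intervalSec, TimeUnit.SECONDS);
    }
}
```

真实的UpdateTask还会根据失败次数做退避,这里只保留固定间隔的骨架。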
集群数据同步:
DistroFilter:1.x使用DistroFilter来确保写请求只能由服务对应责任节点处理,通过哈希运算确认这个服务所属责任节点;2.x因为使用了长连接,只要ConnectionBasedClient.isNative=true,代表Client与这个节点直连,Client所在的节点就是责任节点,减少了写请求重定向其他节点的损耗。
责任节点的Client数据发生变更后,会同步这个Client的全量数据给其他非责任节点。非责任节点更新ClientManager中的Client信息。
为了避免非责任节点上isNative=false的Client数据不一致,责任节点每5s向非责任节点发送VERIFY数据(包含Client全量数据),为这些Client续租;非责任节点定时扫描isNative=false的Client数据,如果超过30s没有续租,移除这些非native的Client。
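上面Verify续租与过期判断的核心条件,可以抽成一个简化的谓词示例。30s阈值对应正文中的续租超时(源码里是Constants.DEFAULT_IP_DELETE_TIMEOUT),类名为示意:

```java
// 简化示意:非责任节点对同步Client的过期判断
class ClientLeaseSketch {
    static final long EXPIRE_MS = 30_000L; // 对应正文的30s续租超时

    // isNative=false且距上次VERIFY续租超过30s => 判定过期,应当移除
    static boolean expired(boolean isNative, long nowMs, long lastRenewMs) {
        return !isNative && nowMs - lastRenewMs > EXPIRE_MS;
    }
}
```

isNative=true的Client不参与该判断,它的生命周期由长连接本身的健康检查决定。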