vlambda博客
学习文章列表

[源码阅读] Dubbo注册中心模块实现原理

前言


  • 当注册中心挂掉后,Dubbo还可以订阅和调用服务吗?


  • 当注册中心恢复并与其重连后,Dubbo是如何重新发布和订阅服务的?


简介




Dubbo 注册中心的几个特点:



  • 注册中心全部宕机,不影响已运行的提供者和消费者,消费者在本地缓存了提供者列表


  • 注册中心是可选的,服务消费者可以直连服务提供者


Dubbo 中注册中心模块 dubbo-registry 的源码项目结构如下:

+dubbo-registry -dubbo-registry-api -dubbo-registry-consul -dubbo-registry-default -dubbo-registry-etcd3 -dubbo-registry-eureka -dubbo-registry-multicast -dubbo-registry-multiple -dubbo-registry-nacos -dubbo-registry-redis -dubbo-registry-sofa -dubbo-registry-zookeeper


dubbo-registry 中包含了多种注册中心的具体实现,其中核心模块为dubbo-registry-api,其他模块均为Dubbo内置的不同注册中心的实现,本文会跟随Dubbo源码来看下registry模块的实现原理以及是怎样支撑多种注册中心的。


基础定义层


抽象的注册服务类

org.apache.dubbo.registry.support.AbstractRegistry


AbstractRegistry 是所有注册中心的父类,提供了子类公用的基础逻辑,先看下构造函数的代码

 
public AbstractRegistry(URL url) { setUrl(url); //读取持久化开关配置 if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) { //读取持久化方式配置,默认是异步线程持久化 syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false); //如果没有指定文件名称则使用默认规则生成文件名称 String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache"; String filename = url.getParameter(FILE_KEY, defaultFilename); File file = null; if (ConfigUtils.isNotEmpty(filename)) { file = new File(filename); if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { if (!file.getParentFile().mkdirs()) { throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); } } } this.file = file; //当启动注册时 //先读取本地缓存文件,这样当注册中心宕机,也可以正常工作从本地订阅服务 loadProperties(); //通知Listeners notify(url.getBackupUrls()); }}

在构造函数中,如果打开了持久化开关会从本地的缓存文件来读取服务列表并加载到内存中,加载缓存的作用是当去注册中心 subcribe 服务而注册中心不可用时从本地缓存读取订阅服务信息的一种容错设计,具体实现是提供了一个getCacheUrls方法从缓存中订阅服务


//从缓存文件中根据URL获取服务列表//在服务向注册中心订阅失败时会通过本地缓存数据订阅服务public List<URL> getCacheUrls(URL url) { for (Map.Entry<Object, Object> entry : properties.entrySet()) { String key = (String) entry.getKey(); String value = (String) entry.getValue(); if (StringUtils.isNotEmpty(key) && key.equals(url.getServiceKey()) && (Character.isLetter(key.charAt(0)) || key.charAt(0) == '_') && StringUtils.isNotEmpty(value)) { String[] arr = value.trim().split(URL_SPLIT); List<URL> urls = new ArrayList<>(); for (String u : arr) { urls.add(URL.valueOf(u)); } return urls; } } return null;}

AbstractRegistry 实现了Registry 的基础功能,主要是在内存和文件中记录下了所有注册、订阅的服务以及当 Registry初始化时会从文件中加载服务列表到内存中


AbstractRegistry 类的是通过 AbstractRegistryFactory 工厂类创建的,下面我们看下 AbstractRegistryFactory 工厂类的实现


注册中心工厂类

org.apache.dubbo.registry.support.AbstractRegistryFactory


AbstractRegistryFactory提供了 getRegistry 方法用来创建 Registry 对象,其中 getRegistry 方法会调用模版方法createRegistry创建子类的Registry对象


在 Dubbo 中支持多种注册中心,如:Zookeeper、Nacos、Redis 等,因此 getRegistry 方法会根据运行时参数会加载相应的注册中心实例

// 获取注册中心对象@Overridepublic Registry getRegistry(URL url) { if (destroyed.get()) { LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " + "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of."); return DEFAULT_NOP_REGISTRY; } url = URLBuilder.from(url) .setPath(RegistryService.class.getName()) .addParameter(INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(EXPORT_KEY, REFER_KEY) .build(); String key = createRegistryCacheKey(url); // 加锁,确保registry对象是单例 LOCK.lock(); try { // 先从内存获取registry对象 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } //创建registry对象 //原理是根据URL中的参数通过SPI方式来创建注册中心对象 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); }}

为了保证注册中心的单例,getRegistry 方法会先从缓存中获取注册中心实例,如果缓存中没有才会通过模版方法 createRegistry去创建Registry对象


// 抽象的创建Registry方法// 包含了各种注册中心的实现,其子类有:// ConsulRegistryFactory// AbstractRegistryFactory// ConsulRegistryFactory// DubboRegistryFactory// EtcdRegistryFactory// MulticastRegistryFactory// MultipleRegistryFactory// NacosRegistryFactory// RedisRegistryFactory// ServiceDiscoveryRegistryFactory// SofaRegistryFactory// ZookeeperRegistryFactoryprotected abstract Registry createRegistry(URL url);

因此父类 AbstractRegistryFactory 的作用就是对注册中心实例进行缓存,并根据运行时根据参数 URL 来动态加载相应的 Registry 实例


下面看下注册中心对象 Registry 的创建和执行过程。


注册中心工厂接口

org.apache.dubbo.registry.RegistryFactory

@SPI("dubbo")public interface RegistryFactory { @Adaptive({"protocol"}) Registry getRegistry(URL url);}
  • @SPI("dubbo") 注解表示这是一个拓展接口,采用SPI方式加载类,默认值为dubbo


  • @Adaptive({"protocol"}) 注册表示根据URL中protocol内容来加载拓展类。


RegistryFactory 对象的创建采用的是 Dubbo 的自适应 SPI 拓展机制,其核心流程先生成一个 RegistryFactory 的代理类,在代理类的 getRegistry 方法中在根据 URL 中的 protocol 属性通过 SPI 机制加载真正的 RegistryFactory 实例,为什么要生成代理类?


在 Dubbo 中,很多拓展都是通过 SPI 机制进行加载的,比如 Protocol、Cluster、LoadBalance 等。有时,有些拓展并不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载,因此 Dubbo 会为拓展接口生成具有代理功能的代码。然后通过 javassist 或 jdk 编译这段代码,得到 Class 类。最后再通过反射创建代理类,整个过程比较复杂。


详细的实现原理可以前往:

http://dubbo.apache.org/zh/docs/v2.7/dev/source/adaptive-extension/


自适应 SPI 加载 RegistryFactory 的调用过程如下:


首先通过 ExtensionLoader 类获得 RegistryFacory 的代理类

ExtensionLoader.getExtensionLoader(RegistryFacory.class).getAdaptiveExtension();
得到的代理类 RegistryFacory@Adaptive 代码如下:
public class RegistryFactory$Adaptive implements org.apache.dubbo.registry.RegistryFactory { public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0) { if (arg0 == null) throw new IllegalArgumentException("url == null"); org.apache.dubbo.common.URL url = arg0; //从URL参数中获取protocol的值,缺省为dubbo String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (" + url.toString() + ") use keys([protocol])"); //再次通过SPI加载得到真正的RegistryFactory拓展对象 org.apache.dubbo.registry.RegistryFactory extension = (org.apache.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName); //得到真正的RegistryFactory拓展对象的getRegistry方法,得到Registry对象 return extension.getRegistry(arg0); }}

在代理类的 getRegistry 方法中根据 url 中的 protocol 属性通过 Dubbo SPI 加载出真正的 RegistryFacory 对象,SPI 的配置信息存放在 META-INF/dubbo/接口全限定名 中,其内容为:

dubbo=org.apache.dubbo.registry.dubbo.DubboRegistryFactorymulticast=org.apache.dubbo.registry.multicast.MulticastRegistryFactoryzookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactoryredis=org.apache.dubbo.registry.redis.RedisRegistryFactoryconsul=org.apache.dubbo.registry.consul.ConsulRegistryFactoryetcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactorynacos=org.apache.dubbo.registry.nacos.NacosRegistryFactorysofa=org.apache.dubbo.registry.sofa.SofaRegistryFactorymultiple=org.apache.dubbo.registry.multiple.MultipleRegistryFactory


Dubbo SPI 和 Dubbo 自适应 SPI 的区别:


Dubbo SPI 改进了 JDK 标准的 SPI,在扩展类的 jar 包内 ,放置扩展点配置文件 META-INF/dubbo/接口全限定名,内容为:配置名=扩展实现类全限定名,多个实现类用换行符分隔,通过 ExtensionLoader 的 getExtension(配置名) 来加载一个拓展点


Dubbo 自适应 SPI 是为了解决只有当拓展方法被调用时再去加载而提供的一种懒加载机制,目的是节省系统资源,在运行时根据参数来决定加载哪一个拓展点,通过 ExtensionLoader 的 getAdaptiveExtension 方法来自适应加载一个拓展点


SPI 自适应拓展运行时的 debug 情况

调用 RegistryFactory@Adpative 代理类的 getRegistry 方法,当使用Zookeeper作为注册中心时运行时参数URL中的protocol 等于 zookeeper


此时 getRegistry(url) 方法会创建 ZookeeperRegistryFactory 对象,这样就完成了注册中心实例的创建


自动恢复注册中心类

org.apache.dubbo.registry.support.FailbackRegistry


Dubbo在本地文件和内存中都保存了已经订阅的provider信息,当注册中心出现故障时,一般不会影响服务的调用,只需要在注册中心恢复时把之前请求失败的动作重新请求一遍即可


因此子类 FailbackRegistry 对父类 AbstractRegistry 的 register、unregister、subscribe、unsubscribe 等方法增加了自动恢复特性:


  • 注册中心出现故障时会把订阅、发布请求创建重试任务并在后台重试


  • 当与注册中心断开并重新建立连接时调用recover方法从内存中恢复已经订阅和已经注册的服务。


FailbackRegistry类是AbstractRegistry的增强类,因此继承了AbstractRegistry类

public abstract class FailbackRegistry extends AbstractRegistry

构造函数中初始化了时间轮调度器,用于执行自动恢复任务

public FailbackRegistry(URL url) { super(url); this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD); // since the retry task will not be very much. 128 ticks is enough. retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);}

register重写了父类的register方法并对父类进行了功能增强

 
//注册服务@Overridepublic void register(URL url) { if (!acceptable(url)) { logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type."); return; } super.register(url); // 从注册失败列表中移除 removeFailedRegistered(url); // 从取消注册失败列表中移除 removeFailedUnregistered(url); try { // 发送注册请求 doRegister(url); } catch (Exception e) { Throwable t = e; // 在注册失败后,检查是否有配置check=true的配置,如果不是consumer方并且check=true boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !CONSUMER_PROTOCOL.equals(url.getProtocol()); // 判断异常类型是不是SkipFailbackWrapperException boolean skipFailback = t instanceof SkipFailbackWrapperException; // 满足任何一个条件,则注册失败并抛出异常 if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // 否则记录注册失败的url,加入到retryTimer的重试任务中进行重试 addFailedRegistered(url); }}

首先会调用父类的register方法,然后调用doRegister方法向注册中心发起注册,当注册失败时如果是注册provider并且开启了check配置则抛出注册失败异常,否则就记录下失败的url,在addFailedRegistered方法中,会创建一个task用于后台重试

 
private void addFailedRegistered(URL url) { FailedRegisteredTask oldOne = failedRegistered.get(url); if (oldOne != null) { return; } FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); oldOne = failedRegistered.putIfAbsent(url, newTask); if (oldOne == null) { // never has a retry task. then start a new task for retry. retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); }}

subcribe方法逻辑与register基本一致,唯一不同的地方是在向注册中心订阅失败后的处理。

//订阅服务@Overridepublic void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); // 从订阅失败列表中移除 removeFailedSubscribed(url, listener); try { // 发送订阅请求 doSubscribe(url, listener); } catch (Exception e) { Throwable t = e; // 如果订阅失败,则从本地缓存中获取服务列表 AbstractRegistry.getCacheUrls List<URL> urls = getCacheUrls(url); if (CollectionUtils.isNotEmpty(urls)) { // 通知被订阅服务 notify(url, listener, urls); logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); } else { // 如果本地缓存中也没有数据,则检查是否有配置check=true的配置 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; // 满足任何一个条件,则订阅失败并抛出异常 if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } // 否则记录订阅失败的url,加入到retryTimer的重试任务中进行重试 addFailedSubscribed(url, listener); }}

subscribe在向注册中心订阅失败后,会从本地缓存中去寻找provider的信息,如果本地缓存也没有则加入到重试任务中,因此当注册中心出现故障只要本地缓存中有provider信息,还是可以正常订阅服务的


当与注册中心重新建立连接后会调用recover方法,并将所有已订阅、已注册的服务全部重新订阅和注册,具体实现也是将所有服务加入到重试任务中,这样就可以确保当重连注册中心时能够及时发布和订阅服务。

//恢复服务//当与注册中心断开并重新建立连接时会调用recover方法//从内存中恢复已经订阅和已经注册的服务@Overrideprotected void recover() throws Exception { // 所有已注册服务 Set<URL> recoverRegistered = new HashSet<URL>(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); } // 加入到retryTimer的重试任务中进行重试 for (URL url : recoverRegistered) { addFailedRegistered(url); } } // 所有已订阅服务 Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); } for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { URL url = entry.getKey(); // 加入到retryTimer的重试任务中进行重试 for (NotifyListener listener : entry.getValue()) { addFailedSubscribed(url, listener); } } }}

retryTimer中有四种Task,分别是FailedRegisteredTask;FailedSubscribedTask;FailedUnregisteredTask;FailedUnsubscribedTask,里面的代码逻辑基本一致


以FailedSubscribedTask为例,doRetry调用的是doSubscribe方法并在订阅成功后将重试任务删除

public final class FailedSubscribedTask extends AbstractRetryTask { private static final String NAME = "retry subscribe"; private final NotifyListener listener; public FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) { super(url, registry, NAME); if (listener == null) { throw new IllegalArgumentException(); } this.listener = listener; } //重试 @Override protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { //重新订阅 registry.doSubscribe(url, listener); //从订阅失败Map中删除任务 registry.removeFailedSubscribedTask(url, listener); }}

总结下来自动恢复的原理是当注册、订阅失败时将失败事件封装成一个 Task 任务并放入到重试任务中进行后台重试。


以上是 dubbo-registry-api 的核心相关类,大概内容可以总结为:


  • 定义了注册中心、注册中心工厂的接口规范(AbstractRegistry、AbstractRegistryFactory


  • 提供了相关增强型的抽象父类,包括服务列表对内存缓存、文件持久化等功能(AbstractRegistry)


  • 注册中心对象缓存,保证注册中心单例(AbstractRegistryFactory)


  • 自动恢复注册服务等高可用特性(FailbackRegistry)


下面我们以Zookeeper注册中心为例来看下具体实现


拓展实现层


Zookeeper注册中心类

org.apache.dubbo.registry.zookeeper.ZookeeperRegistry


ZookeeperRegistry 是 自动恢复类 FailbackRegistry 的子类,也就是说 ZookeeperRegistry 拥有上面所提到的自动恢复注册订阅服务、服务缓存等特性


因此 ZookeeperRegistry 主要功能是实现了父类 FailbackRegistry doXXX 模版方法,在父类基础上通过向 Zookeeper 创建、删除节点来实现服务的订阅与发布


public class ZookeeperRegistry extends FailbackRegistry { //注册服务 @Override public void doRegister(URL url) { try { //创建zk节点,是否是临时节点 由DYNAMIC_KEY参数决定 zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } //取消注册 @Override public void doUnregister(URL url) { try { //删除zk节点 zkClient.delete(toUrlPath(url)); } catch (Throwable e) { throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } //订阅服务 @Override public void doSubscribe(final URL url, final NotifyListener listener) { try { //判断是否订阅全部服务(ANY_VALUE=*) if (ANY_VALUE.equals(url.getServiceInterface())) { //Root节点 String root = toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());  //订阅全部服务,从ROOT节点开始订阅所有子节点服务 ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> { for (String child : currentChilds) { child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), k); } } }); zkClient.create(root, false); List<String> services = zkClient.addChildListener(root, zkListener); if (CollectionUtils.isNotEmpty(services)) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } }  //不是订阅全部服务 else { List<URL> urls = new ArrayList<>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());  //订阅单个服务节点 ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds))); zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } //取消订阅服务 @Override public void doUnsubscribe(URL url, NotifyListener listener) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners != null) { ChildListener zkListener = listeners.get(listener); if (zkListener != null) { //判断是否取消订阅全部服务(ANY_VALUE=*) if (ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); //删除root下所有子节点 zkClient.removeChildListener(root, zkListener); } else { //不是取消订阅全部服务 for (String path : toCategoriesPath(url)) { //删除单个服务节点 zkClient.removeChildListener(path, zkListener); } } } } }}


ZookeeperRegistry类是FailbackRegistry的一种实现,使用zk实现了服务注册、订阅方法并且当zk出现重连、重新建立回话时会触发 FailbackRegistry 中的自动恢复方法进行重新注册或者订阅服务


Zookeeper注册中心工厂类

org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory

public class ZookeeperRegistryFactory extends AbstractRegistryFactory { private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } //创建ZookeeperRegistry对象 @Override public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); }}

ZookeeperRegistryFactory工厂对象由前面提到的通过自适应SPI根据根据 protocol=zookeeper 参数加载得到


createRegistry 方法会生产真正的 ZookeeperRegistry 注册中心对象


总结


  • Dubbo中支持多种注册中心,通过自适应 SPI 机制在运行时加载注册中心拓展实例



  • 当与注册中心断开重新建立会话时会重新发布与订阅相关服务


参考


http://dubbo.apache.org/zh/docs/v2.7/user/