vlambda博客
学习文章列表

dubbo学习-服务暴露与注册源码剖析

1. 服务导出简介

服务导出的方法入口位于ServiceBean这个类,我们配置的<dubbo:service/>标签或者被@Service注解修饰的bean,都会转换成ServiceBean。ServiceBean继承自ServiceConfig,实现了InitializingBean、ApplicationListener等接口。ServiceBean重写了ApplicationListener的onApplicationEvent方法,在spring容器发布刷新事件时被触发执行这个方法,在onApplicationEvent方法中会执行服务的导出逻辑。整个导出流程可分为三个部分:

  • 第一部分是前置工作,主要对配置参数进行检查,组装成URL;

  • 第二部分是服务导出,服务导出分为本地导出(injvm)和远程导出两个过程;

  • 第三部分是服务注册,向注册中心注册服务信息,用于服务发现。

ServiceBean实现了InitializingBean接口,重写了afterPropertiesSet方法。在Bean的实例化过程中,最后会调用afterPropertiesSet方法,该方法的作用主要是设置相关的配置信息,如<dubbo:provider/><dubbo:applicaiton/><dubbo:registy/>等。在该方法最后,会判断supportApplicationListener的值,如果为false,则不会经过spring容器的刷新通过进行服务导出,而是直接进行服务导出,否则的话,通过spring容器发布刷新事件进行导出。supportedApplicationListener表示当前spring容器是否支持ApplicationListener,这个值的初始值为false,在spring容器将自己设置到ServiceBean的时候,ServiceBean 的 setApplicationContext 方法会检测 Spring 容器是否支持 ApplicationListener。若支持,则将 supportedApplicationListener 置为 true。ServiceBean 是 Dubbo 与 Spring 框架进行整合的关键,可以看做是两个框架之间的桥梁。具有同样作用的类还有 ReferenceBean。

2. 前置工作

从上面我们可以了解到服务的导出有两个地方,一个是afterPropertiesSet方法,如果不支持ApplicationListener,则会在该方法内部调用export方法进行服务导出。另一个地方则是,支持ApplicationListener,spring发布容器刷新机制,在ServiceBean的onApplicationEvent方法内部会调用export方法。而ServiceBean的export方法调用了其父类ServiceConfig的export方法来实现。

public synchronized void export() { checkAndUpdateSubConfigs(); // ①
if (!shouldExport()) { // ② return; }
if (shouldDelay()) { // ③ DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { doExport(); // ④ }}

doExport方法内部没有做实质性的逻辑,在方法内部主要调用了doExportUrls方法,进行多协议多注册中的服务导出。

①: 检测配置信息。

②: 判断是否需要导出服务。主要判断<dubbo: service/>标签或者<dubbo: provider/>标签配置的export值,缺省配置为true。如果配置的export为false,则不执行导出。

③: 判断是否进行延迟导出。主要判断<dubbo: service/>标签或者<dubbo: provider/>标签配置的delay值,缺省配置为空,表示不延迟暴露。如果delay的值大于0表示延迟暴露,否者不延迟暴露。延迟暴露会通过一个定时任务,延迟delay毫秒数调用doExport方法进行服务导出。

④: 执行服务导出。

doExport方法内部没有做实质性的逻辑,在方法内部主要调用了doExportUrls方法,进行多协议多注册中的服务导出。

2.1 doExportUrls方法实现

private void doExportUrls() { List<URL> registryURLs = loadRegistries(true); // ① for (ProtocolConfig protocolConfig : protocols) { // ② String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version); ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass); ApplicationModel.initProviderModel(pathKey, providerModel); doExportUrlsFor1Protocol(protocolConfig, registryURLs); // ③ }}

①: 加载配置中的<dubbo: registry/>配置,可以配置多个。将配置的信息组装成URL对象。

Registry相关的URL信息中,protocol=registry,表示是注册协议。

②: 循环每个协议,进行多协议多注册中心服务导出。

③: 调用doExportUrlsFor1Protocol方法,逐个协议暴露。

2.2 doExportUrlsFor1Protocol方法实现

在doExportUrlsFor1Protocol方法中,首先会构建服务导出的URL,然后执行服务导出和注册。

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); // 协议名称,默认为dubbo if (StringUtils.isEmpty(name)) { name = DUBBO; } // 将配置参数放到map中,key为对应的配置名称,value为配置的值 Map<String, String> map = new HashMap<String, String>(); map.put(SIDE_KEY, PROVIDER_SIDE); appendRuntimeParameters(map); appendParameters(map, metrics); appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider); appendParameters(map, protocolConfig); appendParameters(map, this); // 解析MethodConfig对象设置的方法级别的配置,并将参数放到map中 if (CollectionUtils.isNotEmpty(methods)) { for (MethodConfig method : methods) { // 遍历每个method配置,添加方法级别的配置信息 } // end of methods for } // 判断调用类型,如果是泛化调用,则设置泛化类型 if (ProtocolUtils.isGeneric(generic)) { map.put(GENERIC_KEY, generic); map.put(METHODS_KEY, ANY_VALUE); } else { // 如果不是泛化调用,设置对应的配置值 String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put(REVISION_KEY, revision); } // 获取接口定义的所有方法,将所有的方法名用,分割作为value,key为methods,方法map String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } // 判断调用该service服务是否需要token,如果需要将配置的token信息放入map if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(TOKEN_KEY, token); } } // 执行服务导出流程
// 获取服务host,默认为本机ip地址 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); // 获取服务暴露的端口,默认为20880 Integer port = this.findConfigedPorts(protocolConfig, name, map); // 将map中的参数信息,构建成URL对象 URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } // 获取配置的scope值 String scope = url.getParameter(SCOPE_KEY); // 如果配置的scope值为none,则不暴露服务 if (!SCOPE_NONE.equalsIgnoreCase(scope)) { // 如果配置的scope值不为remote,则进行本地暴露 if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } // 如果配置的scope值不为local,则进行远程暴露 if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (!isOnlyInJvm() && logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (CollectionUtils.isNotEmpty(registryURLs)) { // 循环每一个注册地址,进行服务暴露 for (URL registryURL : registryURLs) { //if protocol is only injvm ,not register if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); // 加载监控中心的配置信息,并放入到URL中 URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } // 获取配置的proxy值,用来指定用哪个proxy工厂的getInvoker方法生成invoker类的 String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } // 将服务的实现类转换成为Invoker对象 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); // 对Invoker对象进行一次封装 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 执行服务暴露 Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { // ...省略不相关代码... } // ...省略不相关代码... } } this.urls.add(url);}

该方法在执行服务导出之前,先会读取服务的配置信息,将其放入map中,map的key为配置的名称, value为配置的值。在最后构建url的地方,将协议名称name、host,port、map等信息进行封装,转换为URL对象。

dubbo学习-服务暴露与注册源码剖析

将配置信息转换为URL对象之后,然后就进行导出。首先会获取服务的scope配置,如果配置的scope值为none,则不执行服务导出。如果scope的不是remote,则进行本地导出。如果scope的值不是local,则进行远程导出。所以scope的缺省配置下,会进行本地和远程导出。

3. 服务导出

服务的导出分为本地导出和远程导出,本地暴露使用injvm协议,不会开启端口,不发起远程调用,只在jvm内关联。相比远程调用而言,不需要注册到注册中心。绑定的host:port 为127.0.0.1:0。这里不聚焦此处。主要看远程导出的实现。这个步骤总体分两步,首先是将本地接口实现类转换成为Invoker类;然后将Invoker进行导出,转换为Exporter类。

3.1 本地接口实现类转换为Invoker实现

在进行服务导致之前,会将本地接口实现类转换成为Invoker对象。通过调用ProxyFactory的getInvoker方法来实现。

// 获取配置的proxy值,用来指定用哪个proxy工厂的getInvoker方法生成invoker类的String proxy = url.getParameter(PROXY_KEY);if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy);}Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

配置的proxy的值表示使用哪个proxy工厂来生成Invoker对象。proxy的值也会通过放到URL中,进行传递。缺省配置为javassist。

3.1.1 ProxyFactory的getInoker方法实现

PROXY_FACTORY在ServiceConfig的定义如下;

private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

PROXY_FACTORY是ProxyFactory的自适应扩展类ProxyFactory$Adaptive,getInvoker方法被@Adaptive注解修饰,我们可以看看自适应扩展类ProxyFactory$Adaptive的getInvoker方法内容。

public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException { if (arg2 == null) throw new IllegalArgumentException("url == null"); org.apache.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])"); org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getInvoker(arg0, arg1, arg2); }

从getInvoker方法的内容可以看出,内部实现会通过proxy字段值来选择具体的扩展实现类的getInvoker实现。缺省配置的扩展了名称为javassist,也就是说ProxyFactory的默认自适应扩展类为JavassistFactory。

dubbo提供了基于jdk和javassit的ProxyFactory实现,JdkProxyFactory和JavassistProxyFactory。

3.1.1.1 JdkProxyFacotry的getInvoker实现

我们先来看下getInvoker方法在JdkProxyFactory里的实现。

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // 通过反射获取proxy的方法,然后进行调用 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { Method method = proxy.getClass().getMethod(methodName, parameterTypes); return method.invoke(proxy, arguments); } };}

该方法返回一个AbstractProxyInvoker对象,重写了doInvoker方法。doInvoker方法的原理就是通过反射的方式来调用目标方法。

3.1.1.2 JavassitProxyFactory的getInvoker实现

基于javassist的getInvoker实现。

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { /** * JavassitProxyFactory作为ProxyFactory的默认自适应实现,getInvoker方法通过创建Wrapper子类,对proxy类进行包装,返回AbstractProxyInvoker对象。 * 在wrapper子类中实现invokerMethod方法,方法体内会为每个proxy的方法做方法名和方法参数匹配校验,匹配成共则进行调用。相比JdkProxyFactory实现,省去了反射调用的开销。 */ final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } };}

方法也返回一个AbstractProxyInvoker对象,重写了doInvoker方法。和JdkProxyFactory不同的是,JavassistProxyFactory会生成一个Wrapper类。Wrapper了是一个包装类,用于包装目标类,尽可以通过getWrapper方法创建子类。在创建wrapper子类的过程中,子类代码生成逻辑会对传入的class信息进行解析,拿到诸如方法、成员变量等信息。以及生成invokerMenthod方法和其它一些方法代码。代码生成完毕后,通过javassist生成class对象,最后再通过反射生成Wrapper实例。

和基于Jdk的实现不同的是,基于Javassit的实现,在调用过程中,jdk每次都是基于反射调用,而javassist通过生成包装类,通过包装类去完成实际调用,相比之下,javassist消耗性能更小。

为了携带服务的配置元数据信息,通过DelegateProviderMetaDataInvoker来对Invoker类进行封装,DelegateProviderMetaDataInvoker实现了Invoker接口,并持有Invoker对象的引用和配置元数据信息的引用。接下来的导出流程将基于这个类进行。

3.1.2 小结

本地接口进行导出之前,需要将接口实现类转换为Invoker对象。dubbo默认提供了基于Jdk和Javassit的两种实现,基于Javassit的实现减少了反射调用,性能更好。

3.2 将Invoker对象转换为Exporter

实现Invoker对象转换成为Exporter对象是在DubboProtocol中实现的,但是根据Invoker对象的URL信息可以发现,URL的信息中将registry的配置信息和服务的配置信息结合在了一起,完成这一操作的就是实现本地接口转换成Invoker的实现。

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

registryURL.addParameterAndEncoded这个方法,将url(也就是服务的配置信息)都结合到了一次。

dubbo学习-服务暴露与注册源码剖析

从图中可以看出,url信息中包含了注册信息和服务导出信息,使用export字段将服务信息结合,此时url的protocol值为registry。

服务导出的实现是调用Protocol的export方法,Protocol是一个扩展接口,export方法被@Adaptive注解修饰。

Exporter<?> exporter = protocol.export(wrapperInvoker);

3.2.1 Protocol的export方法实现

上面说到Invoker实现Expoter的转换是通过Protocol的export方法实现,protocol变量在ServiceConfig的定义如下:

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

protocol是Protocol扩展接口的自适应扩展类Protocol$Adaptive,可以看下export方法在Protocol$Adaptive类中的内容:

public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");org.apache.dubbo.common.URL url = arg0.getUrl();String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);return extension.export(arg0);}

可以看到export方法会从url中获取protocol的值作为具体的扩展类名称,默认为dubbo。然后根据扩展类名称获取具体的扩展实现类,再调用这个类的export方法。

和服务导出相关的Protocol扩展实现类有RegistryProtocol、DubboProtocol、ProtocolFilterWrapper、ProtocolListenerWrapper等。

当然这里只针对基于dubbo协议的服务导出,如果是基于其它协议的,诸如WebServiceProtocol、HttpProtocol、RedisProtocol、XmlRpcProtocol等,可以看它们的具体实现。

3.2.1.1 RegistryProtocol的export方法实现

上面说到调用protocol.export方法是,invoker对象中的url的protocol值为registry,所以根据spi的自适应机制,此时export的具体实现在RegistryProtocol的export方法。但其实在进入到RegistryProtocol的export内部之前,现需要经过ProtocolFilterWrapper和ProtocolListenerWrapper的调用,这两个类是扩展接口的包装类,这个两个类的export方法实现,对于protocol值为registry不做任何操作,直接调用RegistryProtocol的export。

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 取出注册中心的url URL registryUrl = getRegistryUrl(originInvoker); // 取出服务暴露的url URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); // 执行本地暴露,将Invoker转换为Exporter final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 省略服务注册的代码}

该方法总体分为三个步骤: 1,将URL中的注册信息和服务导出信息分类;2,进行服务导出;3,进行服务注册。这里只关注1,2步,先不用关注服务注册的实现。

registryUrl是服务注册的URL信息。提取方法实现如下:

private URL getRegistryUrl(Invoker<?> originInvoker) { URL registryUrl = originInvoker.getUrl(); // 原始的url信息,registry开头 if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { // 是registry协议才提取 // 获取url中参数名为registry的值,因为配置的<dubbo: registry="zookeeper://localhost:2181">,所以protocol值为zookeeper String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);  // 将协议修改为具体的注册协议(zookeeper),并去除registry参数,形成新的registryUrl信息 registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY); } return registryUrl;}

providerUrl是服务导出的URL信息,提取方法实现如下:

private URL getProviderUrl(final Invoker<?> originInvoker) { String export = originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY); // 取出URL中参数为export的值 if (export == null || export.length() == 0) { throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl()); } return URL.valueOf(export);}

dubbo学习-服务暴露与注册源码剖析

从图可以看出,此时registryUrl的协议为zookeeper,providerUrl的协议是dubbo。

拿到providerUrl之后,就会调用doLocalExport方法尽心服务导出。

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { // 服务暴露的key,服务暴露之后,会放入map缓存 String key = getCacheKey(originInvoker); 
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); // protocol.export方法,配置的dubbo协议,所以调用DubboProtocol#export方法,但是会先经过ProtocolFilterWrapper和ProtocolListenerWrapper return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); });}

因为我们知道此时providerUrl的协议是dubbo,所有将Invoker转换为Exporter的实现在DubboProtocol的export方法中。

3.2.1.1 DubboProtocol的export方法实现

在DubboProtocol的export方法内部,会开启NettyServer进行服务监听。而在进入DubboProtocol的export方法内部之前,也会进入到ProtocolFilterWrapper和ProtocolListenerWrapper的调用。

ProtocolListenerWrapper注入了监听器列表,ExporterListener也是一个SPI扩展接口,我们可自己实现该扩展点,然后在服务完成导出之后,回到ExporterListener的exported方法做些其它的事情。

ProtocolFilterWrapper的export方法会为Invoker对象构建一个调用链。最终返回一个 CallbackRegistrationInvoker对象。

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; // 加载所有的Filter自动激活扩展实现类,也就是所有的被@Activate注解修饰的扩展实现类 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); // 构建调用链,filters或根据@Activate注解设置的排序字段进行排序,构建的调用链Filter实现类在前,invoker在最后 if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { // 省略无关代码 @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { asyncResult = filter.invoke(next, invocation); } catch (Exception e) { // onError callback if (filter instanceof ListenableFilter) { Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { listener.onError(e, invoker, invocation); } } throw e; } return asyncResult; } // 省略无关代码 }; } }
return new CallbackRegistrationInvoker<>(last, filters);}

dubbo学习-服务暴露与注册源码剖析

最后方法返回的CallbackRegistrationInvoker对象,它是ProtocolFilterWrapper的内部类, 实现Invoker接口。封装了Invoker调用链和所有的过滤器。

static class CallbackRegistrationInvoker<T> implements Invoker<T> {
private final Invoker<T> filterInvoker; // 封装了filter的invoker调用链,真正的调用实现类在链尾 private final List<Filter> filters; // 所有的过滤器实现类
public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) { this.filterInvoker = filterInvoker; this.filters = filters; }}

完成调用链的构建之后,就会进入到DubboProtocol的export方法完成服务导出。基本流程就是讲Invoker转换为Exporter,然后放到exportMap缓存中,然后调用operServer方法开启NettyServer,进行服务监听。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl();
// 根据url获取服务暴露的key String key = serviceKey(url); // ① // 将Invoker对象转换成为Exporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // ② // 将Exporter放入map缓存 exporterMap.put(key, exporter);
//export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); }
} else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // 开启NettyServer,进行监听 openServer(url); // ③ optimizeSerialization(url);
return exporter;}


①: 根据invoker的url构建服务的key。

②: 将Invoker对象转换为Exporter,并放入exporterMap中。

③: 调用openServer方法开启NettyServer,进行服务监听。

3.2.2 openServer方法实现

private void openServer(URL url) { // 获取提供者机器地址+ip String key = url.getAddress(); // 只有提供者才需要启动监听 boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { // 先从缓存中获取server,缓存中没有的话,则通过createServer创建。每个机器的ip:port是唯一的,所以只会创建一次NettyServer,之后都是从缓存中取 ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override server.reset(url); } }}

该方法实现首先会判断当前URL是否为服务提供方的信息,只有是提供者才需要启动监听。然后会先从serverMap中根据ip:port获取ExchangeServer对象,每个ip:port是唯一的,所以只会创建一次Server对象,之后都是从缓存中获取。如果缓存中没有,调用调用createServer方法创建Server,在双重检测的机制下确保只创建一次。

3.2.3 createServer方法实现

dubbo默认使用netty作为网络通讯服务器。

private ExchangeServer createServer(URL url) { url = URLBuilder.from(url) // 当服务准备关闭时,运行发送只读事件 .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // 开启心跳机制 .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); // ① // 获取使用了那种Server,默认使用Netty String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); // ②
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); }
ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); // ③ } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); }
str = url.getParameter(CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server;}


①: 向URL添加其它参数信息。如服务准备关闭,是否开启只处理只读事件;是否开启心跳机制;

②: 获取配置的server字段值,默认为netty。dubbo也提供了基于如Mina服务的实现。

③: 调用Exchangers.bind方法开启服务。

Exchangers.bind方法中首先获取Exchanger对象,Exchanger是扩展点接口,默认扩展实现了是HeaderExchanger。所以进到HeaderExchanger的bind方法。

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}

方法最终返回HeaderExchangeServer对象。在返回之前会先调用Transporters.bind方法,创建一个Server对象。创建Server对象,会传入一个DecodeHandler,创建DecodeHandler会传入HeaderExchangeHandler,创建HeaderExchangeHandler又会传入一个ExchangeHandler,这个ExchangeHandler是位于DubboProtocol的ExchangeHandlerAdapter实例——requestHandler。ExchangeHandlerAdapter实现了ExchangeHandler接口。

通过上面的分析,Transporters.bind方法传入的ChannelHandler实例为DecoderHandler。

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } // 获取Transporter扩展类实例,然后执行bind操作。getTransporter方法返回的是自适应扩展类。 // bind方法会先根据server的值来决定使用那个扩展实现类,如果server值为空,则使用transporter的值来决定使用哪个扩展实现类 // 这里server为netty,所以调用NettyTransporter#bind方法,并发挥NettyServer对象 return getTransporter().bind(url, handler);}

该方法的实现,也是通过URL中的参数来决定选择Transporter扩展接口的实际扩展实现类。getTransporter方法返回Transporter$Adaptive自适应类,根据bind方法上的@Adaptive注解配置key,从URL中选择参数为key的值。默认为netty,所以会调用NettyTransporter的bind方法。

public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener);}

NettyTransporter的bind方法,返回一个NettyServer实例。接下来看NettyServer的构造函数。

public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants. // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));}

构造函数主要是调用父类的构造函数。在调用父类构造函数之前,会调用ChannelHandlers.wrap方法,对传进来的ChannelHandler进行封装,此时传进来的ChannelHandler为DecoderHandler。

public static ChannelHandler wrap(ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url);}protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { // ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url) 默认Dispatcher为all,所以返回AllChannelHandler
// MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler  return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));}

wrap方法内部调用wrapInternal方法对Handler进行封装。直接看最后一句,Dispatcher是一个扩展接口,dispatch方法被@Adaptive注解修饰,根据URL中的dispatcher、dispather、channel.handler参数值来选择具体的扩展实现类。这里默认是all,所以dispatch方法的默认实现是AllDispatcher的dispatch方法。AllDispatcher的dispatch方法,直接返回AllChannelHandler对象。

public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url);}


所以warp方法将传进来的DecoderHandler进行封装后,返回的是MultiMessageHandler对象。内部的引用关系为:

MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecoderHandler -> HeaderExchangeHandler -> ExchangeHandlerAdapter。

完成Handler的封装之后,就会进入到父类AbstractServer的构造函数,这时传进来的ChannelHandler实例为MultiMessageHandler。

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); // ① // 本地地址 localAddress = getUrl().toInetSocketAddress(); // 绑定的ip String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); // 绑定端口 int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = ANYHOST_VALUE; } bindAddress = new InetSocketAddress(bindIp, bindPort); // ② this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS); // ③ this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT); // ④ try { // 启动服务监听 doOpen(); // ⑤ if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //fixme replace this with better method DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));}

①: 继续调用父类的构造函数,设置handler、timeout、connectTimeout、codec、url等属性值。

③: 获取服务器允许最大连接数。默认为0,表示不限制。

④: 获取连接空闲时间。默认为600s。

⑤: 调用doOpen方法,开启服务监听。

doOpen方法在AbstractServer是一个抽象方法,有子类实现。具体看NettyServer的doOpen方法。

protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap(); // 创建ServerBootStrap  // ① // 创建主线程组 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); // 创建工作线程组 workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true));  // 创建NettyServerHandler ② final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); // ③ NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) // 添加解码器Handler到管道 .addLast("encoder", adapter.getEncoder()) // 添加编码码器Handler到管道 .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); // 添加处理业务的handler到管道 } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel();}

这就是一个典型的Netty开启服务的实现。

①: Dubbo使用了Netty的reactor模式,使用主从多线程来出来连接事件和业务事件。

②: 创建处理业务的Handler。而构造函数传入的ChannelHandler对象是NettyServer对象,从NettyServer的继承关系可以,NettyServer继承自AbstractPeer,而AbstractPeer实现了ChannelHandler接口,所有NettyServer本质上也会一个ChannelHandler。根据NettyServer的构造函数可知,此时NettyServer对象内部持有的ChannelHandler对象是MultiMessageHandler。

③: 添加编码器handler、解码器handler和处理业务的handler对象到pipeline中。

经过上述开启NettyServer的过程,ChannelHandler的最终的连接关系如下图:

dubbo学习-服务暴露与注册源码剖析

NettyServer接收到一个请求之后,就会逐一经过这里ChannelHandler,最后到达ExchangeHandlerAdapter实例,也就是DubboProtocol的requestHandler,最终由它来完成业务处理。这块会在后面的内容中讲到。

3.3 服务导出小结

服务的导出是将接口实现类转换成Invoker对象,Invoker转换成Exporter对象的过程。首先ServiceConfig类拿到对外提供服务的的实际类ref(例如:DemoServiceImpl),然后通过ProxyFactory类的getInvoker方法,将ref转换为一个AbstractProxyInvoker实例,到这一步就完成了接口实现类到Invoker的转换。Invoker转换为Exporter的过程,根据具体的导出协议,如dubbo,则会在DubboProtocol中的export方法完成转换。

4. 服务注册

在分析服务导出的地方,只给出了RegistryProtocol的export方法的服务导出部分代码。完成服务的导出之后,接下来就会执行服务注册的逻辑。

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { /** * 省略服务导出的代码 */ // 根据注册中心url获取对应的Registry扩展实现类。这里配置的是注册协议是zookeeper,所以registry对象是ZookeeperRegistry final Registry registry = getRegistry(originInvoker); // ① final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); // ② ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); // 根据配置判断,是否需要注册服务 boolean register = registeredProviderUrl.getParameter("register", true); // ③ if (register) { // 进行服务注册 register(registryUrl, registeredProviderUrl); // ④ providerInvokerWrapper.setReg(true); } // Deprecated! Subscribe to override rules in 2.6.x or before. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter);}

①: 根据Invoker对象获取Registry对象。

②: 获取服务提供者需要注册到注册中心的URL信息。

③: 从registeredProviderUrl获取register参数信息,也就是配置的register值,如果为false,则不会进行服务注册,反正进行服务注册。默认为true。

④: 进行服务注册。

4.1 getRegistry方法实现

进行服务注册之前,需要通过Invoker中的URL信息获取Registry对象。

private Registry getRegistry(final Invoker<?> originInvoker) { URL registryUrl = getRegistryUrl(originInvoker); return registryFactory.getRegistry(registryUrl);}

方法逻辑首先会获取registryUrl,getRegistryUrl方法在服务导出时已经分析过了,registryUrl对象的protocol值为zookeeper;然后调用RegistryFactory的getRegistry方法获取Registry实例。

RegistryFactory是一个扩展接口,getRegistry方法被@Adaptive注解修饰,根据protocol的值来选择具体的实现类。这里registryUrl的protocol值为zookeeper,所有getRegistry方法实现要看ZookeeperRegistryFactory。ZookeeperRegistryFactory继承自AbstractRegistryFactory,getRegistry方法在父类中已经被实现。

public Registry getRegistry(URL url) { url = URLBuilder.from(url) .setPath(RegistryService.class.getName()) .addParameter(INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(EXPORT_KEY, REFER_KEY) .build(); String key = url.toServiceStringWithoutResolving(); // Lock the registry access process to ensure a single instance of the registry LOCK.lock(); try { // 先从缓存中获取,没有的话通过createRegistry方法创建 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } // createRegistry方法是一个抽象方法,有子类实现,因为此时的注册协议为zookeeper,所以看ZookeeperRegistryFactory#createRegistry方法 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } // 放入缓存 REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); }}

该方法主要从缓存中获取Registry实例,如果缓存中没有就调用createRegistry方法创建,然后在放入缓存。通过ReentrantLock来确保Registry不会被重复创建。createRegistry方法是一个抽象方法,由子类去实现,因为此时的protocol值为zookeeper,所有看ZookeeperRegistryFactory的createRegistry方法实现。

public Registry createRegistry(URL url) { // 返回一个ZookeeperRegistry对象 return new ZookeeperRegistry(url, zookeeperTransporter);}

该方法简单粗暴的返回了一个ZookeeperRegistry对象。

4.1.1 ZookeeperRegistry构造函数实现

ZookeeperRegistry继承自FailbackRegistry,构造函数中首先会调用父类的构造函数,主要是设置注册失败后重试机制。

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { // 父类的构造函数,会设置注册失败的轮询 super(url); // ① if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(PATH_SEPARATOR)) { group = PATH_SEPARATOR + group; } this.root = group; // 连接上zkServer zkClient = zookeeperTransporter.connect(url); // ② // 添加状态监控器,当发生重连接事件时,对注册和订阅信息进行覆盖 zkClient.addStateListener(state -> { // ③ if (state == StateListener.RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } });}

①: 调用父类构造函数,设置注册失败重试。基于HashedWheelTimer,默认每5s重试一次,最多重试128次。

②: 连接Zookeeper服务器。

③: 添加Zookeeper状态监听器,当发生重连接事件时,对注册信和订阅信息进行覆盖。

4.2 执行服务注册

经过上面的步骤,获取到了ZookeeperRegistry对象。接下来就会执行注册逻辑。在执行注册逻辑之前,会获取通过调用getRegisteredProvinceUrl方法获取提供者的信息。简单理解处理过程就是: 将服务暴露的URL信息进行了修改,去掉了如monitor、bind.ip、bind.port、qos.enable、interfaces等的参数信息。

拿到更新之后的服务URL信息之后,从URL信息中获取register参数值,如为false,则不进行服务注册。默认为true,进行服务注册。

public void register(URL registryUrl, URL registeredProviderUrl) { // RegistryFactory是个扩展接口,getRegistry方法被@Adaptive修饰,通过protocol的值选择具体的扩展实现类。 // RegisterFactory的扩展实现类都继承了AbstractRegisterFactory类,所以先回执行AbstractRegisterFactory#getRegistry方法,返回ZookeeperRegistry对象 Registry registry = registryFactory.getRegistry(registryUrl); // ZookeeperRegistry对象继承FailbackRegistry,先执行FailbackRegistry#register方法 registry.register(registeredProviderUrl);}

register方法中会再次获取Registry实例,因为之前已经创建过了,所以这次直接从缓存中获取即可。因为此时注册协议为zookeeper,ZookeeperRegistry继承了FailbackRegistry,且父类对register方法做了实现。

public void register(URL url) { super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // 抽象方法,由子类实现,这里是ZookeeperRegistry doRegister(url); } catch (Exception e) { // 省略异常处理代码
// Record a failed registration request to a failed list, retry regularly addFailedRegistered(url); }}

该方法的主体实现还是需要看子类ZookeeperRegistry的doRegister方法实现。

public void doRegister(URL url) { try { // 通过调用zkClient.create方法注册服务节点信息到zookeeper zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); }}

该方法通过zkClient向Zookeeper服务器创建url相关的目录。

public void create(String path, boolean ephemeral) { if (!ephemeral) { if (checkExists(path)) { return; } } int i = path.lastIndexOf('/'); if (i > 0) { create(path.substring(0, i), false); } // 判断是否是临时节点 if (ephemeral) { createEphemeral(path); } else { // 第一层的root节点,也就是/dubbo目录是持久节点 // 第二层的service节点,也就是/dubbo/xxx.xxx.xxxService目录是持久节点 // 第三层的type节点,也就是/dubbo/xxx.xxx.xxxService/providers目录是持久化节点 createPersistent(path); }}

create方法对path进行递归创建,如果path是临时节点,则会创建临时目录,反正创建持久目录。在dubbo服务中,第一层的root节点、第二层的service节点、第三层的type节点,都是持久化目录,其余是临时目录。

4.3 服务注册小结

服务注册总体分为两步,首先根据协议获取Registry实例,之后通过注册中心实例注册服务。注册服务的过程也就是向Zookeeper服务器创建服务相关信息的目录。