vlambda博客
学习文章列表

Dubbo系列之Dubbo 服务暴露


https://blog.csdn.net/Impassive_y/article/details/108720737

1、Dubbo配置解析

1.1 配置解析

首先理解一个配置文件中元素的含义:

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"
>


<dubbo:protocol name="dubbo"/>

<bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>

<beams/>
  1. xmlns:dubbo : 声明前缀为dubbo的命名空间,后面的链接是赋予dubbo一个唯一的名字。他的值与xsi:schemaLocation中指定的第二个值http://dubbo.apache.org/schema/dubbo/dubbo.xsd相关,是里面的targetNamespace,如果他们的值不一致,编译会抛出异常

Caused by: org.xml.sax.SAXParseException; systemId: http://dubbo.apache.org/schema/dubbo/dubbo.xsd; lineNumber: 6; columnNumber: 68; TargetNamespace.1: 应为名称空间 'dubbo', 但方案文档的目标名称空间为 'http://dubbo.apache.org/schema/dubbo'

除此之外,也需要与META-INF/下的spring.handler中的key相对应,指定当遇到这类命名空间时,Spring使用指定的Handler去加载该配置,如果不一致会抛出异常:

Exception in thread "main" org.springframework.beans.factory.parsing.BeanDefinitionParsingException: Configuration problem: Unable to locate Spring NamespaceHandler for XML schema namespace [http://dubbo.apache.org/schema/dubbo]
  1. xmlns : 什么一个默认的命名空间,所有没有带前缀的元素,都会默认使用这个命名空间

  2. xsi:schemaLocation : 这个需要搭配命名空间使用。前面的http://dubbo.apache.org/schema/dubbo代表一个命名空间的名称,后面的http://dubbo.apache.org/schema/dubbo/dubbo.xsd代表供命名空间使用的XML schema的位置

  3. 对于里面的元素,比如<dubbo:protocol />则会使用到dubbo对应的xsd -> http://dubbo.apache.org/schema/dubbo/dubbo.xsd;对应里面的protocol,具体的元素类型,对应里面的type属性指定的值。

1.1.1 基于schema设计解析

在Dubbo-config-spring下,有一个dubbo.xsd用于约束使用XML配置时的标签和对应的属性。Spring在解析到对应的一个命名空间的时候,会去spring.handlers和spring.schemas两个文件中,查找对应的一个Handler,用于解析对应的XML配置信息。例如

http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler

当spring遇到http://dubbo.apache.org/schema/dubbo的时候,会使用DubboNamespaceHandler进行解析。

1.1.2 基于XML配置原理解析

从上面可知,XML的配置的约束的dubbo.xsd的作用,以及他的解析Handler。

public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}

这一块主要是Spring的XML的解析逻辑,会将对应的一个element交给指定的一个Parser进行解析,会保存到一个map里面,以后遇到该element的时候,就使用对应的一个Parser进行解析。
解析的步骤

  1. 替换Id里面的占位符信息,得到一个ID

  2. 如果ID为空,则根绝名字生成ID,如果名字也为空,则使用类的名称

  3. 判断当前的容器中,是否存在一个相同的ID,存在的话,则抛出异常,不存在则注册该bean的信息

  4. 并设置propertyValue的值,添加ID信息

  5. 根据元素不同的类型,去解析不同的配置信息

  6. 找出xml中配置的所有的参数,并放入到beanDefinition中property中

1.1.3 基于注解配置原理解析

1.1.3.1 @EnableDubbo

EnableDubbo注解的作用是启用Dubbo的服务,当Spring在启动初始化的时候,会获取到对应配置头上的注解信息。在@EnableDubbo注解的头部,有两个注解比较重要@EnableDubboConfig和@DubboComponentScan。这两个注解中分别包含了一个@Import注解。

  • @EnableDubboConfig

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Import(DubboConfigConfigurationRegistrar.class)
public @interface EnableDubboConfig {

/**
* It indicates whether binding to multiple Spring Beans.
*
* @return the default value is <code>true</code>
* @revised 2.5.9
*/

boolean multiple() default true;

}

@Import注解中的值DubboConfigConfigurationRegistrar实现了ImportBeanDefinitionRegistrar,Spring容器在加载的时候会去加载@Import注解,并且执行其中值指定的类中的registerBeanDefinitions方法。

private void collectImports(SourceClass sourceClass, Set<SourceClass> imports, Set<SourceClass> visited)
throws IOException {

if (visited.add(sourceClass)) {
for (SourceClass annotation : sourceClass.getAnnotations()) {
String annName = annotation.getMetadata().getClassName();
if (!annName.startsWith("java") && !annName.equals(Import.class.getName())) {
collectImports(annotation, imports, visited);
}
}
imports.addAll(sourceClass.getAnnotationAttributes(Import.class.getName(), "value"));
}
}

对去递归的遍历所有的注解,如果注解的名称不是Import并且不是java开头的,则都会进行遍历。然后会去执行registerBeanDefinitions方法。

public class DubboConfigConfigurationRegistrar implements ImportBeanDefinitionRegistrar {

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

AnnotationAttributes attributes = AnnotationAttributes.fromMap(
importingClassMetadata.getAnnotationAttributes(EnableDubboConfig.class.getName()));

boolean multiple = attributes.getBoolean("multiple");

// Single Config Bindings
registerBeans(registry, DubboConfigConfiguration.Single.class);
// 配置的值是以dubbo.xxxs开头的,可以根据Multiple中的注解判断。其主要作用的是Multiple中的注解
if (multiple) { // Since 2.6.6 https://github.com/apache/dubbo/issues/3193
registerBeans(registry, DubboConfigConfiguration.Multiple.class);
}

// 将一些配置相关的Bean都注册到Spring容器中
// Since 2.7.6
registerCommonBeans(registry);
}
}

当设置为multiple的时候,例如dubbo.applations.name1.test1=test1Value会在加载ApplicationConfig的时候,name1会作为bean的名称,注入到容器中,test1/test1Value会作为keyValue的值,保存到BeanDefination中的attribute中。当有多个配置中心的时候,即可使用。

  • @DubboComponentScan
    Dubbo注解的扫描。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {

/**
* Alias for the {@link #basePackages()} attribute. Allows for more concise annotation
* declarations e.g.: {@code @DubboComponentScan("org.my.pkg")} instead of
* {@code @DubboComponentScan(basePackages="org.my.pkg")}.
*
* @return the base packages to scan
*/

String[] value() default {};

/**
* Base packages to scan for annotated @Service classes. {@link #value()} is an
* alias for (and mutually exclusive with) this attribute.
* <p>
* Use {@link #basePackageClasses()} for a type-safe alternative to String-based
* package names.
*
* @return the base packages to scan
*/

String[] basePackages() default {};

/**
* Type-safe alternative to {@link #basePackages()} for specifying the packages to
* scan for annotated @Service classes. The package of each class specified will be
* scanned.
*
* @return classes from the base packages to scan
*/

Class<?>[] basePackageClasses() default {};

}

他也会去执行@Import中的DubboComponentScanRegistrar.class下的registerBeanDefinitions。主要的工作步骤分为了三个:

  1. 先去获取该注解写所有的可扫描的包的路径

  2. 创建一个ServiceAnnotationBeanPostProcessor Bean,并设置他的构造函数的参数为所有可扫描的包的路径

  3. 将2中生成的Bean注入到容器中,其他的工作交给Spring来完成。
    2中处理Bean的关键代码如下:

    @Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {

// @since 2.7.5
registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME, DubboBootstrapApplicationListener.class);

// 解析需要扫描的包的路径。替换里面可能存在的占位符等
Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);

if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
// 扫描包信息
registerServiceBeans(resolvedPackagesToScan, registry);
} else {
if (logger.isWarnEnabled()) {
logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
}
}

}

/**
* Registers Beans whose classes was annotated {@link Service}
*
* @param packagesToScan The base packages to scan
* @param registry {@link BeanDefinitionRegistry}
*/

private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {

DubboClassPathBeanDefinitionScanner scanner =
new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);

BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);

scanner.setBeanNameGenerator(beanNameGenerator);

// refactor @since 2.7.7
// 指定去扫描 serviceAnnotationTypes 下的注解类型
serviceAnnotationTypes.forEach(annotationType -> {
scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));
});

for (String packageToScan : packagesToScan) {

// Registers @Service Bean first
// 会去扫描包,找出包下的每一个资源,然后加载他的Metadata信息,并判断是否是标有serviceAnnotationTypes中的注解
// 并注入到容器中
scanner.scan(packageToScan);

// Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
// 生成一个BeanDefinitionHolder
Set<BeanDefinitionHolder> beanDefinitionHolders =
findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);

if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {

for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
// 注入到Bean容器中
registerServiceBean(beanDefinitionHolder, registry, scanner);
}

if (logger.isInfoEnabled()) {
logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
beanDefinitionHolders +
" } were scanned under package[" + packageToScan + "]");
}

} else {

if (logger.isWarnEnabled()) {
logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
+ packageToScan + "]");
}
}
}
}

2、Dubbo服务暴露原理

配置承载初始化

配置优先级(由高到低)

  1. —D传递给JVM参数优先级最高

  2. 代码或XML配置优先级次高

  3. 配置文件优先级最低

对于运行期属性值(Dubbo的配置受到provider的影响),遵循以下规则:

  1. 只有provider端指定配置规则,则会自动透传到客户端

  2. 如果客户端配置了相应的属性,则会覆盖服务端

远程服务的暴露机制

在Spring容器启动的时候,会去获取所有的ApplicationListener,然后执行OneTimeExecutionApplicationContextEventListener中的onApplicationEvent方法。最终会调用到DubboBootstrapApplicationListener中的onApplicationContextEvent方法,探测到是容器启动的事件,就会去DubboBootstrap的start方法先进行一些初始化的任务,然后开始暴露服务。

总体过程

根据上面的图,暴露的过程主要分为六个个步骤:

  1. 根据ServiceConfig,配置暴露所需要的参数信息

  2. 使用Dubbo的SPI机制,创建一个ProxyFactory(默认是使用javassist)

  3. 根据创建的ProxyFactory,使用字节码技术,创建一个代理对象

  4. 将代理对象和第一步中生成的参数信息一起,创建一个Invoker对象

  5. 使用SPI机制,获取一个Protocol,根据具体的协议转换成一个Exportor

  6. 暴露服务

在进行服务暴露的时候需要注意,会先暴露的入口在ProtocolFilterWrapper的exporter方法。具体的逻辑如下:

  1. 使用SPI机制获取Protocol的时候,采用的方式是自适应扩展点,会根据url中的Protocol行选择(自适应生成的代码如下:)

package org.apache.dubbo.rpc;

import org.apache.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0)
throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException(
"Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("
+ url.toString()
+ ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension =
(org.apache.dubbo.rpc.Protocol)
ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class)
.getExtension(extName);
return extension.export(arg0);
}

public void destroy() {
throw new UnsupportedOperationException(
"The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}

public int getDefaultPort() {
throw new UnsupportedOperationException(
"The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}

public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1)
throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException(
"Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("
+ url.toString()
+ ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension =
(org.apache.dubbo.rpc.Protocol)
ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class)
.getExtension(extName);
return extension.refer(arg0, arg1);
}

public java.util.List getServers() {
throw new UnsupportedOperationException(
"The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
}

其中的关键方法是exporter中的

String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException(
"Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("
+ url.toString()
+ ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension =
(org.apache.dubbo.rpc.Protocol)
ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class)
.getExtension(extName);
return extension.export(arg0);

所以,最终会选择到"registry"这个名字的扩展点,具体配置如下
Dubbo系列之Dubbo 服务暴露

所以会使用的是org.apache.dubbo.registry.integration.RegistryProtocol。但是由于Protocol有两个包装类,配置如下,

filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol

所以在获取扩展点的时候,最终返回的是包装类的信息。以下方法在ExtensionLoader中

 // 判断是否是包装类,如果是是的,则初始化包装类,并注入实例
if (wrap) {
List<Class<?>> wrapperClassesList = new ArrayList<>();
// 先对包装类进行一个排序
if (cachedWrapperClasses != null) {
wrapperClassesList.addAll(cachedWrapperClasses);
wrapperClassesList.sort(WrapperComparator.COMPARATOR);
Collections.reverse(wrapperClassesList);
}

if (CollectionUtils.isNotEmpty(wrapperClassesList)) {
for (Class<?> wrapperClass : wrapperClassesList) {
Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
if (wrapper == null
|| (ArrayUtils.contains(wrapper.matches(), name)
&& !ArrayUtils.contains(wrapper.mismatches(), name))) {
instance =
injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
}
}
  1. 一次调用包装类里面的export,直到调用RegistryProtocol类的export方法

  2. 暴露服务

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 会去获得URL中的registry的参数,并设置当前URL的protocol为该参数的值,如果参数不存在,则直接返回。替换为对应的注册协议,例如zookeeper
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally,会去获取URL中的export参数的值,如果为空,则会抛出异常,否则会创建一个新的URL
URL providerUrl = getProviderUrl(originInvoker);

// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
// TODO 对于控制台的服务重写。先留着,后面继续
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker 会创建一个netty服务,用于监听地址和端口
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

// url to registry
// 使用的是SPI机制去获取一个Registry,默认是dubbo,会根据url中的参数protocol进行选择
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 将服务写入到zookeeper
register(registryUrl, registeredProviderUrl);
}

// register stated url on provider model
// 会写入到ServiceRepository中的providers中,记录服务提供者的URL的信息以及暴露的状态
registerStatedUrl(registryUrl, registeredProviderUrl, register);


exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);

// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

// 用户可自定义的监听器
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}

里面一个是 final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl); 会先创建一个DubboExporter(如果使用的是Dubbo协议),放进exporterMap,key是由group、version、port和path组成,value为包含了invoker的exporter对象。然后会去并打开一个网络服务器(默认是netty),然后监听端口
5. 根据registryUrl将registeredProviderUrl写入到对应的服务(推荐是zookeeper)
6. 修改状态并返回,完成服务的暴露

创建服务器(默认是netty)

private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 根据Address去获取一个server
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
// 没有找到则创建一个servce
if (server == null) {
// 以address作为key,可以保证每一个地址只有一个netty
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}

会先去获得一个Address信息,毕竟一个Address只能被一个服务器监听,然后去到缓存中拿,如果有,则不会创建服务器,没有则会创建一个服务器,然后放入缓存。

private ProtocolServer createServer(URL url) {
// 添加一些服务器得相关参数
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
// 设置编解码器,默认是用dubbo
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

// 判断是否有该网络服务器的扩展点,默认是使用netty
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}

ExchangeServer server;
try {
// 会创建一个Netty服务,并且绑定并监听host
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}

str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}

return new DubboProtocolServer(server);
}

写入注册中心

  1. 先获取一个注册中心的创建工厂

  2. 使用工厂,获取一个注册中心的实例

  3. 调用registry方法

private void register(URL registryUrl, URL registeredProviderUrl) {
// registryFactory是使用自适应创建一个工厂
// getRegistry得时候会优先根据protocol获取一个Registry,如果没有,则使用dubbo
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}

RegistryFactory是在实例化RegistryProtocol,获取扩展点时,使用set方法注入的值。RegistryFactory有一个包装类org.apache.dubbo.registry.RegistryFactoryWrapper,在实例化是会返回最终的包装类。在调用registryFactory.getRegistry(registryUrl)的时候,会调用到包装类中的getRegistry方法,会使用registryFactory根据URL去获取一个Registry实例,然后包装成ListenerRegistryWrapper并返回,所以最终调用registry方法的是ListenerRegistryWrapper。
这里补一下RegistryFactory的来源:在实例化的RegistryProtocol的时候,会使用set方法注入RegistryFactory,去获取扩展点的时候,会去获取扩展点的时候,会去获取RegistryFactory下的有标记@Adaptive的实现类。因为找不到该实现类,所以是使用了javassist技术生成了一个自适应的类,并进行了加载,该接口下标记了@Adaptive的方法,会进行实现,根据URL中的protocol进行获取扩展点,如果protocol为空,则使用默认的DubboProtocolFactory。
接着上面调用的Registry方法,因为ListenerRegistryWrapper内部包含了ZookeeperRegistry,所以最终会调用ZookeeperRegister进行服务的暴露。
再来一张图

ZookeeperRefistry继承自FailbackRegistry,因为未重写registry方法,所以会调用到FailbackRegistry的Registry方法。

if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
return;
}
// 调用父类的方法,将URL写入已经注册的URL的缓存中
super.register(url);
// 如果该URL注册失败过,则先从失败的队列中移除
removeFailedRegistered(url);
// 如果是未注册的URL,则移除未注册URL的队列
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
// 调用具体的实例,写入数据。接下来就是zk的事情
doRegister(url);
} catch (Exception e) {
Throwable t = e;

// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}

// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}

到此远程服务暴露全部完成

本地服务的暴露机制

本地服务的暴露相对于远程服务暴露要简单的多,主要分为两个步骤:

  1. 使用代理工程创建一个代理对象invoker。

  2. 创建一个InjvmExporter对象,并将invoker放入到本地暴露的map中,完成暴露

 private void exportLocal(URL url) {
// 这里设置了protocol为injvm,后面获取扩展点会使用
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
// getInvoker会去创建一个代理对象
// export会创建一个InjvmExporter,并将invoker放入缓存map,暴露完成
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}