vlambda博客
学习文章列表

dubbo service启动流程源码分析

之前写过dubbo整体的源代码流程,看留言大家想更深入的了解,今天就给大家分享一下service注册和发布的流程代码。<dubbo:service>到底干了什么

   dubboservice处理主要是基于springserviceBean对象的生成和注册过程。主要是afterPropertiesSet这个方法:

   1.初始化构建ProviderConfig对象,ProviderConfig对象主要就是provider的配置信息(是一个bean,主要是保存配置信息),这个里面没什么难度都是一些配置获取,设置ProviderConfig对象的操作,如果service上没有配置对应的,直接会找全局的配置,后面的获取配置都是一个逻辑


2.设置application

dubbo service启动流程源码分析

3.设置module对象

dubbo service启动流程源码分析

注册中心配置获取

dubbo service启动流程源码分析


由代码可以知道如果配置了多个注册中心,并且指定了多个注册中心时,就会向多个注册中心注册,如果未指定,则会向所有注册中心都会注册,这个许多文章或者配置都没有讲到,需要注意的。

后面的配置都和上面一样就不一一分析了,协议配置和注册中心的方式也是一样的:支持多协议开放,如果未指定此服务开放的协议,则默认是所有协议开放。


dubbo service启动流程源码分析


dubbo service启动流程源码分析

dubbo service启动流程源码分析

dubbo service启动流程源码分析


最主要是export()方法,将服务发布处理,如果不支持listenner机制时,直接调用export将服务发布,

如果支持listenner时,会在应用启动事件处理里面通过export将服务发布出去。

所以服务发布的流程最终都走的是export方法,下面就看一下export方法干了些什么?

@Override
public void export() {
    //调用父类的exort
    super.export();
    // Publish ServiceBeanExportedEvent
    publishExportEvent();
}

调用父类的export方法,也就是ServiceConfig类里面的export方法。


dubbo service启动流程源码分析

checkAndUpdateSubConfigs()方法里面主要是:

1. 从配置中心中刷新最新的配置
2.进行配置数据检查
3 对注册中心,协议,application对象从配置中心进行检查:
 为什么只检查这三个呢?因为这三个是服务必须要有的。
4. ref的检查:调用真实类的发布类的interface配置,和方法检查等
5.调用方式处理:
6   泛型服务设置:设置调用类
7    本地调用或者stub调用时设置调用的类等

doExport里面直接调用了doExportUrls(),这个是重点


dubbo service启动流程源码分析

dubbo service启动流程源码分析


dubbo service启动流程源码分析

协议设置为注册协议这个后面会遇到。

 

2.通过循环获取配置协议,将协议注册按注册注册到多个注册中心:doExportUrlsFor1Protocol(protocolConfig, registryURLs);

    1.服务信息:协议名默认为为dubbo

   


dubbo service启动流程源码分析


//对外输出的方法如果不为空
if (CollectionUtils.isNotEmpty(methods)) {
    for (MethodConfig method : methods) {
        //设置方法信息
        appendParameters(map, method,method.getName());
        String retryKey =method.getName() + ".retry";
        if (map.containsKey(retryKey)) {
            String retryValue =map.remove(retryKey);
            if ("false".equals(retryValue)){
                map.put(method.getName()+ ".retries", "0");
            }
        }
        //设置方法的输入参数信息
        List<ArgumentConfig>arguments = method.getArguments();
        if (CollectionUtils.isNotEmpty(arguments)){
            for (ArgumentConfig argument: arguments) {
                // convert argument type
                if (argument.getType() !=null && argument.getType().length() > 0) {
                    Method[] methods = interfaceClass.getMethods();
                    // visit all methods
                    if (methods != null &&methods.length > 0) {
                        for (int i = 0; i< methods.length; i++) {
                            StringmethodName = methods[i].getName();
                            // target themethod, and get its signature
                            if (methodName.equals(method.getName())){
                                Class<?>[] argtypes =methods[i].getParameterTypes();
                                // onecallback in the method
                                if (argument.getIndex()!= -1) {
                                    if (argtypes[argument.getIndex()].getName().equals(argument.getType())){
                                        appendParameters(map,argument, method.getName() + "." + argument.getIndex());
                                    } else{
                                        throw new IllegalArgumentException("Argumentconfig error : the index attribute and type attribute not match :index :" +argument.getIndex() + ", type:" + argument.getType());
                                    }
                                } else {
                                    //multiple callbacks in the method
                                    for (intj = 0; j < argtypes.length; j++) {
                                       Class<?> argclazz = argtypes[j];
                                        if (argclazz.getName().equals(argument.getType())){
                                           appendParameters(map, argument, method.getName() + "." + j);
                                           if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                               throw new IllegalArgumentException("Argument config error : theindex attribute and type attribute not match :index :" +argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                } else if (argument.getIndex()!= -1) {
                    appendParameters(map,argument, method.getName() + "." + argument.getIndex());
                } else {
                    throw new IllegalArgumentException("Argumentconfig must set index or type attribute.eg: <dubbo:argument index='0'.../> or <dubbo:argument type=xxx .../>");
                }

            }
        }
    } // end of methods for
}

上面主要主要将要发布的服务的方法中的信息放入到元数据里面。没有什么注意的

dubbo service启动流程源码分析

以上主要是设置泛型调用,版本号等,最主要的是获取方法,采用装饰类(动态动态编译的方式自动)并进行缓存

 

dubbo service启动流程源码分析

token访问,进行简单的授权访问

dubbo service启动流程源码分析

dubbo service启动流程源码分析

依据外放的返回进行外放,如果不是远程服务,则直接进行本地开放本地开放就是injvm协议方式开放如果是远程进行远程开放

 

//将服务多个注册中心开放一下是精髓:

dubbo service启动流程源码分析

首先获取每一个注册中心外放url(注册中心url)进行外放:

上面红线里面就是外放代码看着就几行其实里面涉及了很多东西:

1.通过创建代理对象invoker进行进行代理处理

1.Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref,(Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY,url.toFullString()));

由于默认是javassist代理:


dubbo service启动流程源码分析

代理源码如下

dubbo service启动流程源码分析

1.代理调用方法处理过程:1.是将获取封装器对象,封装器就是我们上面那个interface的封装器,从缓存中取的(图中标红的)。2.调用invokeMethod,invokeMethod 会路由到真实对象的(REF对象)的调用方法里面去:invoke增强时会调用ref对应的methodName的方法,从而调用到真实的业务方法里面通过代理我们获取的是AbstractProxyInvoker对象。


2.AbstractProxyInvoker对象进行二次带元数据的增强

 

3. Exporter<?> exporter = protocol.export(wrapperInvoker);协议外放,协议外放这个是一个重点:protocol 是通过:

private static final Protocol protocol =ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

在成员中获取的,这个方法获取就涉及到了dubbo 扩展机制。后续会有扩展机制讲解:

扩展机制:先简单理解为:1.META-INF加载扩展定义文件。2

按类类型生成ExtensionLoader(调用时)从ExtensionLoader获取获取对应的扩展对象生成时会语句urlAdaptive动态生成一个类:讲解先了解大概(后序会有一章将dubbo扩展机制如何实现)

dubbo service启动流程源码分析

在这会生成protocol

dubbo service启动流程源码分析

最终是调用到如下:

dubbo service启动流程源码分析

生成新类里面会依据url配置调用扩展:

动态生成的新类为:

public class Protocol$Adaptive implementsorg.apache.dubbo.rpc.Protocol {

public void destroy()  {

throw newUnsupportedOperationException("The method public abstract voidorg.apache.dubbo.rpc.Protocol.destroy() of interfaceorg.apache.dubbo.rpc.Protocol is not adaptive method!");

}

public int getDefaultPort()  {

throw newUnsupportedOperationException("The method public abstract intorg.apache.dubbo.rpc.Protocol.getDefaultPort() of interfaceorg.apache.dubbo.rpc.Protocol is not adaptive method!");

}

public org.apache.dubbo.rpc.Exporterexport(org.apache.dubbo.rpc.Invoker arg0) throwsorg.apache.dubbo.rpc.RpcException {

if (arg0 == null) throw newIllegalArgumentException("org.apache.dubbo.rpc.Invoker argument ==null");

if (arg0.getUrl() == null) throw newIllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl()== null");

org.apache.dubbo.common.URL url =arg0.getUrl();

String extName = ( url.getProtocol() ==null ? "dubbo" : url.getProtocol() );

if(extName == null) throw newIllegalStateException("Failed to get extension(org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() +") use keys([protocol])");

org.apache.dubbo.rpc.Protocol extension =(org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);

return extension.export(arg0);

}

public org.apache.dubbo.rpc.Invokerrefer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throwsorg.apache.dubbo.rpc.RpcException {

if (arg1 == null) throw newIllegalArgumentException("url == null");

org.apache.dubbo.common.URL url = arg1;

String extName = ( url.getProtocol() ==null ? "dubbo" : url.getProtocol() );

if(extName == null) throw newIllegalStateException("Failed to get extension(org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() +") use keys([protocol])");

org.apache.dubbo.rpc.Protocol extension =(org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);

return extension.refer(arg0, arg1);

}

这个类通过:org.apache.dubbo.rpc.Protocol extension =(org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);会对形成调用链(封装器对象的自动形成调用链),set属性的自动注入,与springBean的自动注入和装配比较像。

最终会到:

 

dubbo service启动流程源码分析


以上扩展会在后续详细讲解。协议装饰类的自动拉链,DubboProtocol被自动注入到RegistryProtocol 。于是就有ProtocolFilterWrapperProtocolFilterWrapper(自动拉链) --------》(链式调用到)RegistryProtocol(动态生成的Protocolcol类)--------->setProtocol(SPI扩展set自注入DubboProtocol协议注入到RegistryProtocol协议中)

的一个链式调用用就产生了。

调用:ProtocolFilterWrapper.export-------->ProtocolFilterWrapper----->RegistryProtocol.export(动态生成的Protocol,进行export时调用RegistryProtocol.export外放)------------->DubboProtocol.export(动态生成的Protocol 调用export时获取到dubboProtocol进行export外放)调用到本地dubbo服务的外放流程里面,这个细节很重要,也是整个dubbo扩展的核心。许多文章都没有把这个逻辑讲出来。

RegistryProtocolexport中的代码:

dubbo service启动流程源码分析

dubbo service启动流程源码分析

上面就是整个发布和服务外放的过程。由代码可以知道,注册时先调用本地的export,进行协议export,然后在获取注册服务 final Registry registry =getRegistry(originInvoker);进行后续注册处理:

final URL overrideSubscribeUrl =getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl,originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker invoker进行本地export出去
final ExporterChangeableWrapper<T> exporter =doLocalExport(originInvoker, providerUrl);

// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl,registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper =ProviderConsumerRegTable.registerProvider(originInvoker,
        registryUrl,registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
    register(registryUrl,registeredProviderUrl);
    providerInvokerWrapper.setReg(true);
}

// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);


ProtocolFilterWrapper:产生filter过滤链调用:

所以此处代码export 是就要判断是哪种个协议,防止dubbo协议被重复在本地外放。因为DubboProtocol是在RegistryProtocol里面外放出来的

dubbo service启动流程源码分析

ProtocolListenerWrapper:进行listenner扩展处理,并调用注册服务

同上

 

dubbo service启动流程源码分析

调用DubboProtocol中的外放进行服务外放

DubboProtocolexport 逻辑是真正的启动本地端口外放协议。

dubbo service启动流程源码分析

获取扩展的Transporter,Transporter就是netty服务端,并设置协议处理

requestHandler进行报文网络层面的处理




元数据保存上报,完成服务外放

至此服务发布完成,整个核心依赖于dubbo的扩展机制,

后续会正对扩展机制详细描述源码实现细节