dubbo service启动流程源码分析
之前写过dubbo整体的源代码流程,看留言大家想更深入的了解,今天就给大家分享一下service注册和发布的流程代码。<dubbo:service>到底干了什么
dubbo的service处理主要是基于spring的serviceBean对象的生成和注册过程。主要是afterPropertiesSet这个方法:
1.初始化构建ProviderConfig对象,ProviderConfig对象主要就是provider的配置信息(是一个bean,主要是保存配置信息),这个里面没什么难度都是一些配置获取,设置ProviderConfig对象的操作,如果service上没有配置对应的,直接会找全局的配置,后面的获取配置都是一个逻辑
2.设置application
3.设置module对象
注册中心配置获取
由代码可以知道如果配置了多个注册中心,并且指定了多个注册中心时,就会向多个注册中心注册,如果未指定,则会向所有注册中心都会注册,这个许多文章或者配置都没有讲到,需要注意的。
后面的配置都和上面一样就不一一分析了,协议配置和注册中心的方式也是一样的:支持多协议开放,如果未指定此服务开放的协议,则默认是所有协议开放。
最主要是export()方法,将服务发布处理,如果不支持listenner机制时,直接调用export将服务发布,
如果支持listenner时,会在应用启动事件处理里面通过export将服务发布出去。
所以服务发布的流程最终都走的是export方法,下面就看一下export方法干了些什么?
@Override
public void export() {
//调用父类的exort
super.export();
// Publish ServiceBeanExportedEvent
publishExportEvent();
}
调用父类的export方法,也就是ServiceConfig类里面的export方法。
checkAndUpdateSubConfigs()方法里面主要是:
1. 从配置中心中刷新最新的配置
2.进行配置数据检查
3 对注册中心,协议,application对象从配置中心进行检查:
为什么只检查这三个呢?因为这三个是服务必须要有的。
4. ref的检查:调用真实类的发布类的interface配置,和方法检查等
5.调用方式处理:
6 泛型服务设置:设置调用类
7 本地调用或者stub调用时设置调用的类等
doExport里面直接调用了doExportUrls(),这个是重点
协议设置为注册协议这个后面会遇到。
2.通过循环获取配置协议,将协议注册按注册注册到多个注册中心:doExportUrlsFor1Protocol(protocolConfig, registryURLs);
1.服务信息:协议名默认为为dubbo
//对外输出的方法如果不为空
if (CollectionUtils.isNotEmpty(methods)) {
for (MethodConfig method : methods) {
//设置方法信息
appendParameters(map, method,method.getName());
String retryKey =method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue =map.remove(retryKey);
if ("false".equals(retryValue)){
map.put(method.getName()+ ".retries", "0");
}
}
//设置方法的输入参数信息
List<ArgumentConfig>arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)){
for (ArgumentConfig argument: arguments) {
// convert argument type
if (argument.getType() !=null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null &&methods.length > 0) {
for (int i = 0; i< methods.length; i++) {
StringmethodName = methods[i].getName();
// target themethod, and get its signature
if (methodName.equals(method.getName())){
Class<?>[] argtypes =methods[i].getParameterTypes();
// onecallback in the method
if (argument.getIndex()!= -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())){
appendParameters(map,argument, method.getName() + "." + argument.getIndex());
} else{
throw new IllegalArgumentException("Argumentconfig error : the index attribute and type attribute not match :index :" +argument.getIndex() + ", type:" + argument.getType());
}
} else {
//multiple callbacks in the method
for (intj = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())){
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : theindex attribute and type attribute not match :index :" +argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex()!= -1) {
appendParameters(map,argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argumentconfig must set index or type attribute.eg: <dubbo:argument index='0'.../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
上面主要主要将要发布的服务的方法中的信息放入到元数据里面。没有什么注意的
以上主要是设置泛型调用,版本号等,最主要的是获取方法,采用装饰类(动态动态编译的方式自动)并进行缓存
带token访问,进行简单的授权访问
依据外放的返回进行外放,如果不是远程服务,则直接进行本地开放本地开放就是injvm协议方式开放如果是远程进行远程开放
//将服务多个注册中心开放一下是精髓:
首先获取每一个注册中心外放url(注册中心url)进行外放:
上面红线里面就是外放代码看着就几行其实里面涉及了很多东西:
1.通过创建代理对象invoker进行进行代理处理
1.Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref,(Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY,url.toFullString()));
由于默认是javassist代理:
代理源码如下
1.代理调用方法处理过程:1.是将获取封装器对象,封装器就是我们上面那个interface的封装器,从缓存中取的(图中标红的)。2.调用invokeMethod,invokeMethod 会路由到真实对象的(REF对象)的调用方法里面去:invoke增强时会调用ref对应的methodName的方法,从而调用到真实的业务方法里面通过代理我们获取的是AbstractProxyInvoker对象。
2.对AbstractProxyInvoker对象进行二次带元数据的增强
3. Exporter<?> exporter = protocol.export(wrapperInvoker);协议外放,协议外放这个是一个重点:protocol 是通过:
private static final Protocol protocol =ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
在成员中获取的,这个方法获取就涉及到了dubbo 扩展机制。后续会有扩展机制讲解:
扩展机制:先简单理解为:1.从META-INF加载扩展定义文件。2
按类类型生成ExtensionLoader(调用时)从ExtensionLoader获取获取对应的扩展对象生成时会语句url和Adaptive动态生成一个类:讲解先了解大概(后序会有一章将dubbo扩展机制如何实现)
:
在这会生成protocol的
最终是调用到如下:
生成新类里面会依据url配置调用扩展:
动态生成的新类为:
public class Protocol$Adaptive implementsorg.apache.dubbo.rpc.Protocol {
public void destroy() {
throw newUnsupportedOperationException("The method public abstract voidorg.apache.dubbo.rpc.Protocol.destroy() of interfaceorg.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw newUnsupportedOperationException("The method public abstract intorg.apache.dubbo.rpc.Protocol.getDefaultPort() of interfaceorg.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public org.apache.dubbo.rpc.Exporterexport(org.apache.dubbo.rpc.Invoker arg0) throwsorg.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw newIllegalArgumentException("org.apache.dubbo.rpc.Invoker argument ==null");
if (arg0.getUrl() == null) throw newIllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl()== null");
org.apache.dubbo.common.URL url =arg0.getUrl();
String extName = ( url.getProtocol() ==null ? "dubbo" : url.getProtocol() );
if(extName == null) throw newIllegalStateException("Failed to get extension(org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() +") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension =(org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public org.apache.dubbo.rpc.Invokerrefer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throwsorg.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw newIllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() ==null ? "dubbo" : url.getProtocol() );
if(extName == null) throw newIllegalStateException("Failed to get extension(org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() +") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension =(org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
这个类通过:org.apache.dubbo.rpc.Protocol extension =(org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);会对形成调用链(封装器对象的自动形成调用链),set属性的自动注入,与springBean的自动注入和装配比较像。
最终会到:
以上扩展会在后续详细讲解。协议装饰类的自动拉链,DubboProtocol被自动注入到RegistryProtocol 。于是就有ProtocolFilterWrapper与ProtocolFilterWrapper(自动拉链) --------》(链式调用到)RegistryProtocol(动态生成的Protocolcol类)--------->setProtocol(SPI扩展set自注入DubboProtocol协议注入到RegistryProtocol协议中)
的一个链式调用用就产生了。
调用:ProtocolFilterWrapper.export-------->ProtocolFilterWrapper----->RegistryProtocol.export(动态生成的Protocol,进行export时调用RegistryProtocol.export外放)------------->DubboProtocol.export(动态生成的Protocol 调用export时获取到dubboProtocol进行export外放)调用到本地dubbo服务的外放流程里面,这个细节很重要,也是整个dubbo扩展的核心。许多文章都没有把这个逻辑讲出来。
RegistryProtocol:export中的代码:
上面就是整个发布和服务外放的过程。由代码可以知道,注册时先调用本地的export,进行协议export,然后在获取注册服务 final Registry registry =getRegistry(originInvoker);进行后续注册处理:
final URL overrideSubscribeUrl =getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl,originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker 将invoker进行本地export出去
final ExporterChangeableWrapper<T> exporter =doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl,registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper =ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl,registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
register(registryUrl,registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
ProtocolFilterWrapper:产生filter过滤链调用:
所以此处代码export 是就要判断是哪种个协议,防止dubbo协议被重复在本地外放。因为DubboProtocol是在RegistryProtocol里面外放出来的
ProtocolListenerWrapper:进行listenner扩展处理,并调用注册服务
同上
调用DubboProtocol中的外放进行服务外放
DubboProtocolexport 逻辑是真正的启动本地端口外放协议。
获取扩展的Transporter,Transporter就是netty服务端,并设置协议处理
requestHandler进行报文网络层面的处理
元数据保存上报,完成服务外放
至此服务发布完成,整个核心依赖于dubbo的扩展机制,
后续会正对扩展机制详细描述源码实现细节