Dubbo服务的发布流程
今天这篇文章非常关键,堪称Dubbo的核心内容,出去面试的时候如果问你Dubbo相关的问题,十有八九是Dubbo服务的发布和引用流程,由于内容太多,今天先分析Dubbo的服务发布流程。
中的写的那个demo吗,其中Provider中的testProvider方法的第6步(导出服务),也就是serviceConfig.export()这个方法,服务提供方需要调用这个方法来发布服务。
看下serviceConfig.export()这个方法的代码:
public synchronized void export() {
checkAndUpdateSubConfigs();
//一、是否需要导出服务
if (!shouldExport()) {
return;
}
//二、是否需要延迟发布
if (shouldDelay()) {
delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
} else {
//三、直接发布
doExport();
}
}
接下来对这个方法中的关键方法进行解析,checkAndUpdateSubConfigs()这个方法就不细说了,就是检测下配置,自己可以点点看
一、shouldExport()方法:
private boolean shouldExport() {
//1
/**
是否要导出这个服务
protected Boolean export;
*/
Boolean shouldExport = getExport();
//2
/**
provider配置
private ProviderConfig provider;
*/
if (shouldExport == null && provider != null) {
shouldExport = provider.getExport();
}
//3
// default value is true
if (shouldExport == null) {
return true;
}
return shouldExport;
}
1.1、getExport()这个方法你点进去你会发现他是dubbo配置类的一个属性,叫做:
/**
* Whether to export the service(是否导出服务)
*/
protected Boolean export;
你可以初始化serviceConfig的时候给它设置值的:
1.2、如果没有配置export属性并且provider不为空(注意下这个provider
/**
* The provider configuration
*/
private ProviderConfig provider;
其实也是serviceConfig的一个配置)就从provider中获取
1.3、经过前两步之后如果shouldExport还是空,那就默认为true,也就是导出服务
二、shouldDelay()方法,是否延迟发布
private boolean shouldDelay() {
//1
Integer delay = getDelay();
//2
/**
延时发布多少毫秒
protected Integer delay;
*/
if (delay == null && provider != null) {
delay = provider.getDelay();
}
//3
return delay != null && delay > 0;
}
2.1、getDeley()方法其实取得是serviceConfig的delay属性
/**
* The time delay register service (milliseconds)
*/
protected Integer delay;
可以看到单位是毫秒,如果设置了这个属性,就会延时多少毫秒之后发布服务。
2.2、如果没有设置delay的值并且provider配置不为空的话,就从provider中获取。
2.3、如果设置的delay不为空并且大于0的话肯定就是要延迟发布了。
通过上面的代码可知,Dubbo延迟发布其实是使用ScheduledExcutorService(定时任务线程池)来实现的
/**
* A delayed exposure service timer
*/
private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));
三、doExport()直接发布,其实延时发布最终调用的也是这个方法
protected synchronized void doExport() {
//1
/**
服务导出或者不导出的一个标识,如果true表示不用导出这个服务
private transient volatile boolean unexported;
*/
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
//2
/**
服务是否已经被导出,true表示该服务已经被导出了
private transient volatile boolean exported;
*/
if (exported) {
return;
}
//3 开始导出前把是否已经被导出标记成已导出
exported = true;
//4
/**
服务的名称
private String path;
*/
/**
被导出服务的接口名
private String interfaceName;
*/
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
//5 导出成url
doExportUrls();
}
3.1、判断这个服务是不是被标记成不导出,如果标记了就直接抛出异常
3.2、判断服务是否已被导出,如果已被导出,就直接返回
3.3、准备导出,导出前将exported标记为true
3.4、判断如果path为空,就把要导出服务的接口名字赋值给path
3.5、导出成url,可以看到又是一个方法,继续看doExportUrls()这个方法吧
@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
//1 加载注册中心
List<URL> registryURLs = loadRegistries(true);
//2
for (ProtocolConfig protocolConfig : protocols) {
//2.1 封装pathKey
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
//2.2 将要暴露的服务封装成一个pojo
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
//2.3 将要暴露的服务加入缓存
ApplicationModel.initProviderModel(pathKey, providerModel);
//2.4
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
3.5.1、loadRegistries(),加载注册中心
protected List<URL> loadRegistries(boolean provider) {
List<URL> registryList = new ArrayList<URL>();
// 1 判断注册中心是否为空
if (CollectionUtils.isNotEmpty(registries)) {
for (RegistryConfig config : registries) {
//2 获取注册中心地址
String address = config.getAddress();
if (StringUtils.isEmpty(address)) {
//3 如果注册中心地址不存在,就给个默认值0.0.0.0
address = Constants.ANYHOST_VALUE;
}
//4 如果注册中新地址不可用,也就是直连的意思 N/A,前面加非,意思就是不是直连的时候
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
//5 应用信息添加到map中,比如应用名
appendParameters(map, application);
//6 将注册中心的配置放入map
appendParameters(map, config);
//7 将path放入map,"path" -> "org.apache.dubbo.registry.RegistryService"
map.put(Constants.PATH_KEY, RegistryService.class.getName());
//8 将dubbo的版本,当前的时间戳以及运行时的进程号放入map
appendRuntimeParameters(map);
//9 如果参数中不包含协议,那么协议默认为Dubbo协议
if (!map.containsKey(Constants.PROTOCOL_KEY)) {
map.put(Constants.PROTOCOL_KEY, Constants.DUBBO_PROTOCOL);
}
//10 把map转换成url zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-provider&dubbo=2.0.2&pid=1736&release=2.7.1×tamp=1606877890879
List<URL> urls = UrlUtils.parseURLs(address, map);
//11 遍历url
for (URL url : urls) {
//12 改变url registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-provider&dubbo=2.0.2&pid=1736®istry=zookeeper&release=2.7.1×tamp=1606877890879
url = URLBuilder.from(url)
.addParameter(Constants.REGISTRY_KEY, url.getProtocol())
.setProtocol(Constants.REGISTRY_PROTOCOL)
.build();
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
//13 放入注册中心集合,因为可能有多个注册中心
registryList.add(url);
}
}
}
}
}
return registryList;
}
接下来对上面的每一步都进行详细的解析,因为真的很重要:
3.5.1.1、判断注册中心是否为空,我们看下这个registries
/**
* Registry centers 注册中心配置
*/
protected List<RegistryConfig> registries;
为什么是个list?因为dubbo服务可以被暴露到多个注册中心,比如我可以同时把我的服务暴露到zk和redis,就是这个道理
3.5.1.3、如果这个adress为空的话,就给个缺省值0.0.0.0
3.5.1.4、如果注册中心不可用,意思就是直连,用过dubbo的人应该知道N/A,其实就是不连注册中心,消费者直接调用提供者,这里是不是直连的时候进入,所以配置不能是N/A,我现在用的zk,所以肯定会进入的,继续往下走
3.5.1.5、将应用信息放入map,其实就是初始化ApplicationConfig的时候传入的那个dubbo-provider,标识提供者的名称
3.5.1.6、将注册中心配置放入map,可以看到并未放入任何东西,map的大小还是1
3.5.1.7、将path放入map
3.5.1.8、添加一些运行时的信息
3.5.1.9、添加协议,如果没有设置,默认是Dubbo协议
3.5.1.11-13、转换url,注意看下和上面的有什么不同
对比一下你会发现url的前缀由zookeeper变成了registry了,并且连接中拼上了registry=zookeeper,然后将url放入注册中心集合。
3.5.2、测试类里咱并未给serviceConfig设置protocols属性,看看默认的是什么
3.5.2.1、封装pathKey
这里面path是服务名,也就是你要暴露的服务,值是你暴露的接口的名字,group就是你设置的dubbo的分组,version就是服务的版本,所以经过这句代码之后,看下pathKey的值
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
3.5.2.2、将要暴露的服务封装成一个pojo
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
先看下这句代码里的providerModel这个类:
package org.apache.dubbo.rpc.model;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
要发布的服务的模型,意思就是把你要发布的服务封装成一个pojo了
* ProviderModel which is about published services
*/
public class ProviderModel {
//服务名,这个其实就是pathKey
private final String serviceName;
//服务的实例
private final Object serviceInstance;
//服务的接口名
private final Class<?> serviceInterfaceClass;
//接口里的方法
private final Map<String, List<ProviderMethodModel>> methods = new HashMap<String, List<ProviderMethodModel>>();
public ProviderModel(String serviceName, Object serviceInstance, Class<?> serviceInterfaceClass) {
if (null == serviceInstance) {
throw new IllegalArgumentException("Service[" + serviceName + "]Target is NULL.");
}
this.serviceName = serviceName;
this.serviceInstance = serviceInstance;
this.serviceInterfaceClass = serviceInterfaceClass;
initMethod();
}
public String getServiceName() {
return serviceName;
}
public Class<?> getServiceInterfaceClass() {
return serviceInterfaceClass;
}
public Object getServiceInstance() {
return serviceInstance;
}
public List<ProviderMethodModel> getAllMethods() {
List<ProviderMethodModel> result = new ArrayList<ProviderMethodModel>();
for (List<ProviderMethodModel> models : methods.values()) {
result.addAll(models);
}
return result;
}
public ProviderMethodModel getMethodModel(String methodName, String[] argTypes) {
List<ProviderMethodModel> methodModels = methods.get(methodName);
if (methodModels != null) {
for (ProviderMethodModel methodModel : methodModels) {
if (Arrays.equals(argTypes, methodModel.getMethodArgTypes())) {
return methodModel;
}
}
}
return null;
}
private void initMethod() {
Method[] methodsToExport = null;
methodsToExport = this.serviceInterfaceClass.getMethods();
for (Method method : methodsToExport) {
method.setAccessible(true);
List<ProviderMethodModel> methodModels = methods.get(method.getName());
if (methodModels == null) {
methodModels = new ArrayList<ProviderMethodModel>(1);
methods.put(method.getName(), methodModels);
}
methodModels.add(new ProviderMethodModel(method, serviceName));
}
}
}
3.5.2.3、将要暴露的服务加入缓存
ApplicationModel.initProviderModel(pathKey, providerModel);
先看下initProviderModel这个方法:
public static void initProviderModel(String serviceName, ProviderModel providerModel) {
if (providedServices.putIfAbsent(serviceName, providerModel) != null) {
LOGGER.warn("Already register the same:" + serviceName);
}
}
/**
* full qualified class name -> provided service
*/
private static final ConcurrentMap<String, ProviderModel> providedServices = new ConcurrentHashMap<>();
其实就是往这个concurrentHashMap中放值。key和value如下:
3.5.2.4、接下来又是一个方法doExportUrlsFor1Protocol,继续看,这个方法可是有点长了
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//1.取出协议名,没有设置的话默认是dubbo协议
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = Constants.DUBBO;
}
//2.初始化一个hashMap
Map<String, String> map = new HashMap<String, String>();
//放入side
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
//放入运行时的一些信息,比如时间戳,dubbo的版本,进程号,服务版本号等信息
appendRuntimeParameters(map);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
//3.判断方法的相关配置是否为空
if (CollectionUtils.isNotEmpty(methods)) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
//4 是否是泛型调用
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
//5获取修订版本号,其实取得就是接口的版本号
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
//6 获取接口里的方法
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
//如果接口中没有方法,就打印个警告日志:接口里没有方法
logger.warn("No method found in service interface " + interfaceClass.getName());
//往map里放入 methods->*
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
//如果接口里存在方法,就将方法名放入map,多个用逗号隔开
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
//7 判断token是否为空
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
// 8 获取提供者的ip地址和端口号
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
//9 获取scope,这里为null,如果scope设置成none的话就不进行导出了
String scope = url.getParameter(Constants.SCOPE_KEY);
// 如果配置成none就不导出了
if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
// 如果scope的值不是remote,就进行本地导出
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
//10 本地导出
exportLocal(url);
}
// 11 如果scope的值不是local,就进行远程导出
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
//12 判断注册中心是否为空
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//给url拼接上dynamic参数,这个参数是从注册中心的url上获取,没有的话默认是false
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
//获取dubbo监控中心的url
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
//不为空的话拼接到url上面
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// 13 从url上获取代理,对于提供者来说,可以用这个自定义的代理方式去创建invoker的
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
//不为空的话拼接到注册中心的url上
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
//14 通过代理生成invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//对exporter进行包装
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//通过相关协议将invoker导出成exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
//添加到要导出的服务列表里
exporters.add(exporter);
}
} else {
//如果注册中心为空,直接生成invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//将invoker转换成exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
//15 元数据存储
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
@SuppressWarnings({"unchecked", "rawtypes"})
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URLBuilder.from(url)
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
3.5.2.4.1、先看一眼入参:
3.5.2.4.2、一直运行到判断methods这一行,看下map中都存了什么玩意,先有个大概印象:
3.5.2.4.3、判断方法的相关配置是否为空,咱的测试类里并未设置这个配置,所以会跳过去
3.5.2.4.4、判断是否是泛型调用,如果是的话设置泛型类型,这里也是跳过
3.5.2.4.5、获取修订的版本号,其实取得就是接口的版本号
3.5.2.4.6、将接口里的方法名都放入map,多个用逗号隔开
3.5.2.4.7、判断token是否为空,这里有必要说下这个token,这个token是权限校验用的,意思要校验消费者有没有权限去调用这个服务,这个token属于提供者端的配置,消费者只有通过注册中心才能获取,目的就是为了防止消费者越过注册中心直接去调用用提供者。
3.5.2.4.9、如果scope配置成none的话就不进行导出了,然后如果scope的值只要不是remote,就进行本地导出,因为这里咱没配置scope,所以值是null,那妥妥的本地导出了。
3.5.2.4.10、exportLocal()方法,本地导出
@SuppressWarnings({"unchecked", "rawtypes"})
private void exportLocal(URL url) {
//1.只要协议不是injvm,就导出
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
//2.将url转换为本地导出的url
URL local = URLBuilder.from(url)
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
//3.调用protocol的export方法得到一个exporter,并添加到要导出的集合exporters中
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
3.5.2.4.10.1、只要协议不是injvm,就进行本地导出
3.5.2.4.10.2、传进来的url转换为本地导出的url,咱看下转换后的url是什么样子的:injvm://127.0.0.1/com.ayo.dubbo.service.UserService?anyhost=true&application=dubbo-provider&bind.ip=10.2.116.32&bind.port=20880&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&group=member&interface=com.ayo.dubbo.service.UserService&methods=say&pid=10384®ister=true&release=2.7.1&revision=1.0.0&side=provider×tamp=1606903863883&version=1.0.0
看见没,协议变成了injvm了,ip变成了127.0.0.1,端口号没了,这就是进行本地导出时候的url的变化,很重要,务必记住。
3.5.2.4.10.3、首先根据ref(要暴露接口的实现类)、接口类、本地导出的url调用proxyFactory的getInvoker方法获取一个invoker,然后再调用protocol的export方法将这个invoker导出成一个exporter!!本地导出完毕
3.5.2.4.11、如果scope的值不是local,就进行远程导出,由此可以推断,如果是local的时候只进行本地导出。
3.5.2.4.12、判断注册中心url是否为空,其实就是判断是否存在注册中心,因为远程暴露其实就是把服务注册到注册中心
3.5.2.4.13、判断代理方式是否为空,不为空的话会选择传过来的代理方式生成invoker,可以自定义。
3.5.2.4.14、通过代理生成invoker,通过相关协议将invoker导出成exporter,对exporter进行包装,然后将exporter添加到要导出的服务列表里。
这篇文章就先写到这里,主要分析了服务提供者导出服务的过程,但其实核心的export方法底层做了很多工作,限于篇幅,下一篇再说!