Dubbo服务的发布流程
今天这篇文章非常关键,堪称Dubbo的核心内容,出去面试的时候如果问你Dubbo相关的问题,十有八九是Dubbo服务的发布和引用流程,由于内容太多,今天先分析Dubbo的服务发布流程。
中的写的那个demo吗,其中Provider中的testProvider方法的第6步(导出服务),也就是serviceConfig.export()这个方法,服务提供方需要调用这个方法来发布服务。
看下serviceConfig.export()这个方法的代码:
public synchronized void export() {checkAndUpdateSubConfigs();//一、是否需要导出服务if (!shouldExport()) {return;}//二、是否需要延迟发布if (shouldDelay()) {delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);} else {//三、直接发布doExport();}}
接下来对这个方法中的关键方法进行解析,checkAndUpdateSubConfigs()这个方法就不细说了,就是检测下配置,自己可以点点看
一、shouldExport()方法:
private boolean shouldExport() {//1/**是否要导出这个服务protected Boolean export;*/Boolean shouldExport = getExport();//2/**provider配置private ProviderConfig provider;*/if (shouldExport == null && provider != null) {shouldExport = provider.getExport();}//3// default value is trueif (shouldExport == null) {return true;}return shouldExport;}
1.1、getExport()这个方法你点进去你会发现他是dubbo配置类的一个属性,叫做:
/*** Whether to export the service(是否导出服务)*/protected Boolean export;
你可以初始化serviceConfig的时候给它设置值的:
1.2、如果没有配置export属性并且provider不为空(注意下这个provider
/*** The provider configuration*/private ProviderConfig provider;
其实也是serviceConfig的一个配置)就从provider中获取
1.3、经过前两步之后如果shouldExport还是空,那就默认为true,也就是导出服务
二、shouldDelay()方法,是否延迟发布
private boolean shouldDelay() {//1Integer delay = getDelay();//2/**延时发布多少毫秒protected Integer delay;*/if (delay == null && provider != null) {delay = provider.getDelay();}//3return delay != null && delay > 0;}
2.1、getDeley()方法其实取得是serviceConfig的delay属性
/*** The time delay register service (milliseconds)*/protected Integer delay;
可以看到单位是毫秒,如果设置了这个属性,就会延时多少毫秒之后发布服务。
2.2、如果没有设置delay的值并且provider配置不为空的话,就从provider中获取。
2.3、如果设置的delay不为空并且大于0的话肯定就是要延迟发布了。
通过上面的代码可知,Dubbo延迟发布其实是使用ScheduledExcutorService(定时任务线程池)来实现的
/*** A delayed exposure service timer*/private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));
三、doExport()直接发布,其实延时发布最终调用的也是这个方法
protected synchronized void doExport() {//1/**服务导出或者不导出的一个标识,如果true表示不用导出这个服务private transient volatile boolean unexported;*/if (unexported) {throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");}//2/**服务是否已经被导出,true表示该服务已经被导出了private transient volatile boolean exported;*/if (exported) {return;}//3 开始导出前把是否已经被导出标记成已导出exported = true;//4/**服务的名称private String path;*//**被导出服务的接口名private String interfaceName;*/if (StringUtils.isEmpty(path)) {path = interfaceName;}//5 导出成urldoExportUrls();}
3.1、判断这个服务是不是被标记成不导出,如果标记了就直接抛出异常
3.2、判断服务是否已被导出,如果已被导出,就直接返回
3.3、准备导出,导出前将exported标记为true
3.4、判断如果path为空,就把要导出服务的接口名字赋值给path
3.5、导出成url,可以看到又是一个方法,继续看doExportUrls()这个方法吧
@SuppressWarnings({"unchecked", "rawtypes"})private void doExportUrls() {//1 加载注册中心List<URL> registryURLs = loadRegistries(true);//2for (ProtocolConfig protocolConfig : protocols) {//2.1 封装pathKeyString pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);//2.2 将要暴露的服务封装成一个pojoProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);//2.3 将要暴露的服务加入缓存ApplicationModel.initProviderModel(pathKey, providerModel);//2.4doExportUrlsFor1Protocol(protocolConfig, registryURLs);}}
3.5.1、loadRegistries(),加载注册中心
protected List<URL> loadRegistries(boolean provider) {List<URL> registryList = new ArrayList<URL>();// 1 判断注册中心是否为空if (CollectionUtils.isNotEmpty(registries)) {for (RegistryConfig config : registries) {//2 获取注册中心地址String address = config.getAddress();if (StringUtils.isEmpty(address)) {//3 如果注册中心地址不存在,就给个默认值0.0.0.0address = Constants.ANYHOST_VALUE;}//4 如果注册中新地址不可用,也就是直连的意思 N/A,前面加非,意思就是不是直连的时候if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {Map<String, String> map = new HashMap<String, String>();//5 应用信息添加到map中,比如应用名appendParameters(map, application);//6 将注册中心的配置放入mapappendParameters(map, config);//7 将path放入map,"path" -> "org.apache.dubbo.registry.RegistryService"map.put(Constants.PATH_KEY, RegistryService.class.getName());//8 将dubbo的版本,当前的时间戳以及运行时的进程号放入mapappendRuntimeParameters(map);//9 如果参数中不包含协议,那么协议默认为Dubbo协议if (!map.containsKey(Constants.PROTOCOL_KEY)) {map.put(Constants.PROTOCOL_KEY, Constants.DUBBO_PROTOCOL);}//10 把map转换成url zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-provider&dubbo=2.0.2&pid=1736&release=2.7.1×tamp=1606877890879List<URL> urls = UrlUtils.parseURLs(address, map);//11 遍历urlfor (URL url : urls) {//12 改变url registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-provider&dubbo=2.0.2&pid=1736®istry=zookeeper&release=2.7.1×tamp=1606877890879url = URLBuilder.from(url).addParameter(Constants.REGISTRY_KEY, url.getProtocol()).setProtocol(Constants.REGISTRY_PROTOCOL).build();if ((provider && url.getParameter(Constants.REGISTER_KEY, true))|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {//13 放入注册中心集合,因为可能有多个注册中心registryList.add(url);}}}}}return registryList;}
接下来对上面的每一步都进行详细的解析,因为真的很重要:
3.5.1.1、判断注册中心是否为空,我们看下这个registries
/*** Registry centers 注册中心配置*/protected List<RegistryConfig> registries;
为什么是个list?因为dubbo服务可以被暴露到多个注册中心,比如我可以同时把我的服务暴露到zk和redis,就是这个道理
3.5.1.3、如果这个adress为空的话,就给个缺省值0.0.0.0
3.5.1.4、如果注册中心不可用,意思就是直连,用过dubbo的人应该知道N/A,其实就是不连注册中心,消费者直接调用提供者,这里是不是直连的时候进入,所以配置不能是N/A,我现在用的zk,所以肯定会进入的,继续往下走
3.5.1.5、将应用信息放入map,其实就是初始化ApplicationConfig的时候传入的那个dubbo-provider,标识提供者的名称
3.5.1.6、将注册中心配置放入map,可以看到并未放入任何东西,map的大小还是1
3.5.1.7、将path放入map
3.5.1.8、添加一些运行时的信息
3.5.1.9、添加协议,如果没有设置,默认是Dubbo协议
3.5.1.11-13、转换url,注意看下和上面的有什么不同
对比一下你会发现url的前缀由zookeeper变成了registry了,并且连接中拼上了registry=zookeeper,然后将url放入注册中心集合。
3.5.2、测试类里咱并未给serviceConfig设置protocols属性,看看默认的是什么
3.5.2.1、封装pathKey
这里面path是服务名,也就是你要暴露的服务,值是你暴露的接口的名字,group就是你设置的dubbo的分组,version就是服务的版本,所以经过这句代码之后,看下pathKey的值
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
3.5.2.2、将要暴露的服务封装成一个pojo
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
先看下这句代码里的providerModel这个类:
package org.apache.dubbo.rpc.model;import java.lang.reflect.Method;import java.util.ArrayList;import java.util.Arrays;import java.util.HashMap;import java.util.List;import java.util.Map;/**要发布的服务的模型,意思就是把你要发布的服务封装成一个pojo了* ProviderModel which is about published services*/public class ProviderModel {//服务名,这个其实就是pathKeyprivate final String serviceName;//服务的实例private final Object serviceInstance;//服务的接口名private final Class<?> serviceInterfaceClass;//接口里的方法private final Map<String, List<ProviderMethodModel>> methods = new HashMap<String, List<ProviderMethodModel>>();public ProviderModel(String serviceName, Object serviceInstance, Class<?> serviceInterfaceClass) {if (null == serviceInstance) {throw new IllegalArgumentException("Service[" + serviceName + "]Target is NULL.");}this.serviceName = serviceName;this.serviceInstance = serviceInstance;this.serviceInterfaceClass = serviceInterfaceClass;initMethod();}public String getServiceName() {return serviceName;}public Class<?> getServiceInterfaceClass() {return serviceInterfaceClass;}public Object getServiceInstance() {return serviceInstance;}public List<ProviderMethodModel> getAllMethods() {List<ProviderMethodModel> result = new ArrayList<ProviderMethodModel>();for (List<ProviderMethodModel> models : methods.values()) {result.addAll(models);}return result;}public ProviderMethodModel getMethodModel(String methodName, String[] argTypes) {List<ProviderMethodModel> methodModels = methods.get(methodName);if (methodModels != null) {for (ProviderMethodModel methodModel : methodModels) {if (Arrays.equals(argTypes, methodModel.getMethodArgTypes())) {return methodModel;}}}return null;}private void initMethod() {Method[] methodsToExport = null;methodsToExport = this.serviceInterfaceClass.getMethods();for (Method method : methodsToExport) {method.setAccessible(true);List<ProviderMethodModel> methodModels = methods.get(method.getName());if (methodModels == null) {methodModels = new ArrayList<ProviderMethodModel>(1);methods.put(method.getName(), methodModels);}methodModels.add(new ProviderMethodModel(method, serviceName));}}}
3.5.2.3、将要暴露的服务加入缓存
ApplicationModel.initProviderModel(pathKey, providerModel);
先看下initProviderModel这个方法:
public static void initProviderModel(String serviceName, ProviderModel providerModel) {if (providedServices.putIfAbsent(serviceName, providerModel) != null) {LOGGER.warn("Already register the same:" + serviceName);}}
/*** full qualified class name -> provided service*/private static final ConcurrentMap<String, ProviderModel> providedServices = new ConcurrentHashMap<>();
其实就是往这个concurrentHashMap中放值。key和value如下:
3.5.2.4、接下来又是一个方法doExportUrlsFor1Protocol,继续看,这个方法可是有点长了
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {//1.取出协议名,没有设置的话默认是dubbo协议String name = protocolConfig.getName();if (StringUtils.isEmpty(name)) {name = Constants.DUBBO;}//2.初始化一个hashMapMap<String, String> map = new HashMap<String, String>();//放入sidemap.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);//放入运行时的一些信息,比如时间戳,dubbo的版本,进程号,服务版本号等信息appendRuntimeParameters(map);appendParameters(map, application);appendParameters(map, module);appendParameters(map, provider, Constants.DEFAULT_KEY);appendParameters(map, protocolConfig);appendParameters(map, this);//3.判断方法的相关配置是否为空if (CollectionUtils.isNotEmpty(methods)) {for (MethodConfig method : methods) {appendParameters(map, method, method.getName());String retryKey = method.getName() + ".retry";if (map.containsKey(retryKey)) {String retryValue = map.remove(retryKey);if ("false".equals(retryValue)) {map.put(method.getName() + ".retries", "0");}}List<ArgumentConfig> arguments = method.getArguments();if (CollectionUtils.isNotEmpty(arguments)) {for (ArgumentConfig argument : arguments) {// convert argument typeif (argument.getType() != null && argument.getType().length() > 0) {Method[] methods = interfaceClass.getMethods();// visit all methodsif (methods != null && methods.length > 0) {for (int i = 0; i < methods.length; i++) {String methodName = methods[i].getName();// target the method, and get its signatureif (methodName.equals(method.getName())) {Class<?>[] argtypes = methods[i].getParameterTypes();// one callback in the methodif (argument.getIndex() != -1) {if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {appendParameters(map, argument, method.getName() + "." + argument.getIndex());} else {throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());}} else {// multiple callbacks in the methodfor (int j = 0; j < argtypes.length; j++) {Class<?> argclazz = argtypes[j];if (argclazz.getName().equals(argument.getType())) {appendParameters(map, argument, method.getName() + "." + j);if (argument.getIndex() != -1 && argument.getIndex() != j) {throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());}}}}}}}} else if (argument.getIndex() != -1) {appendParameters(map, argument, method.getName() + "." + argument.getIndex());} else {throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");}}}} // end of methods for}//4 是否是泛型调用if (ProtocolUtils.isGeneric(generic)) {map.put(Constants.GENERIC_KEY, generic);map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {//5获取修订版本号,其实取得就是接口的版本号String revision = Version.getVersion(interfaceClass, version);if (revision != null && revision.length() > 0) {map.put("revision", revision);}//6 获取接口里的方法String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();if (methods.length == 0) {//如果接口中没有方法,就打印个警告日志:接口里没有方法logger.warn("No method found in service interface " + interfaceClass.getName());//往map里放入 methods->*map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {//如果接口里存在方法,就将方法名放入map,多个用逗号隔开map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));}}//7 判断token是否为空if (!ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());} else {map.put(Constants.TOKEN_KEY, token);}}// 8 获取提供者的ip地址和端口号String host = this.findConfigedHosts(protocolConfig, registryURLs, map);Integer port = this.findConfigedPorts(protocolConfig, name, map);URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);}//9 获取scope,这里为null,如果scope设置成none的话就不进行导出了String scope = url.getParameter(Constants.SCOPE_KEY);// 如果配置成none就不导出了if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {// 如果scope的值不是remote,就进行本地导出if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {//10 本地导出exportLocal(url);}// 11 如果scope的值不是local,就进行远程导出if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {if (logger.isInfoEnabled()) {logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);}//12 判断注册中心是否为空if (CollectionUtils.isNotEmpty(registryURLs)) {for (URL registryURL : registryURLs) {//给url拼接上dynamic参数,这个参数是从注册中心的url上获取,没有的话默认是falseurl = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));//获取dubbo监控中心的urlURL monitorUrl = loadMonitor(registryURL);if (monitorUrl != null) {//不为空的话拼接到url上面url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());}if (logger.isInfoEnabled()) {logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);}// 13 从url上获取代理,对于提供者来说,可以用这个自定义的代理方式去创建invoker的String proxy = url.getParameter(Constants.PROXY_KEY);if (StringUtils.isNotEmpty(proxy)) {//不为空的话拼接到注册中心的url上registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);}//14 通过代理生成invokerInvoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));//对exporter进行包装DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);//通过相关协议将invoker导出成exporterExporter<?> exporter = protocol.export(wrapperInvoker);//添加到要导出的服务列表里exporters.add(exporter);}} else {//如果注册中心为空,直接生成invokerInvoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);//将invoker转换成exporterExporter<?> exporter = protocol.export(wrapperInvoker);exporters.add(exporter);}/*** @since 2.7.0* ServiceData Store*///15 元数据存储MetadataReportService metadataReportService = null;if ((metadataReportService = getMetadataReportService()) != null) {metadataReportService.publishProvider(url);}}}this.urls.add(url);}@SuppressWarnings({"unchecked", "rawtypes"})private void exportLocal(URL url) {if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {URL local = URLBuilder.from(url).setProtocol(Constants.LOCAL_PROTOCOL).setHost(LOCALHOST_VALUE).setPort(0).build();Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");}}
3.5.2.4.1、先看一眼入参:
3.5.2.4.2、一直运行到判断methods这一行,看下map中都存了什么玩意,先有个大概印象:
3.5.2.4.3、判断方法的相关配置是否为空,咱的测试类里并未设置这个配置,所以会跳过去
3.5.2.4.4、判断是否是泛型调用,如果是的话设置泛型类型,这里也是跳过
3.5.2.4.5、获取修订的版本号,其实取得就是接口的版本号
3.5.2.4.6、将接口里的方法名都放入map,多个用逗号隔开
3.5.2.4.7、判断token是否为空,这里有必要说下这个token,这个token是权限校验用的,意思要校验消费者有没有权限去调用这个服务,这个token属于提供者端的配置,消费者只有通过注册中心才能获取,目的就是为了防止消费者越过注册中心直接去调用用提供者。
3.5.2.4.9、如果scope配置成none的话就不进行导出了,然后如果scope的值只要不是remote,就进行本地导出,因为这里咱没配置scope,所以值是null,那妥妥的本地导出了。
3.5.2.4.10、exportLocal()方法,本地导出
@SuppressWarnings({"unchecked", "rawtypes"})private void exportLocal(URL url) {//1.只要协议不是injvm,就导出if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {//2.将url转换为本地导出的urlURL local = URLBuilder.from(url).setProtocol(Constants.LOCAL_PROTOCOL).setHost(LOCALHOST_VALUE).setPort(0).build();//3.调用protocol的export方法得到一个exporter,并添加到要导出的集合exporters中Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");}}
3.5.2.4.10.1、只要协议不是injvm,就进行本地导出
3.5.2.4.10.2、传进来的url转换为本地导出的url,咱看下转换后的url是什么样子的:injvm://127.0.0.1/com.ayo.dubbo.service.UserService?anyhost=true&application=dubbo-provider&bind.ip=10.2.116.32&bind.port=20880&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&group=member&interface=com.ayo.dubbo.service.UserService&methods=say&pid=10384®ister=true&release=2.7.1&revision=1.0.0&side=provider×tamp=1606903863883&version=1.0.0
看见没,协议变成了injvm了,ip变成了127.0.0.1,端口号没了,这就是进行本地导出时候的url的变化,很重要,务必记住。
3.5.2.4.10.3、首先根据ref(要暴露接口的实现类)、接口类、本地导出的url调用proxyFactory的getInvoker方法获取一个invoker,然后再调用protocol的export方法将这个invoker导出成一个exporter!!本地导出完毕
3.5.2.4.11、如果scope的值不是local,就进行远程导出,由此可以推断,如果是local的时候只进行本地导出。
3.5.2.4.12、判断注册中心url是否为空,其实就是判断是否存在注册中心,因为远程暴露其实就是把服务注册到注册中心
3.5.2.4.13、判断代理方式是否为空,不为空的话会选择传过来的代理方式生成invoker,可以自定义。
3.5.2.4.14、通过代理生成invoker,通过相关协议将invoker导出成exporter,对exporter进行包装,然后将exporter添加到要导出的服务列表里。
这篇文章就先写到这里,主要分析了服务提供者导出服务的过程,但其实核心的export方法底层做了很多工作,限于篇幅,下一篇再说!
