vlambda博客
学习文章列表

【第六弹】详解分布式中间件Dubbo

5. Dubbo SPI机制源码剖析

基于前面SPI机制的说明,可以看到SPI机制运行流程图如下:


SPI机制运行过程主要包括两部分:
(1)getExtensionLoader: 获取扩展点加载器,并加载所对应的所有的扩展点实现
(2)getExtension: 根据name获取扩展的指定实现
源码分析过程如下:
(1)实例化 ExtensionLoader

private static <T> boolean withExtensionAnnotation(Class<T> type) { // 包含`@SPI`注解在接口上 return type.isAnnotationPresent(SPI.class); } public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { @SuppressWarnings("unchecked") // 必须传入类型 if (type == null) { throw new IllegalArgumentException("Extension type null"); } // 必须是接口类型 if (!type.isInterface()) { throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!"); } // 必须包含SPI的注解 if (!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type (" + type + ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!"); } // 尝试从已经加载过的数据中去读取(缓存功能) ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { // 如果没有的话,才会进行初始化,并且放入到缓存汇总 EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader;}

(2)获取 ExtensionFactory 对象

@SPIpublic interface ExtensionFactory { /** * Get extension. * * @param type object type. * @param name object name. * @return object instance. */ <T> T getExtension(Class<T> type, String name);}

(3)获取 ExtensionFactory工厂对象的具体实现,

private ExtensionLoader(Class<?> type) { this.type = type; // 这里需要对对象的工厂做额外的创建,可以看到扩展的工厂也是一个扩展点 objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());}



可以在 dubbo-common/src/main/resources/META-

INF/dubbo/internal/org.apache.dubbo.common.extension.ExtensionFactory 中看到,他默认有两个实现的提供(之前版本有三类,包括SpringExtensionFactory):

adaptive=org.apache.dubbo.common.extension.factory.AdaptiveExtensionFactoryspi=org.apache.dubbo.common.extension.factory.SpiExtensionFactory

(4)利用ExtensionFactory工厂对象的具体实现获取所有支持扩展信息的实现,即ExtensionLoader.getSupportedExtensions

public Set<String> getSupportedExtensions() { // 获取所有的扩展类信息 Map<String, Class<?>> clazzes = getExtensionClasses(); // 返回所有的扩展点名称 return Collections.unmodifiableSet(new TreeSet<>(clazzes.keySet()));}

(5)加载对应的扩展实现类,即getExtensionClasses
在通过名称获取扩展类之前,首先需要根据配置文件解析出扩展类名称到扩展类的映射关系表classes中,之后再根据扩展项名称从映射关系表中获取取对应的扩展类即可。

private Map<String, Class<?>> getExtensionClasses() { // 从缓存中获取已加载的扩展类 Map<String, Class<?>> classes = cachedClasses.get(); // 双重检查 if (classes == null) { // 为空的话,则锁住,标识只会被执行一次 synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { // 进行加载信息 加载扩展类 classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes;}

(6)接下来,加载当前SPI的默认实现以及加载这个类的所有扩展点实现,并且按照name和Class对象的形式存储,即loadExtensionClasses中cacheDefaultExtensionName 和 loadDirectory

private Map<String, Class<?>> loadExtensionClasses() { // 加载默认扩展的实现名称 cacheDefaultExtensionName(); // 获取其中每一种实现的名称和对应的classes // 具体的目录请参照下面的所有目录 Map<String, Class<?>> extensionClasses = new HashMap<>(); // internal extension load from ExtensionLoader's ClassLoader first loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName(),true); loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"), true); loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName()); loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba")); loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName()); loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba")); return extensionClasses;}
private void cacheDefaultExtensionName() { // 获取当前类是否包含SPI注解,一般走到这里都是拥有这个注解的 final SPI defaultAnnotation = type.getAnnotation(SPI.class); if (defaultAnnotation == null) { return; } // 来获取其的value值,这个值主要的作用是设置这个SPI中的默认扩展名 // 比如LoadBalance的默认实现就是random。就是通过这里进行的设置 String value = defaultAnnotation.value(); if ((value = value.trim()).length() > 0) { String[] names = NAME_SEPARATOR.split(value); if (names.length > 1) { throw new IllegalStateException("More than 1 default extension name" "on extension" + type.getName()+ ": " + Arrays.toString(names)); } if (names.length == 1) { cachedDefaultName = names[0]; } }}

loadDirectory方法主要功能是从这个文件夹中寻找真正的文件列表,并且对其中的文件内容利用loadResource解析并且放入到extensionClasses Map中。

private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir,String type, boolean extensionLoaderClassLoaderFirst) { // 文件名称规则: 路径/包名.接口名 String fileName = dir + type; try { // 寻找classloader和url列表 Enumeration<java.net.URL> urls = null; ClassLoader classLoader = findClassLoader(); // try to load from ExtensionLoader's ClassLoader first // 如果需要的话, 需要先从当前类的ClassLoader中寻找 if (extensionLoaderClassLoaderFirst) { ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader(); if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) { urls = extensionLoaderClassLoader.getResources(fileName); } } // 如果找不到任何的URL列表,则继续尝试去其当前线程的ClassLoader中寻找 if(urls == null || !urls.hasMoreElements()) { if (classLoader != null) { urls = classLoader.getResources(fileName); } else { urls = ClassLoader.getSystemResources(fileName); } } // 如果存在文件的话 if (urls != null) { while (urls.hasMoreElements()) { // 遍历每一个资源文件,并且进行加载资源信息到extensionClasses,  //主要功能是读取文件内容 java.net.URL resourceURL = urls.nextElement(); loadResource(extensionClasses, classLoader, resourceURL); } } } catch (Throwable t) { logger.error("Exception occurred when loading extension class "(interface: " + type + ", description file: " + fileName + ").", t); }}

(7)读取文件操作,并利用loadClass来加载类信息,即loadResource

private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoaderclassLoader, java.net.URL resourceURL) { try { // 读取文件 try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) { String line; while ((line = reader.readLine()) != null) { // 截取文件#前面的内容 final int ci = line.indexOf('#'); if (ci >= 0) { line = line.substring(0, ci); } line = line.trim(); // 如果有内容的话 if (line.length() > 0) { try { // 则进行加载key=value的形式数据 String name = null; int i = line.indexOf('='); if (i > 0) { name = line.substring(0, i).trim(); line = line.substring(i + 1).trim(); } if (line.length() > 0) { // 对类信息进行加载操作 loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name); } } catch (Throwable t) { IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ",class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t); exceptions.put(line, e); } } } } } catch (Throwable t) { logger.error("Exception occurred when loading extension class" "(interface: " +type + ", class file: " + resourceURL + ") in " + resourceURL,t); }}

(8)最终加载实现类信息,即loadClass

private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URLresourceURL, Class<?> clazz, String name) throws NoSuchMethodException { // 当前扩展点的实现,必须是当前扩展接口的实现才可以 if (!type.isAssignableFrom(clazz)) { throw new IllegalStateException("Error occurred when loading extension" "class (interface: " +type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + " is not subtype of interface."); } // 如果是包含了Adaptive注解,则认为是需要对扩展点包装的方法,这里只做了存储操作,存储至 //cachedAdaptiveClass中 if (clazz.isAnnotationPresent(Adaptive.class)) { cacheAdaptiveClass(clazz); } else if (isWrapperClass(clazz)) { // 判断是否是wapper类型, 是否构造函数中有该接口类型的传入 // wrapper类型的意思是,对当前的扩展点实现封装功能处理 cacheWrapperClass(clazz); } else { clazz.getConstructor(); // 寻找他是否已经定义过了名称, 这里就不继续往里面细看了,主要是获取当前类的 //org.apache.dubbo.common.Extension注解,如果有的话就使用这个名称,否则的话就是用当前类的 //简单名称 if (StringUtils.isEmpty(name)) { name = findAnnotationName(clazz); if (name.length() == 0) { throw new IllegalStateException("No such extension name for the" "class " + clazz.getName() + " in the config " + resourceURL); } } // 否则的话,就对这个名称和class做映射关系 String[] names = NAME_SEPARATOR.split(name); if (ArrayUtils.isNotEmpty(names)) { // 如果当前类拥有Activate注解,则将其进行添加到cachedActivates对象中,意味 //着需要执行 cacheActivateClass(clazz, names[0]); // 进行名称映射保存 for (String n : names) { cacheName(clazz, n); saveInExtensionClass(extensionClasses, clazz, n); } } }}

(9)上面已经加载好所有的扩展点实现,接下来根据name对扩展点进行处理和进行加锁来创建真实的引用,其中都是有使用缓存来处理。

public T getExtension(String name) { if (StringUtils.isEmpty(name)) { throw new IllegalArgumentException("Extension name null"); } // 获取当前SPi的默认扩展实现类 if ("true".equals(name)) { return getDefaultExtension(); } // 获取当前类的holder,实现原理和cachedClasses的方式相同,都是建立同一个引用后再进行加锁 final Holder<Object> holder = getOrCreateHolder(name); Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { // 真正进行创建实例 instance = createExtension(name); holder.set(instance); } } } return (T) instance;}
private Holder<Object> getOrCreateHolder(String name) { // 获取当前名称的和对象Holder的映射关系 Holder<Object> holder = cachedInstances.get(name); if (holder == null) { // 如果不存在的话,则使用putIfAbsent的原子操作来设置值,这个值可以保证多线程的额情 //况下有值的时候不处理,没有值进行保存 cachedInstances.putIfAbsent(name, new Holder<>()); // 获取真实的holder处理器 holder = cachedInstances.get(name); } return holder;}

(10)最终拿到指定名称的扩展点的实现类后的方法调用

private T createExtension(String name) { // 从配置文件中加载所有的扩展类 可以得到配置项名称 到配置类的映射关系 Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { // 获取是否已经有实例了 T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { // 没有的话,同样适用putIfAbsent的方式来保证只会创建一个对象并且保存 EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } // 注入其他扩展点的实体,用于扩展点和其他的扩展点相互打通 injectExtension(instance); // 进行遍历所有的包装类信息,分别对包装的类进行包装实例化,并且返回自身引用 Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (CollectionUtils.isNotEmpty(wrapperClasses)) { for (Class<?> wrapperClass : wrapperClasses) { // 同样进行注册其他扩展点的功能 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } // 对扩展点进行初始化操作 initExtension(instance); return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance (name: " + name + ", "class: " +type + ") couldn't be instantiated: " + t.getMessage(), t); }}

总结:
第一阶段:ExtensionLoader执行getExtensionLoader,实际是先获取ExtensionFactory对应类型的具体实现,主要有两类,AdaptiveExtensionFactory和SpiExtensionFactory,那到对应类型的扩展点工厂后就去获取支持对应类型的所有扩展点实现,并将这个扩展点的所有实现按照name和Class对象的形式进行存储,进而执行loadDirectory的方法按照路径利用loadResource加载文件并交由loadClass 来加载实现类信息。
第二阶段:利用getExtension根据name对扩展点进行处理和进行加锁来创建真实的引用,进而调用其对应的方法,完成功能的实现。

【第六弹】详解分布式中间件Dubbo

入骨相思知不知


玲珑骰子安红豆

【第六弹】详解分布式中间件Dubbo


【第六弹】详解分布式中间件Dubbo
【第六弹】详解分布式中间件Dubbo
【第六弹】详解分布式中间件Dubbo
【第六弹】详解分布式中间件Dubbo

入我相思门,知我相思苦,长相思兮长相忆,短相思兮无穷极。

【第六弹】详解分布式中间件Dubbo