vlambda博客
学习文章列表

dubbo的spi机制分析和实战案例

技术文章第一时间送达!


本文同步更新到CSDN:

https://me.csdn.net/Danny_idea

1. 

2. 

3. 

4. 

java里面提供了一种内置的服务提供和发现机制,可以通过配置让一个程序在运行的时候动态加载该类的具体实现。这样子我们可以在调用某个相应接口的时候,同时达到调用某些具体类的实现功能。

具体的代码案例如下所示:

首先定义一个接口和两个接口的实现类

接口

/**
 * @author idea
 * @date 2019/5/16
 */

public interface PersonAction {
    void say();
}

接口实现类

然后我们需要在META-INF/services的文件夹底下配置一份文件:
(ps:这里的配置文件命名方式为类所在包名+类名)

dubbo的spi机制分析和实战案例


这份文件里面加入以下的配置信息:


com.sise.dubbo.spi.SpiMainTest
com.sise.dubbo.spi.SpiSubTest

接着是编写测试类代码

import java.util.ServiceLoader;

/**
 * @author idea
 * @date 2019/5/16
 */

public class Demo {

    public static void main(String[] args) {
        ServiceLoader<PersonAction> serviceLoader=ServiceLoader.load(PersonAction.class);
        System.out.println("this is java spi");
        serviceLoader.forEach(PersonAction::say);
    }
}

当我们执行代码之后,会发现控制台输出了相应的内容:

this is java spi
this is a SpiMainTest
this is a SpiSubTest

其实jdk自带的spi功能的实现原理分为了以下几步

1.首先通过java.util.ServiceLoader来加载META-INF/services/文件夹底下的类信息
2.在运行期间需要引用相关类的时候,对加载到内存的类进行搜索和分析,进行实例化调用。

为什么是META-INF/services该文件夹呢?

在ServiceLoader类里面,我们可以通过阅读源码看到它在加载配置的时候会指定默认的加载位置META-INF/services文件夹。

ServiceLoader会将该文件底下的配置类信息全部加载存储到内存中,然后在接口进行实例化的时候提供相应的实现类进行对象的实例化功能。这一点和ioc的思想有点类似,通过一个可插拔式的方式来对类的实例化进行控制。

dubbo的spi机制分析和实战案例


dubbo的spi机制分析和实战案例

在了解了java的spi功能之后,我们不妨再来看看dubbo的spi扩展机制。

先用一些实际的案例来进行实战的演练,然后再进行原理性的分析:

基于dubbo的spi实现自定义负载均衡算法

dubbo里面提供了一个可扩展的LoadBalance类专门供开发者们进行扩展:

/**
 * LoadBalance. (SPI, Singleton, ThreadSafe)
 * 
 * <a href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load-Balancing</a>
 * 
 * @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
 * @author qian.lei
 * @author william.liangf
 */

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

    /**
     * select one invoker in list.
     * 
     * @param invokers invokers.
     * @param url refer url
     * @param invocation invocation.
     * @return selected invoker.
     */

    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

这个类的头部加入了 @SPI 的注解标识,申明了该类是可以进行自定义拓展的。

在了解了loadBalance之后,我们需要在客户端加入自定义的负载均衡器代码,实现loadBalance接口

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.LoadBalance;

import java.util.List;

/**
 * @author idea
 * @data 2019/5/18
 */

public class MyLoadBalance implements LoadBalance {

    @Override
    public <T> Invoker<T> select(List<Invoker<T>> list, URL url, Invocation invocation) throws RpcException {
        System.out.println("执行自定义的负载均衡算法");
        for (Invoker<T> tInvoker : list) {
            //可以根据url里面的相关参数做负载均衡计算
            System.out.println("url: "+tInvoker.getUrl());
        }
        //默认只请求第一台服务器
        return list.get(0);
    }
}

这是最为基本的一种自定义负载均衡策略(永远只能请求一台机器)这种方式过于简陋,那么我们来对应用场景进行一些拓展吧。

假设说现在有个需求,由于某些特定的业务常景所需,要求consumer端在9-18点之间只能请求A机器(或者说更多机器),在18-23点之间请求B机器(或者说更多机器),其余时间可以任意请求,那么这个场景下,dubbo自带的负载均衡策略
ConsistentHashLoadBalance, RandomLoadBalance, RoundRobinLoadBalance, LeastActiveLoadBalance
均不支持,负载均衡该如何实现呢?

这个时候我们只能通过spi机制来自定义一套负载均衡策略进行实现了:

package com.sise.dubbo.config.loadBalanceSpi;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.LoadBalance;

import java.time.LocalTime;
import java.util.List;
import java.util.Random;

/**
 *
 * @author idea
 * @data 2019/5/18
 */

public class MyLoadBalance implements LoadBalance {

    private final String A_MACHINE_HOST_PORT = "192.168.43.191:20880";

    private final String B_MACHINE_HOST_PORT = "192.168.43.191:20880";

    @Override
    public <T> Invoker<T> select(List<Invoker<T>> list, URL url, Invocation invocation) throws RpcException {
        System.out.println("执行自定义的负载均衡算法");
        //模拟场景
        System.out.println(url);
        int currentHour = LocalTime.now().getHour();
        if (currentHour >= 9 && currentHour <= 18) {
            System.out.println("请求A机器");
            findInvokerInList(list, A_MACHINE_HOST_PORT);
        } else if (currentHour >= 18 && currentHour <= 23) {
            System.out.println("请求B机器");
            findInvokerInList(list, B_MACHINE_HOST_PORT);
        }
        int randIndex = new Random().nextInt(list.size());
        return list.get(randIndex);
    }


    /**
     * 从服务列表里面进行dubbo服务地址匹配
     *
     * @param list
     * @param matchKey
     * @param <T>
     * @return
     */

    private <T> Invoker findInvokerInList(List<Invoker<T>> list, String matchKey) {
        for (Invoker tInvoker : list) {
            String addr = tInvoker.getUrl().getHost() + tInvoker.getUrl().getPort();
            if (matchKey.equals(addr)) {
                return tInvoker;
            }
        }
        return null;
    }
}

然后在META-INF/dubbo文件夹底下配置一份纯文本的配置文件,文件命名为:

com.alibaba.dubbo.rpc.cluster.LoadBalance 

(ps:不同版本的dubbo,LoadBalance的包名可能不同)

dubbo的spi机制分析和实战案例


在这份文件里面写入这么一行内容(有点key,value的味道)


mylb=com.sise.dubbo.config.loadBalanceSpi.MyLoadBalance

在consumer端的配置文件中写入以下内容,这里的loadbalance需要和配置文件里的mylb一致。

 <dubbo:reference interface="com.sise.dubbo.api.UserRpcService" id="userRpcService" loadbalance="mylb" />

然后我们可以启动多台provider,用consumer去调用这些服务进行测试,通过调整机器的时间点,控制台就会打印出不同的属性信息

请求B机器
执行自定义的负载均衡算法
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?anyhost=true&application=consumer&check=false&dubbo=2.5.3&interface=com.sise.dubbo.api.UserRpcService&loadbalance=mylb&methods=findByUsername,findAll,printStr&pid=12460&printStr.async=true&service.filter=MyFilter&side=consumer&timestamp=1558143174084&weight=1600
请求A机器
执行自定义的负载均衡算法
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?anyhost=true&application=consumer&check=false&dubbo=2.5.3&interface=com.sise.dubbo.api.UserRpcService&loadbalance=mylb&methods=findByUsername,findAll,printStr&pid=12460&printStr.async=true&service.filter=MyFilter&side=consumer&timestamp=1558143174084&weight=1600

通过上述的这种思路,我们借助dubbo的spi机制来加载满足自己特殊业务的负载均衡器,使得该框架的灵活性更高,扩展性更强。

自定义的dubbo过滤器

基于spi的扩展机制,dubbo里面还提供了对于filter类型的自定义拓展。开发者可以自定义一套filter来进行对于请求的功能拦截和校验,这个有点类似于springmvc里面的filter过滤器,通过特定的过滤器拦截数据之后,可以结合特殊的业务场景来做一些控制性的功能。

如何建立自己的filter过滤器?

首先我们需要在provider模块那定义一个filter类:

package com.sise.dubbo.config.filterSpi;

import com.alibaba.dubbo.rpc.*;

/**
 * @author idea
 * @date 2019/5/17
 */

public class MyFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        System.out.println("this is before");
        Result result = invoker.invoke(invocation);
        System.out.println("this is after");
        return result;
    }
}

然后在META-INF/dubbo文件夹底下去创建相应的配置文件:(这个项目里面我还加入了其他的spi配置,不过对于过滤器配置没有影响)

dubbo的spi机制分析和实战案例


配置里面需要加入下边的内容:


MyFilter=com.sise.dubbo.config.filterSpi.MyFilter

对于过滤器的xml配置只需要在相应的provider的xml中加入

    <dubbo:provider filter="MyFilter"></dubbo:provider>

如果只是想对某个服务进行过滤操作的话,可以这么配置:

    <!-- 需要暴露的服务接口 -->
    <dubbo:service interface="com.sise.dubbo.api.UserRpcService" ref="userRpcService"  filter="MyFilter"/>

通常我们可以基于自定义的filter来实现一些服务调度的权限校验,调度次数统计等功能,但是注意在拦截请求的时候对于性能方面的把控,有时候也可以加入一些特殊ip的拦截校验功能,主要还是需要结合特殊的业务场景来实现。

dubbo本身的可扩展性极强,阿里巴巴团队在官方文档上边给出了十多种常用的spi扩展配置方式,这里主要只展示了两种常见的spi扩展,剩余的可以自行前往官网去查看文档讲解。

dubbo的spi加载原理

拿dubbo的spi来说,它在运行的时候会通过一个叫做ExtensionLoader的加载器来进行dubbo的扩展点加载。
我们可以进入ExtensionLoader这个类里面先进行初步的阅览:

dubbo的spi机制分析和实战案例


核心的加载逻辑图如下所示:

dubbo的spi机制分析和实战案例


通过getExtension函数来加载类:

dubbo的spi机制分析和实战案例


这里面有用到了加锁双重判断,主要是初始化加载之后,这些扩展类信息会被放入到一个ConcurrentMap<string, holder<="" object="" style="font-size: inherit; color: inherit; line-height: inherit;">> cachedInstances 里面。


进入createExtension函数里面,我们会看到以下内容:

dubbo的spi机制分析和实战案例


这段代码的核心操作在于getExtensionClasses函数,再进入该函数中阅读源码:
会发现又是一次双重判断加锁的加载

dubbo的spi机制分析和实战案例


这里面的loadExtensionClasses函数是加载扩展配置类信息的作用,进去之后进行源码阅读会发现:

dubbo的spi机制分析和实战案例


loadFile函数对dubbo配置里面的 META-INF/services/META-INF/dubbo/ ,META-INF/dubbo/internal/目录都进行了类的加载。这一点相比于jdk自带的spi加载所支持的目录要多。

再点进去loadFile源码里面,核心的类加载功能就会展示出来了:

   private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
        String fileName = dir + type.getName();
        try {
            Enumeration<java.net.URL> urls;
            ClassLoader classLoader = findClassLoader();
            if (classLoader != null) {
                urls = classLoader.getResources(fileName);
            } else {
                urls = ClassLoader.getSystemResources(fileName);
            }
            if (urls != null) {
                while (urls.hasMoreElements()) {
                    java.net.URL url = urls.nextElement();
                    try {
                        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
                        try {
                            String line = null;
                            while ((line = reader.readLine()) != null) {
                                final int ci = line.indexOf('#');
                                if (ci >= 0) line = line.substring(0, ci);
                                line = line.trim();
                                if (line.length() > 0) {
                                    try {
                                        String name = null;
                                        int i = line.indexOf('=');
                                        if (i > 0) {
                                            name = line.substring(0, i).trim();
                                            line = line.substring(i + 1).trim();
                                        }
                                        if (line.length() > 0) {
                                            Class<?> clazz = Class.forName(line, true, classLoader);
                                            if (! type.isAssignableFrom(clazz)) {
                                                throw new IllegalStateException("Error when load extension class(interface: " +
                                                        type + ", class line: " + clazz.getName() + "), class " 
                                                        + clazz.getName() + "is not subtype of interface.");
                                            }
                                            if (clazz.isAnnotationPresent(Adaptive.class)) {
                                                if(cachedAdaptiveClass == null) {
                                                    cachedAdaptiveClass = clazz;
                                                } else if (! cachedAdaptiveClass.equals(clazz)) {
                                                    throw new IllegalStateException("More than 1 adaptive class found: "
                                                            + cachedAdaptiveClass.getClass().getName()
                                                            + ", " + clazz.getClass().getName());
                                                }
                                            } else {
                                                try {
                                                    clazz.getConstructor(type);
                                                    Set<Class<?>> wrappers = cachedWrapperClasses;
                                                    if (wrappers == null) {
                                                        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                                        wrappers = cachedWrapperClasses;
                                                    }
                                                    wrappers.add(clazz);
                                                } catch (NoSuchMethodException e) {
                                                    clazz.getConstructor();
                                                    if (name == null || name.length() == 0) {
                                                        name = findAnnotationName(clazz);
                                                        if (name == null || name.length() == 0) {
                                                            if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                                    && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                                name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                                            } else {
                                                                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                            }
                                                        }
                                                    }
                                                    String[] names = NAME_SEPARATOR.split(name);
                                                    if (names != null && names.length > 0) {
                                                        Activate activate = clazz.getAnnotation(Activate.class);
                                                        if (activate != null) {
                                                            cachedActivates.put(names[0], activate);
                                                        }
                                                        for (String n : names) {
                                                            if (! cachedNames.containsKey(clazz)) {
                                                                cachedNames.put(clazz, n);
                                                            }
                                                            Class<?> c = extensionClasses.get(n);
                                                            if (c == null) {
                                                                extensionClasses.put(n, clazz);
                                                            } else if (c != clazz) {
                                                                throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    } catch (Throwable t) {
                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                                        exceptions.put(line, e);
                                    }
                                }
                            } // end of while read lines
                        } finally {
                            reader.close();
                        }
                    } catch (Throwable t) {
                        logger.error("Exception when load extension class(interface: " +
                                            type + ", class file: " + url + ") in " + url, t);
                    }
                } // end of while urls
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }

这段代码由于比较冗长,因为dubbo在进行实际加载的过程中需要考虑很多的因素,主要目的就是实现加载指定目录底下的拓展类并且将其存入一个map中缓存起来。

这段代码我进行了稍微的改写之后,变成了一个比较简单的util类,简化学习和理解的难度

package com.sise.dubbo.spi.myspi;

import com.sise.dubbo.spi.spidemo.UserService;
import com.sise.dubbo.spi.spidemo.UserServiceImpl;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author idea
 * @date 2019/5/17
 */

public class MySpiUtil {

    /**
    * 这里自定义了加载配置的文件夹
    **/

    private static final String SPI_DIR = "META-INF/idea/";

    private Map<String, Class<?>> classMap = new ConcurrentHashMap<>();

    /**
     * 加载目录底下的文件信息
     *
     * @param clazz
     */

    public void loadDirectory(Class clazz) {
        String fileName = SPI_DIR + clazz.getName();
        ClassLoader classLoader = this.getClass().getClassLoader();
        try {
            Enumeration<URL> resources = classLoader.getResources(fileName);
            if (resources != null) {
                while (resources.hasMoreElements()) {
                    URL url = resources.nextElement();
                    loadResource(classLoader, url);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    public void loadResource(ClassLoader classLoader, URL url) {
        //读取配置文件里面的内容
        try {
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(url.openStream(), "utf-8"));
            String line;
            while ((line = reader.readLine()) != null) {
                int c = line.indexOf("#");
                //该行内容没有注释
                if (c <= 0) {
                    line = line.trim();
                    if (line.length() > 0) {
                        int splitIndex = line.indexOf("=");
                        String name = line.substring(0, splitIndex).trim();
                        String className = line.substring(splitIndex + 1).trim();
                        classMap.put(name, Class.forName(className, true, classLoader));
                    }

                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws IllegalAccessException, InstantiationException {
        MySpiUtil mySpiUtil = new MySpiUtil();
        mySpiUtil.loadDirectory(UserService.class);
        UserServiceImpl userService = (UserServiceImpl) mySpiUtil.classMap.get("UserService").newInstance();
        userService.say();
    }
}

相关的待加载服务代码:

dubbo的spi机制分析和实战案例

然后根据代码里面的指定目录进行配置文件的放置:

dubbo的spi机制分析和实战案例


配置文件也是按照dubbo的spi配置文件的格式来书写:


UserService=com.sise.dubbo.spi.spidemo.UserServiceImpl

运行程序之后,便可加载到相应的类并进行执行:

spi技术在java中应用场景比较广泛,通常在开发的时候为了实现接口自动寻找实现类的功能,可以通过spi来进行实现,将接口的实现类转移到一份配置文件中来进行控制。jdk自带的spi通常会一次性就将所有类进行实例化比较耗时,而dubbo在加载类的时候直接通过名称来定位具体的类,按实际需要加载,同时支持加载的路径也更加多,相比于传统jdk的spi加载要效率更高。

看完本文有收获?请转发分享给更多人