【第八弹】详解分布式中间件Dubbo
6. 服务暴露(注册)源码剖析
服务暴露(注册)流程:
开始服务提供者会进行初始化,将暴露给其他服务调用,如下图:
服务消费者也需要初始化,并且在注册中心注册自己,如下图:
服务提供者暴露的主要过程:
(1)将服务转化成 Invoker: 在服务提供者初始化的时候,会通过Config组件中的ServiceConfig 读取服务的配置信息,在读取配置文件生成服务实体以后,会通过ProxyFactory将Proxy转换成 Invoker。
(2)将 Invoker转化成Exporter: Invoker会被定义Protocol,然后会被包装成Exporter,最后Exporter会发送到注册中心,作为服务的注册信息。
现以dubbo-demo工程中的dubbo-demo-xml项目进行源码分析。
服务暴露源码剖析过程:
服务提供端执行org.apache.dubbo.demo.provider.Application启动后,服务提供者进行初始化,Dubbo是基于Spring的Schema进行扩展和加载的,如下:
这里应用了Spring的自定义标签功能,定义了dubbo标签,然后声明xsd的位置,我们进入xsd文件如下:
dubbo的xsd文件在dubbo-config-spring项目的META-INF下面,注意还有个spring.handlers,这个是用来解析文件标签的,进入到这个文件发现里面只有一个DubboNamespaceHandler,如下:
进入到这个方法如下:
首先看到DubboNamespaceHandler继承了类NamespaceHandlerSupport,对NamespaceHandlerSupport可以注册任意个BeanDefinitionParser,而解析XML的工作委托给各个BeanDefinitionParser负责。spring在扫描并加载BeanDefinition的时候会执行到这里,根据dubbo配置文件生成的BeanDefinition此刻交由spring管理。因为现在看的是服务端,所以这里我们主要看service对应的ServiceBean。
可以看到ServiceBean继承了ServiceConfig,如下:
在spring容器加载完成后触发contextrefreshedevent事件,这个事件会被实现了ApplicationListener接口的类监听到,执行对应的onApplicationEvent函数。这个方法里面核心只有这个export方法。接下来在ServiceConfig.java中的491行,可以看到ProxyFactory通过getInvoker方法获得了invoker实例,然后对invoker实例进行了包装,接下来将包装好的invoker实例转化成exporter实例。整个过程如下:
=>org.apache.dubbo.config.spring.context.DubboBootstrapApplicationListener#onContextRefreshedEvent
==> org.apache.dubbo.config.bootstrap.DubboBootstrap#start
==> org.apache.dubbo.config.bootstrap.DubboBootstrap#exportServices
==> org.apache.dubbo.config.ServiceConfig#export
==> org.apache.dubbo.config.ServiceConfig#doExport
==> org.apache.dubbo.config.ServiceConfig#doExportUrls
==> org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
==> java.lang.String#equalsIgnoreCase(local)
==> org.apache.dubbo.rpc.ProxyFactory#getInvoker
==> org.apache.dubbo.rpc.Protocol#export
==> org.apache.dubbo.registry.integration.RegistryProtocol#export
==> org.apache.dubbo.registry.integration.RegistryProtocol#register
==> org.apache.dubbo.registry.RegistryFactory#getRegistry
==> org.apache.dubbo.registry.RegistryService#register
==> org.apache.dubbo.registry.ListenerRegistryWrapper#register
==> org.apache.dubbo.registry.support.FailbackRegistry#register
==> org.apache.dubbo.registry.support.FailbackRegistry#doRegister
==> org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
==> org.apache.dubbo.remoting.zookeeper.ZookeeperClient#create
==> java.util.List#add(E)
服务暴露(注册)源码级流程图:
总结:
第一阶段:service转化为Invoker,服务容器启动初始化过程中,会将service服务对应的ServiceBean信息初始化加载,Dubbo的启动器监听会启动服务容器,其中一项工作就是暴露服务,ServiceConfig组件会执行export方法暴露服务,这里会判断是本地暴露还是远程暴露,然后ProxyFactory接口的实现类会调用getInvoker方法使用 ref(对外提供服务的实际类) 生成一个AbstractProxyInvoker实例,并对其进行包装得到wrapperInvoker;
第二阶段:Invoker转化为Exporter,注册中心协议管理类执行export方法对包装的Invoker进行暴露,若是远程暴露,先通过注册中心工厂类RegistryFactory获得对应协议的注册类Register,进而调用其register方法进行注册,若使用Zookeeper注册中心,注册过程将由ZookeeperRegistry执行doRegister方法完成,即创建Zookeeper数据节点保存注册服务的信息,最终将需要暴露的Exporter加入到列表中,整个暴露过程完成。
7. 服务引用(消费)源码剖析
服务消费流程:服务消费者首先持有远程服务实例生成的 Invoker,然后把 Invoker 转换成用户接口的动态代理引用。
服务消费者消费初始化过程:
(1)把远端服务转化为Invoker: ReferenceConfig 类的init方法调用createProxy() ,期间使用 Protocol调用refer方法生成Invoker 实例;
(2)把Invoker转化为客户端需要的接口: 即使用ProxyFactory把Invoker转换为客户端需要的接口(如HelloService)
服务消费源码剖析过程:
服务消费端执行org.apache.dubbo.demo.consumer.Application启动后,服务消费者进行初始化,Dubbo是基于Spring的Schema进行扩展和加载的,如下:
这里应用了Spring的自定义标签功能,定义了dubbo标签,然后声明xsd的位置,我们进入xsd文件如下:
dubbo的xsd文件在dubbo-config-spring项目的META-INF下面,注意还有个spring.handlers,这个是用来解析文件标签的,进入到这个文件发现里面只有一个DubboNamespaceHandler,如下:
进入到这个方法如下:
首先看到DubboNamespaceHandler继承了类NamespaceHandlerSupport,对NamespaceHandlerSupport可以注册任意个BeanDefinitionParser,而解析XML的工作委托给各个BeanDefinitionParser负责。spring在扫描并加载BeanDefinition的时候会执行到这里,根据dubbo配置文件生成的BeanDefinition此刻交由spring管理。因为现在看的是服务端,所以这里我们主要看reference对应的ReferenceBean。
可以看到ReferenceBean继承了ReferenceConfig,如下:
在spring容器加载完成后触发contextrefreshedevent事件,这个事件会被实现了ApplicationListener接口的类监听到,执行对应的onApplicationEvent函数。接下来执行ReferenceConfig.java中225行的init方法,整个过程如下:
==> org.apache.dubbo.config.ReferenceConfig#init
==> org.apache.dubbo.config.ReferenceConfig#createProxy
==> org.apache.dubbo.config.ReferenceConfig#shouldJvmRefer(本地引用)
==> org.apache.dubbo.rpc.Protocol#refer(远程引用)
==> org.apache.dubbo.registry.integration.RegistryProtocol#refer
==> org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
==> org.apache.dubbo.rpc.cluster.Cluster#getCluster(java.lang.String, boolean)
==> org.apache.dubbo.rpc.ProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker, boolean)
服务消费源码级流程图:
第一阶段:reference转化为Invoker,服务消费容器启动初始化过程中,会将reference服务对应的ReferenceBean信息初始化加载,进而调用ReferenceConfig 类的init方法中的createProxy() 方法,这里会判断是本地引用还是远程引用,对于远程引用,如果url只存在一个,那么直接用Protocol的refer()进行转换,如果存在多个url,会先通过urls获取所有invoker,然后根据urls中是否存在registry协议的url,做不同的集群调用,最终获得消费者需要的Invoker实例;
第二阶段:Invoker转化为interface,上面已经得到Invoker实例,ProxyFactory会通过getProxy方法将Invoker进行转化,实际是交由其实现类AbstractProxyFactory的getProxy方法进行处理,然后由其子类JdkProxyFactory的getProxy方法执行,到此就创建出接口的动态代理对象,然后用InvokerInvocationHandler调用invoke方法执行,最后用recreate方法用来将result转换为接口实际需要的类型返回后将代理放到spring容器中,这样用起来就像本地调用一样,依赖注入的时候主动进行初始化,整个服务消费的初始化完成。
8. 集群容错、负载均衡、路由服务源码剖析
智能容错、负载均衡及路由服务执行流程,如下图:
集群容错的所有组件,包含集群容错组件Cluster、集群容错调用者组件Cluster Invoker、信息缓存组件Directory、路由服务组件Router 和 负载均衡组件LoadBalance 等
智能容错、负载均衡及路由服务执行流程核心过程:
(1)缓存服务信息: Cluster组件调用信息缓存接口Directory缓存可调用的服务列表;
(2)用路由规则过滤服务信息: 信息缓存接口Directory调用路由组件Router根据路由规则进行过滤;
(3)服务调用进行负载均衡: 通过负载均衡组件LoadBalance对过滤后的服务进行选择负载调用。
源码剖析过程:
FailoverClusterInvoker的doInvoke重试机制和负载均衡源码如下:==>org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke
在FailoverClusterInvoker的doInvoke方法中,首先checkInvokers进行检查服务是否可用情况,然后获取重试次数,如果重试次数配置不合理就给个默认值1,然后RpcException le为成功调用服务后的最后一次调用失败的异常对象,如果没有调用失败的情况,则该对象一直为null,然后for循环遍历len次进行重试调用,在调用的过程中,重试时,进行重新选择,避免重试时invoker列表已发生变化,如果列表发生了变化,那么invoked判断会失效,因为invoker实例已经改变。
然后select执行的是负载均衡的逻辑,选出一个invoker,调用目标 Invoker 的 invoke 方法,如果调用成功,直接返回result,反之,如果捕获到异常,如果是业务逻辑层异常,捕获到即退出重试,向上抛出返回给客户端,如果不是业务逻辑层异常,就赋值给len。最终invoker放进providers中,开始下一次重试。如果最终len次重试后均调用失败,则直接抛出异常信息。
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
//获取重试次数,默认重试次数DEFAULT_RETRIES为2
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
//循环调用,失败重试
//成功调用之后的最后一次失败调用的异常
RpcException le = null; // last exception.
//被调用的调用者
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//重试时,进行重新选择,避免重试时invoker列表已发生变化.
//注意:如果列表发生了变化,那么invoked判断会失效,因为invoker实例已经改变
if (i > 0) {
checkWhetherDestroyed();
//在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
//通过调用 list 可得到最新可用的 Invoker 列表
copyInvokers = list(invocation);
checkInvokers(copyInvokers, invocation);
}
//开始执行负载均衡逻辑
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
//添加到 invoker 到 invoked 列表中
invoked.add(invoker);
//设置 invoked 到 RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
//调用目标 Invoker 的 invoke 方法
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
//如果是业务逻辑层异常,捕获到即退出重试
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
//若重试失败,则抛出异常
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
入骨相思知不知
玲珑骰子安红豆
入我相思门,知我相思苦,长相思兮长相忆,短相思兮无穷极。