dubbo 源码 v2.7 分析:核心机制(一)
系列文章
dubbo 源码 v2.7 分析:结构、container 入口及线程模型
dubbo 源码 v2.7 分析:SPI 机制
一 回顾
上一篇我们介绍了SPI机制。本篇会先介绍dubbo中的核心机制,包括设计模式、bean加载、扩展点机制、动态代理和远程调用流程。
二 设计模式
2.1 装饰器&责任链模式
2.1.1 装饰器模式
装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其结构。这种类型的设计模式属于结构型模式,它是作为现有的类的一个包装。
这种模式创建了一个装饰类,用来包装原有的类,并在保持类方法签名完整性的前提下,提供了额外的功能。
2.1.2 责任链模式
责任链模式(Chain of Responsibility Pattern)为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为型模式。
在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。
2.1.3 Dubbo中的使用
Dubbo的源码中,在启动和调用阶段都使用了装饰器。例如生产者(Provider)的调用链,具体的调用过程是在org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper类的方法buildInvokerChain()内完成的。相关代码:
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// onError callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
return asyncResult;
}
public void destroy() {
invoker.destroy();
}
public String toString() {
return invoker.toString();
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
在这个方法的第3行(源码中的第55行),通过SPI机制获取Filter列表,从rpc.filter目录下可以看到Filter的实现类集合。每个Filter的实现类,都带有@Activate注解,将注解中含有 group=provider的Filter实现,按照order值的顺序排序,构成了一个调用链条。调用顺序为:
以EchoFilter为例,org.apache.dubbo.rpc.filter.EchoFilter的源码:
110000) (group = CommonConstants.PROVIDER, order = -
public class EchoFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
}
return invoker.invoke(inv);
}
}
这里是装饰器和责任链模式的混合使用。例如,EchoFilter 的作用是判断
是否是回声测试请求,是的话直接返回内容,这是一种责任链的体现。而像 ClassLoaderFilter则只是在主功能上添加了功能,更改当前线程的 ClassLoader,这是典型的装饰器模式。
2.2 观察者模式
2.2.1 定义
当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。
2.2.2 Dubbo中的使用
Provider启动时,需要与注册中心进行交互,先注册自己的服务,再订阅自己的服务。而在订阅时,使用了观察者模式,开启一个Listener。注册中心会做定时扫描,默认每5秒检查一次是否有服务更新,如果有,就会向该服务的提供者发送一个notify消息。Provider接收到notify消息后,就会执行NotifyListener的notify方法,执行监听器方法。
2.3 代理模式——动态代理
2.3.1 定义
在代理模式(Proxy Pattern)中,一个类代表另一个类的功能。这种类型的设计模式属于结构型模式。
在代理模式中,我们创建具有现有对象的对象,以便向外界提供功能接口。
2.3.2 Dubbo中的使用
Dubbo扩展了JDK标准SPI类的Adaptive,也就是自适应扩展点,这是一个典型的动态代理实现。Dubbo
需要灵活地控制实现类,即在调用阶段动态地根据参数决定调用哪个实现类,所以采用先生成代理类的方法,能够做到灵活的调用。生成代理类的代码是 ExtensionLoader 的createAdaptiveExtensionClassCode 方法。代理类的主要逻辑是,获取 URL 参数中指定参数的值作为获取实现类的 key。
2.2.3 源码
Adaptive注解:
AdaptiveCompiler是使用Adaptive注解的示例:
public class AdaptiveCompiler implements Compiler {
private static volatile String DEFAULT_COMPILER;
public static void setDefaultCompiler(String compiler) {
DEFAULT_COMPILER = compiler;
}
public Class<?> compile(String code, ClassLoader classLoader) {
Compiler compiler;
ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);
String name = DEFAULT_COMPILER; // copy reference
if (name != null && name.length() > 0) {
compiler = loader.getExtension(name);
} else {
compiler = loader.getDefaultExtension();
}
return compiler.compile(code, classLoader);
}
}
Adaptive注解是一个自适应 扩展点的标识。它可以修饰在类上,也可以修饰在方法上面。
修饰类和方法上的区别:放在类上,说明当前类是一个确定的自适应扩展点的类。如果是放在方法级别,那么需要生成一个动态字节码来进行转发。
例如Protocol 接口来说,定义了 export 和 refer 两个抽象方法,这两个方法分别带有@Adaptive 的标识,标识是 一个自适应方法。我们知道 Protocol 是一个通信协议的接口,具体有多种实现,那么这个时候选择哪一种呢?取决于我们在使用 dubbo 的时候所 配置的协议名称。而这里的方法层面的 Adaptive 就决定了当前这个方法会采用何种协议来发布服务。
org.apache.dubbo.rpc.Protocol源码:
/**
* Protocol. (API/SPI, Singleton, ThreadSafe)
*/
"dubbo") (
public interface Protocol {
/**
* Get default port when user doesn't config the port.
*
* @return default port
*/
int getDefaultPort();
/**
* Export service for remote invocation: <br>
* 1. Protocol should record request source address after receive a request:
* RpcContext.getContext().setRemoteAddress();<br>
* 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when
* export the same URL<br>
* 3. Invoker instance is passed in by the framework, protocol needs not to care <br>
*
* @param <T> Service type
* @param invoker Service invoker
* @return exporter reference for exported service, useful for unexport the service later
* @throws RpcException thrown when error occurs during export the service, for example: port is occupied
*/
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* Refer a remote service: <br>
* 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol
* needs to correspondingly execute `invoke()` method of `Invoker` object <br>
* 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking,
* protocol sends remote request in the `Invoker` implementation. <br>
* 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when
* connection fails.
*
* @param <T> Service type
* @param type Service class
* @param url URL address for the remote service
* @return invoker service's local proxy
* @throws RpcException when there's any error while connecting to the service provider
*/
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* Destroy protocol: <br>
* 1. Cancel all services this protocol exports and refers <br>
* 2. Release all occupied resources, for example: connection, port, etc. <br>
* 3. Protocol can continue to export and refer new service even after it's destroyed.
*/
void destroy();
}