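How Dubbo exports services and generates proxies
Basic usage (XML configuration)
Provider side
<!-- The concrete implementation bean. Real projects rarely declare the bean inside the Dubbo configuration. With annotation-driven development, for example, component scanning hands the bean to Spring, so it is not declared here and dubbo:service simply references it. -->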
<bean id="demoService" class="com.unj.dubbotest.provider.DemoServiceImpl" />
<!-- Provider application info, used to compute dependencies -->
<dubbo:application name="xixi_provider" />
<!-- Use the multicast registry to expose the service address
<dubbo:registry address="multicast://224.5.6.7:1234" />-->
<!-- Use the zookeeper registry to expose the service address -->
<!-- Real projects define the zookeeper address in a properties file -->
<!-- <dubbo:registry protocol="zookeeper" address="${zookeeper.address}" check="false" file="dubbo.properties" /> -->
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<!-- Expose the service over the dubbo protocol on port 20880 -->
<dubbo:protocol name="dubbo" port="20880" />
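<!-- Declare the service interface to expose. version is the service version; Dubbo only calls providers with a matching version. timeout is the call timeout in milliseconds; a call that exceeds it fails with an error. -->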
<dubbo:service interface="com.unj.dubbotest.provider.DemoService" ref="demoService" version="1.0" timeout="5000"/>
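Consumer side
<!-- Consumer application name, used to compute dependencies. It is not a matching condition and should differ from the provider's name. -->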
<dubbo:application name="hehe_consumer" />
<!-- Use the zookeeper registry to discover service addresses -->
<!-- <dubbo:registry address="${zookeeper.address}" check="false" file="dubbo.properties"> </dubbo:registry> -->
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
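<!-- Generate the remote service proxy; demoService can then be used like a local bean -->
<!-- The url attribute points this interface directly at a provider address, which is handy for testing. This is the Dubbo direct-connection mode and does not go through the zookeeper registry. -->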
<dubbo:reference id="demoService"
interface="com.unj.dubbotest.provider.DemoService" url="${study.dubbo.url}" version="1.0" timeout="20000" check="false"/>
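Core design
A Dubbo RPC call rests on two core flows: exporting the remote service on the provider side, and generating the reference proxy that the consumer calls. Both flows are abstracted into the top-level core interface Protocol.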
@SPI("dubbo")
public interface Protocol {
/**
* Get default port when user doesn't config the port.
*
* @return default port
*/
int getDefaultPort();
/**
* Export service for remote invocation: <br>
* 1. Protocol should record request source address after receive a request:
* RpcContext.getContext().setRemoteAddress();<br>
* 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when
* export the same URL<br>
* 3. Invoker instance is passed in by the framework, protocol needs not to care <br>
*
* @param <T> Service type
* @param invoker Service invoker
* @return exporter reference for exported service, useful for unexport the service later
* @throws RpcException thrown when error occurs during export the service, for example: port is occupied
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* Refer a remote service: <br>
* 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol
* needs to correspondingly execute `invoke()` method of `Invoker` object <br>
* 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking,
* protocol sends remote request in the `Invoker` implementation. <br>
* 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when
* connection fails.
*
* @param <T> Service type
* @param type Service class
* @param url URL address for the remote service
* @return invoker service's local proxy
* @throws RpcException when there's any error while connecting to the service provider
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* Destroy protocol: <br>
* 1. Cancel all services this protocol exports and refers <br>
* 2. Release all occupied resources, for example: connection, port, etc. <br>
* 3. Protocol can continue to export and refer new service even after it's destroyed.
*/
void destroy();
}
Creating the exporter object for a remote service
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
Creating the service invoker object on the consumer side
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
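To make the export/refer pairing concrete, here is a minimal in-memory sketch. MiniInvoker, MiniExporter and InMemoryProtocol are hypothetical, heavily simplified stand-ins invented for this illustration; they are not Dubbo types, and no network is involved. The sketch only shows the contract described in the Javadoc above: export is idempotent for the same key, and the invoker returned by refer forwards calls to whatever was exported.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical, simplified stand-in for Dubbo's Invoker. */
interface MiniInvoker {
    String key();               // plays the role of the service URL
    Object invoke(Object arg);  // plays the role of invoke(Invocation)

    /** Convenience factory so an invoker can be written as a lambda. */
    static MiniInvoker of(String key, Function<Object, Object> body) {
        return new MiniInvoker() {
            public String key() { return key; }
            public Object invoke(Object arg) { return body.apply(arg); }
        };
    }
}

/** Hypothetical, simplified stand-in for Dubbo's Exporter. */
interface MiniExporter {
    MiniInvoker invoker();
    void unexport();
}

/** Toy protocol that keeps exported invokers in a map instead of opening a server. */
class InMemoryProtocol {
    private final Map<String, MiniExporter> exporters = new ConcurrentHashMap<>();

    /** Provider side: exporting the same key twice returns the same exporter. */
    MiniExporter export(MiniInvoker invoker) {
        return exporters.computeIfAbsent(invoker.key(), k -> new MiniExporter() {
            public MiniInvoker invoker() { return invoker; }
            public void unexport() { exporters.remove(k); }
        });
    }

    /** Consumer side: the returned invoker forwards each call to the exported one. */
    MiniInvoker refer(String key) {
        return MiniInvoker.of(key, arg -> {
            MiniExporter exporter = exporters.get(key);
            if (exporter == null) {
                throw new IllegalStateException("No provider for " + key);
            }
            return exporter.invoker().invoke(arg);
        });
    }
}
```

A real protocol replaces the map lookup in refer with a remote request, which is exactly the part each Protocol implementation is responsible for.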
SPI
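SPI (Service Provider Interface) is a service discovery mechanism built into the JDK. Many frameworks use it to discover extensions. Put simply, it lets implementations be discovered and swapped at runtime.
For example, to add an implementation of an interface at runtime, you only add the implementation class and describe it to the JDK. Well-known users include JDBC and logging frameworks.
A standard SPI setup requires:
A directory on the classpath named exactly META-INF/services.
A plain-text provider-configuration file in that directory that satisfies the following:
a. The file name is the fully qualified name of the extension interface.
b. The file lists the implementation classes of that interface, one fully qualified class name per line.
c. The file is encoded in UTF-8.
Discovery through the java.util.ServiceLoader loading mechanism.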
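The steps above can be tried out with the following sketch. Greeter and EnglishGreeter are hypothetical names made up for this example. Normally the provider-configuration file ships inside the JAR; here it is written to a temporary directory at runtime, and that directory is put on a URLClassLoader, purely so the example stays in one file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

class JdkSpiSketch {
    /** Hypothetical extension interface. */
    public interface Greeter {
        String greet(String name);
    }

    /** Hypothetical implementation; ServiceLoader needs a public no-arg constructor. */
    public static class EnglishGreeter implements Greeter {
        public String greet(String name) {
            return "Hello, " + name;
        }
    }

    /** Writes META-INF/services/<interface name>, then discovers providers through it. */
    static List<String> discover() {
        try {
            Path root = Files.createTempDirectory("spi-demo");
            Path services = Files.createDirectories(root.resolve("META-INF/services"));
            // File name: fully qualified interface name. Content: implementation class name.
            Files.write(services.resolve(Greeter.class.getName()),
                    EnglishGreeter.class.getName().getBytes(StandardCharsets.UTF_8));

            List<String> greetings = new ArrayList<>();
            try (URLClassLoader loader = new URLClassLoader(
                    new URL[] {root.toUri().toURL()}, JdkSpiSketch.class.getClassLoader())) {
                for (Greeter greeter : ServiceLoader.load(Greeter.class, loader)) {
                    greetings.add(greeter.greet("Dubbo"));
                }
            }
            return greetings;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that ServiceLoader instantiates every listed implementation as it iterates; there is no lookup by name, which is one of the gaps Dubbo's own mechanism fills.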
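The SPI mechanism in Dubbo
Dubbo borrows the SPI idea but does not use the JDK mechanism; it implements its own. Throughout the Dubbo source you will see three kinds of extension lookups: adaptive extensions, extensions looked up by name, and activated extensions.
For example, the RPC protocol interface in Dubbo is extended dynamically through Dubbo's SPI mechanism: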
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
For the Protocol interface, Dubbo decides at runtime which implementation class to instantiate.
The implementation classes are configured in the file META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol:
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=org.apache.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
webservice=org.apache.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=org.apache.dubbo.rpc.protocol.thrift.ThriftProtocol
native-thrift=org.apache.dubbo.rpc.protocol.nativethrift.ThriftProtocol
memcached=org.apache.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=org.apache.dubbo.rpc.protocol.redis.RedisProtocol
rest=org.apache.dubbo.rpc.protocol.rest.RestProtocol
xmlrpc=org.apache.dubbo.xml.rpc.protocol.xmlrpc.XmlRpcProtocol
registry=org.apache.dubbo.registry.integration.RegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper
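The name=class layout of that file is what makes lookup by name possible. The following sketch is a toy loader for such lines; it is not Dubbo's ExtensionLoader, and Transport, TcpTransport, LocalTransport and MiniExtensionLoader are hypothetical names. It shows two ideas under simplifying assumptions: named extensions are instantiated lazily and cached, and an adaptive-style lookup picks the implementation from a per-call parameter, falling back to a default name much like the value in @SPI("dubbo").

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical extension point, standing in for Protocol. */
interface Transport {
    String name();
}

class TcpTransport implements Transport {
    public String name() { return "tcp"; }
}

class LocalTransport implements Transport {
    public String name() { return "local"; }
}

/** A toy loader for "name=implementationClass" lines. */
class MiniExtensionLoader {
    private final Map<String, String> classNames = new LinkedHashMap<>();
    private final Map<String, Transport> instances = new LinkedHashMap<>();
    private final String defaultName;

    MiniExtensionLoader(String config, String defaultName) {
        this.defaultName = defaultName;
        for (String raw : config.split("\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blanks and comments
            }
            int eq = line.indexOf('=');
            if (eq <= 0) {
                continue; // this sketch requires an explicit name
            }
            classNames.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
        }
    }

    /** Named lookup: create the instance on first use, then reuse it. */
    synchronized Transport getExtension(String name) {
        Transport cached = instances.get(name);
        if (cached != null) {
            return cached;
        }
        String className = classNames.get(name);
        if (className == null) {
            throw new IllegalArgumentException("No such extension: " + name);
        }
        try {
            Transport created =
                    (Transport) Class.forName(className).getDeclaredConstructor().newInstance();
            instances.put(name, created);
            return created;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create " + className, e);
        }
    }

    /** Adaptive-style lookup: choose by a call parameter, else use the default name. */
    Transport getAdaptive(Map<String, String> callParameters) {
        return getExtension(callParameters.getOrDefault("protocol", defaultName));
    }
}
```

Unlike the JDK's ServiceLoader, only the extension that is actually requested gets instantiated.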
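Service export process
ServiceBean implements ApplicationContextAware (to get hold of the Spring container's ApplicationContext) and InitializingBean (afterPropertiesSet is called once the bean's properties have been set; it fetches the ProviderConfig objects from the container and calls export to export the service).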
ServiceBean afterPropertiesSet
public void afterPropertiesSet() throws Exception {
if (getProvider() == null) {
Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
if (providerConfigMap != null && providerConfigMap.size() > 0) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if (CollectionUtils.isEmptyMap(protocolConfigMap)
&& providerConfigMap.size() > 1) { // backward compatibility
List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() != null && config.isDefault()) {
providerConfigs.add(config);
}
}
if (!providerConfigs.isEmpty()) {
setProviders(providerConfigs);
}
} else {
ProviderConfig providerConfig = null;
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() == null || config.isDefault()) {
if (providerConfig != null) {
throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
}
providerConfig = config;
}
}
if (providerConfig != null) {
setProvider(providerConfig);
}
}
}
}
if (getApplication() == null
&& (getProvider() == null || getProvider().getApplication() == null)) {
Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
ApplicationConfig applicationConfig = null;
for (ApplicationConfig config : applicationConfigMap.values()) {
if (applicationConfig != null) {
throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
}
applicationConfig = config;
}
if (applicationConfig != null) {
setApplication(applicationConfig);
}
}
}
if (getModule() == null
&& (getProvider() == null || getProvider().getModule() == null)) {
Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
ModuleConfig moduleConfig = null;
for (ModuleConfig config : moduleConfigMap.values()) {
if (config.isDefault() == null || config.isDefault()) {
if (moduleConfig != null) {
throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
}
moduleConfig = config;
}
}
if (moduleConfig != null) {
setModule(moduleConfig);
}
}
}
if (StringUtils.isEmpty(getRegistryIds())) {
if (getApplication() != null && StringUtils.isNotEmpty(getApplication().getRegistryIds())) {
setRegistryIds(getApplication().getRegistryIds());
}
if (getProvider() != null && StringUtils.isNotEmpty(getProvider().getRegistryIds())) {
setRegistryIds(getProvider().getRegistryIds());
}
}
if ((CollectionUtils.isEmpty(getRegistries()))
&& (getProvider() == null || CollectionUtils.isEmpty(getProvider().getRegistries()))
&& (getApplication() == null || CollectionUtils.isEmpty(getApplication().getRegistries()))) {
Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
if (CollectionUtils.isNotEmptyMap(registryConfigMap)) {
List<RegistryConfig> registryConfigs = new ArrayList<>();
if (StringUtils.isNotEmpty(registryIds)) {
Arrays.stream(COMMA_SPLIT_PATTERN.split(registryIds)).forEach(id -> {
if (registryConfigMap.containsKey(id)) {
registryConfigs.add(registryConfigMap.get(id));
}
});
}
if (registryConfigs.isEmpty()) {
for (RegistryConfig config : registryConfigMap.values()) {
if (StringUtils.isEmpty(registryIds) && (config.isDefault() == null || config.isDefault().booleanValue())) {
registryConfigs.add(config);
}
}
}
if (!registryConfigs.isEmpty()) {
super.setRegistries(registryConfigs);
}
}
}
if (getMetadataReportConfig() == null) {
Map<String, MetadataReportConfig> metadataReportConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MetadataReportConfig.class, false, false);
if (metadataReportConfigMap != null && metadataReportConfigMap.size() == 1) {
super.setMetadataReportConfig(metadataReportConfigMap.values().iterator().next());
} else if (metadataReportConfigMap != null && metadataReportConfigMap.size() > 1) {
throw new IllegalStateException("Multiple MetadataReport configs: " + metadataReportConfigMap);
}
}
if (getConfigCenter() == null) {
Map<String, ConfigCenterConfig> configenterMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConfigCenterConfig.class, false, false);
if (configenterMap != null && configenterMap.size() == 1) {
super.setConfigCenter(configenterMap.values().iterator().next());
} else if (configenterMap != null && configenterMap.size() > 1) {
throw new IllegalStateException("Multiple ConfigCenter found:" + configenterMap);
}
}
if (getMonitor() == null
&& (getProvider() == null || getProvider().getMonitor() == null)
&& (getApplication() == null || getApplication().getMonitor() == null)) {
Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
MonitorConfig monitorConfig = null;
for (MonitorConfig config : monitorConfigMap.values()) {
if (config.isDefault() == null || config.isDefault()) {
if (monitorConfig != null) {
throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
}
monitorConfig = config;
}
}
if (monitorConfig != null) {
setMonitor(monitorConfig);
}
}
}
if (getMetrics() == null) {
Map<String, MetricsConfig> metricsConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MetricsConfig.class, false, false);
if (metricsConfigMap != null && metricsConfigMap.size() > 0) {
MetricsConfig metricsConfig = null;
for (MetricsConfig config : metricsConfigMap.values()) {
if (metricsConfig != null) {
throw new IllegalStateException("Duplicate metrics configs: " + metricsConfig + " and " + config);
}
metricsConfig = config;
}
if (metricsConfig != null) {
setMetrics(metricsConfig);
}
}
}
if (StringUtils.isEmpty(getProtocolIds())) {
if (getProvider() != null && StringUtils.isNotEmpty(getProvider().getProtocolIds())) {
setProtocolIds(getProvider().getProtocolIds());
}
}
if (CollectionUtils.isEmpty(getProtocols())
&& (getProvider() == null || CollectionUtils.isEmpty(getProvider().getProtocols()))) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
if (StringUtils.isNotEmpty(getProtocolIds())) {
Arrays.stream(COMMA_SPLIT_PATTERN.split(getProtocolIds()))
.forEach(id -> {
if (protocolConfigMap.containsKey(id)) {
protocolConfigs.add(protocolConfigMap.get(id));
}
});
}
if (protocolConfigs.isEmpty()) {
for (ProtocolConfig config : protocolConfigMap.values()) {
if (StringUtils.isEmpty(protocolIds)) {
protocolConfigs.add(config);
}
}
}
if (!protocolConfigs.isEmpty()) {
super.setProtocols(protocolConfigs);
}
}
}
if (StringUtils.isEmpty(getPath())) {
if (StringUtils.isNotEmpty(beanName)
&& StringUtils.isNotEmpty(getInterface())
&& beanName.startsWith(getInterface())) {
setPath(beanName);
}
}
if (!supportedApplicationListener) {
export();
}
}
public void export() {
super.export();
// Publish ServiceBeanExportedEvent
publishExportEvent();
}
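Most of afterPropertiesSet repeats one pattern for provider, module and monitor configs: among all beans of a type, take the one whose default flag is null or true, and fail if more than one qualifies. The sketch below isolates that pattern. NamedConfig and DefaultConfigPicker are hypothetical helpers written for this illustration and do not exist in Dubbo.

```java
import java.util.Collection;
import java.util.function.Function;

/** Hypothetical config type with a nullable "default" flag, like ProviderConfig.isDefault(). */
class NamedConfig {
    final String id;
    final Boolean isDefault;

    NamedConfig(String id, Boolean isDefault) {
        this.id = id;
        this.isDefault = isDefault;
    }

    @Override
    public String toString() {
        return id;
    }
}

class DefaultConfigPicker {
    /**
     * Returns the single config whose flag is null or true,
     * null when no config qualifies, and fails when several do.
     */
    static <T> T pickDefault(Collection<T> configs, Function<T, Boolean> isDefault, String kind) {
        T chosen = null;
        for (T config : configs) {
            Boolean flag = isDefault.apply(config);
            if (flag == null || flag) {
                if (chosen != null) {
                    throw new IllegalStateException(
                            "Duplicate " + kind + " configs: " + chosen + " and " + config);
                }
                chosen = config;
            }
        }
        return chosen;
    }
}
```

The application and metrics branches in the listing are stricter: they skip the flag check, so any second bean of that type is treated as a duplicate.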
ServiceConfig export
public synchronized void export() {
checkAndUpdateSubConfigs();
if (!shouldExport()) {
return;
}
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
}
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
appendRuntimeParameters(map);
appendParameters(map, metrics);
appendParameters(map, application);
appendParameters(map, module);
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, provider);
appendParameters(map, protocolConfig);
appendParameters(map, this);
if (CollectionUtils.isNotEmpty(methods)) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if (Boolean.FALSE.toString().equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
// export service
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
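The scope checks near the end of doExportUrlsFor1Protocol are easy to misread because they are written as negations. The sketch below restates them as a small function that returns which exports a given scope value triggers. ExportScopeSketch is a hypothetical helper, and the literal values "none", "remote" and "local" are assumed to match the SCOPE_NONE, SCOPE_REMOTE and SCOPE_LOCAL constants.

```java
import java.util.ArrayList;
import java.util.List;

class ExportScopeSketch {
    /** Lists the exports performed for a scope value, in the order the listing performs them. */
    static List<String> plan(String scope) {
        List<String> steps = new ArrayList<>();
        if ("none".equalsIgnoreCase(scope)) {
            return steps; // nothing is exported
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            steps.add("local"); // in-JVM export via exportLocal(url)
        }
        if (!"local".equalsIgnoreCase(scope)) {
            steps.add("remote"); // export through protocol.export(...)
        }
        return steps;
    }
}
```

With no scope configured (null), both branches run, so the service is exported locally and remotely.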
First the invoker object is created:
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
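This proxyFactory is a ProxyFactory extension. In the JdkProxyFactory implementation, the Invoker it returns ultimately calls the method on ref (the actual service) through reflection: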
public class JdkProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}
}
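Stripped of the Dubbo types, the provider-side invoker boils down to looking up a method by name and parameter types and calling it on the target. The following sketch shows only that step. ReflectiveInvoker is a hypothetical class, and unlike AbstractProxyInvoker it wraps failures in unchecked exceptions to keep the example short.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class ReflectiveInvoker {
    private final Object target;

    ReflectiveInvoker(Object target) {
        this.target = target;
    }

    /** Finds the public method by name and parameter types, then calls it on the target. */
    Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(target, arguments);
        } catch (InvocationTargetException e) {
            // The service method itself threw; surface its exception as the cause.
            throw new IllegalStateException("Service method failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }
}
```

For example, new ReflectiveInvoker("dubbo").invoke("substring", new Class<?>[] {int.class}, new Object[] {2}) calls String.substring(2) on the target. The target class must be public for this reflective call to be permitted from another package.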
What we offer for remote invocation is exported through protocol.export(wrapperInvoker), while the proxy object itself is ultimately generated by JdkProxyFactory.getProxy:
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
Let's look at what the dynamic proxy's invocation handler does:
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
RpcInvocation invocation;
if (RpcUtils.hasGeneratedFuture(method)) {
Class<?> clazz = method.getDeclaringClass();
String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length());
Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes());
invocation = new RpcInvocation(syncMethod, args);
invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true");
invocation.setAttachment(Constants.ASYNC_KEY, "true");
} else {
invocation = new RpcInvocation(method, args);
if (RpcUtils.hasFutureReturnType(method)) {
invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
invocation.setAttachment(Constants.ASYNC_KEY, "true");
}
}
return invoker.invoke(invocation).recreate();
}
}
In the end the handler calls invoke on the invoker passed to it, and as shown earlier, that invoke uses reflection to run the method of the actual service.
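The proxy-to-invoker hand-off can be reproduced with nothing but the JDK. In the sketch below, EchoService and ProxySketch are hypothetical, and the invoker is reduced to a BiFunction that receives the method name and arguments; Dubbo's RpcInvocation carries more (parameter types, attachments), which is left out here.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

/** Hypothetical service interface. */
interface EchoService {
    String echo(String msg);
}

class ProxySketch {
    /** Builds a JDK dynamic proxy whose interface methods are all routed to the invoker. */
    @SuppressWarnings("unchecked")
    static <T> T proxyFor(Class<T> type, BiFunction<String, Object[], Object> invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            // toString/hashCode/equals are answered locally, never sent to the invoker.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString":
                        return "proxy of " + type.getName();
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    case "equals":
                        return proxy == args[0];
                    default:
                        throw new UnsupportedOperationException(method.getName());
                }
            }
            // Everything else becomes an "invocation": method name plus arguments.
            return invoker.apply(method.getName(), args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }
}
```

Calling ProxySketch.proxyFor(EchoService.class, (name, args) -> name + ":" + args[0]).echo("hi") hands the invoker the name "echo" and the argument array, which is the same shape of information InvokerInvocationHandler packs into an RpcInvocation.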
DubboProtocol export: exporting the service
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
openServer exports the service and starts the server by calling into the underlying remoting layer.
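Two bookkeeping ideas are visible in DubboProtocol.export: exporters are stored in a map under a service key, and a server is opened per address rather than per service. The sketch below illustrates both with plain maps. ExportRegistrySketch is hypothetical, the key layout group/path:version:port is an assumption about what serviceKey(url) produces, and the "server" is just a string placeholder.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ExportRegistrySketch {
    final Map<String, Object> exporterMap = new ConcurrentHashMap<>();
    final Map<String, String> servers = new ConcurrentHashMap<>();
    final AtomicInteger serversOpened = new AtomicInteger();

    /** Assumed key layout: optional group, path, optional version, then port. */
    static String serviceKey(String group, String path, String version, int port) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        return key.append(':').append(port).toString();
    }

    /** Records the invoker under its service key and opens a server for the address once. */
    void export(String group, String path, String version, String host, int port, Object invoker) {
        exporterMap.put(serviceKey(group, path, version, port), invoker);
        servers.computeIfAbsent(host + ":" + port, address -> {
            serversOpened.incrementAndGet(); // stands in for binding a real port
            return "server@" + address;
        });
    }
}
```

Exporting two services on the same host and port leaves two entries in exporterMap but opens only one server, which is why many interfaces can share port 20880.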
Building the reference proxy on the consumer side
The flow mirrors the provider side.
ReferenceBean afterPropertiesSet
public void afterPropertiesSet() throws Exception {
if (applicationContext != null) {
BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConfigCenterBean.class, false, false);
}
if (getConsumer() == null) {
Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
ConsumerConfig consumerConfig = null;
for (ConsumerConfig config : consumerConfigMap.values()) {
if (config.isDefault() == null || config.isDefault()) {
if (consumerConfig != null) {
throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);
}
consumerConfig = config;
}
}
if (consumerConfig != null) {
setConsumer(consumerConfig);
}
}
}
if (getApplication() == null
&& (getConsumer() == null || getConsumer().getApplication() == null)) {
Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
ApplicationConfig applicationConfig = null;
for (ApplicationConfig config : applicationConfigMap.values()) {
if (applicationConfig != null) {
throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
}
applicationConfig = config;
}
if (applicationConfig != null) {
setApplication(applicationConfig);
}
}
}
if (getModule() == null
&& (getConsumer() == null || getConsumer().getModule() == null)) {
Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
ModuleConfig moduleConfig = null;
for (ModuleConfig config : moduleConfigMap.values()) {
if (config.isDefault() == null || config.isDefault()) {
if (moduleConfig != null) {
throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
}
moduleConfig = config;
}
}
if (moduleConfig != null) {
setModule(moduleConfig);
}
}
}
if (StringUtils.isEmpty(getRegistryIds())) {
if (getApplication() != null && StringUtils.isNotEmpty(getApplication().getRegistryIds())) {
setRegistryIds(getApplication().getRegistryIds());
}
if (getConsumer() != null && StringUtils.isNotEmpty(getConsumer().getRegistryIds())) {
setRegistryIds(getConsumer().getRegistryIds());
}
}
if (CollectionUtils.isEmpty(getRegistries())
&& (getConsumer() == null || CollectionUtils.isEmpty(getConsumer().getRegistries()))
&& (getApplication() == null || CollectionUtils.isEmpty(getApplication().getRegistries()))) {
Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
if (registryConfigMap != null && registryConfigMap.size() > 0) {
List<RegistryConfig> registryConfigs = new ArrayList<>();
if (StringUtils.isNotEmpty(registryIds)) {
Arrays.stream(COMMA_SPLIT_PATTERN.split(registryIds)).forEach(id -> {
if (registryConfigMap.containsKey(id)) {
registryConfigs.add(registryConfigMap.get(id));
}
});
}
if (registryConfigs.isEmpty()) {
for (RegistryConfig config : registryConfigMap.values()) {
if (StringUtils.isEmpty(registryIds)) {
registryConfigs.add(config);
}
}
}
if (!registryConfigs.isEmpty()) {
super.setRegistries(registryConfigs);
}
}
}
if (getMetadataReportConfig() == null) {
Map<String, MetadataReportConfig> metadataReportConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MetadataReportConfig.class, false, false);
if (metadataReportConfigMap != null && metadataReportConfigMap.size() == 1) {
// first elements
super.setMetadataReportConfig(metadataReportConfigMap.values().iterator().next());
} else if (metadataReportConfigMap != null && metadataReportConfigMap.size() > 1) {
throw new IllegalStateException("Multiple MetadataReport configs: " + metadataReportConfigMap);
}
}
if (getConfigCenter() == null) {
Map<String, ConfigCenterConfig> configenterMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConfigCenterConfig.class, false, false);
if (configenterMap != null && configenterMap.size() == 1) {
super.setConfigCenter(configenterMap.values().iterator().next());
} else if (configenterMap != null && configenterMap.size() > 1) {
throw new IllegalStateException("Multiple ConfigCenter found:" + configenterMap);
}
}
if (getMonitor() == null
&& (getConsumer() == null || getConsumer().getMonitor() == null)
&& (getApplication() == null || getApplication().getMonitor() == null)) {
Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
MonitorConfig monitorConfig = null;
for (MonitorConfig config : monitorConfigMap.values()) {
if (config.isDefault() == null || config.isDefault()) {
if (monitorConfig != null) {
throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
}
monitorConfig = config;
}
}
if (monitorConfig != null) {
setMonitor(monitorConfig);
}
}
}
if (getMetrics() == null) {
Map<String, MetricsConfig> metricsConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MetricsConfig.class, false, false);
if (metricsConfigMap != null && metricsConfigMap.size() > 0) {
MetricsConfig metricsConfig = null;
for (MetricsConfig config : metricsConfigMap.values()) {
if (metricsConfig != null) {
throw new IllegalStateException("Duplicate metrics configs: " + metricsConfig + " and " + config);
}
metricsConfig = config;
}
if (metricsConfig != null) {
setMetrics(metricsConfig);
}
}
}
if (shouldInit()) {
getObject();
}
}
public Object getObject() {
return get();
}
ReferenceConfig get
public synchronized T get() {
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
ReferenceConfig init
private void init() {
if (initialized) {
return;
}
checkStubAndLocal(interfaceClass);
checkMock(interfaceClass);
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
appendRuntimeParameters(map);
if (!ProtocolUtils.isGeneric(getGeneric())) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
}
}
map.put(INTERFACE_KEY, interfaceName);
appendParameters(map, metrics);
appendParameters(map, application);
appendParameters(map, module);
// remove 'default.' prefix for configs from ConsumerConfig
// appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, consumer);
appendParameters(map, this);
Map<String, Object> attributes = null;
if (CollectionUtils.isNotEmpty(methods)) {
attributes = new HashMap<String, Object>();
for (MethodConfig methodConfig : methods) {
appendParameters(map, methodConfig, methodConfig.getName());
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
attributes.put(methodConfig.getName(), convertMethodConfig2AsyncInfo(methodConfig));
}
}
String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(REGISTER_IP_KEY, hostToRegistry);
ref = createProxy(map);
String serviceKey = URL.buildKey(interfaceName, group, version);
ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
initialized = true;
}
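One detail of `init()` that is easy to miss is the per-method key rewrite: a `<method>.retry` entry is always removed from the parameter map, and only the value `false` is translated into `<method>.retries=0`. The sketch below isolates that step; `RetryKeyNormalizer` and the method name `sayHello` are illustrative assumptions, not Dubbo names.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryKeyNormalizer {
    /** Mirrors the retry handling in init(): drop "<method>.retry", map "false" to retries=0. */
    public static void normalize(Map<String, String> map, String methodName) {
        String retryKey = methodName + ".retry";
        if (map.containsKey(retryKey)) {
            String retryValue = map.remove(retryKey);
            if ("false".equals(retryValue)) {
                map.put(methodName + ".retries", "0");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("sayHello.retry", "false");
        normalize(map, "sayHello");
        System.out.println(map);
    }
}
```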
createProxy
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear(); // reference retry init will add url to urls, lead to OOM
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
checkRegistry();
List<URL> us = loadRegistries(false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
if (urls.size() == 1) {
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's CLUSTER is available
URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
if (shouldCheck() && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker);
}
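When the `url` attribute is set (the direct-connection case from the consumer configuration), `createProxy` splits it on semicolons and fills in the interface name as the path of any address that has none. The sketch below reproduces only that step with `java.net.URI` instead of Dubbo's `URL` class. `DirectUrlResolver` is a hypothetical helper, and the split pattern is an assumption about what `SEMICOLON_SPLIT_PATTERN` looks like.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class DirectUrlResolver {
    // Assumed equivalent of SEMICOLON_SPLIT_PATTERN: semicolons with optional whitespace.
    private static final Pattern SEMICOLON_SPLIT = Pattern.compile("\\s*[;]+\\s*");

    /** Splits a user-specified url attribute and defaults an empty path to the interface name. */
    public static List<String> resolve(String url, String interfaceName) {
        List<String> result = new ArrayList<>();
        for (String u : SEMICOLON_SPLIT.split(url.trim())) {
            if (u.isEmpty()) {
                continue;
            }
            URI uri = URI.create(u);
            String path = uri.getPath();
            if (path == null || path.isEmpty() || path.equals("/")) {
                try {
                    uri = new URI(uri.getScheme(), uri.getAuthority(), "/" + interfaceName,
                            uri.getQuery(), uri.getFragment());
                } catch (URISyntaxException e) {
                    throw new IllegalArgumentException("Invalid address: " + u, e);
                }
                result.add(uri.toString());
            } else {
                result.add(u);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(resolve("dubbo://10.0.0.1:20880;dubbo://10.0.0.2:20880", "com.example.DemoService"));
    }
}
```

In the real method each resolved address is then either tagged with the encoded `refer` parameters (registry addresses) or merged with the consumer parameters (peer-to-peer addresses). With a single URL the invoker comes straight from `REF_PROTOCOL.refer`, and with several the invokers are joined through `CLUSTER`.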
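The proxy itself is created by the configured proxy factory (ProxyFactory), resolved through the SPI adaptive extension. The shared logic lives in the abstract base class AbstractProxyFactory; JdkProxyFactory is the implementation based on JDK dynamic proxies. As the source comment below notes, JavassistProxyFactory is the default implementation.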
/**
* A {@link ProxyFactory} implementation that will generate a reference service's proxy,the JavassistProxyFactory is
* its default implementation
*/
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
AbstractProxyFactory
@Override
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
return getProxy(invoker, false);
}
@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
String config = invoker.getUrl().getParameter(INTERFACES);
if (config != null && config.length() > 0) {
String[] types = COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
// TODO can we load successfully for a different classloader?.
interfaces[i + 2] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
if (!GenericService.class.isAssignableFrom(invoker.getInterface()) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
interfaces[len] = com.alibaba.dubbo.rpc.service.GenericService.class;
}
return getProxy(invoker, interfaces);
}
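`AbstractProxyFactory.getProxy` mainly decides which interfaces the proxy must implement: the service interface, an echo interface, any extra interfaces listed in the URL, and the generic interface when generic invocation is requested. A simplified sketch of that assembly follows. `EchoLike`, `GenericLike` and `Demo` are hypothetical marker interfaces standing in for `EchoService`, `GenericService` and a business interface, and the comma pattern is an assumption about `COMMA_SPLIT_PATTERN`.

```java
import java.util.ArrayList;
import java.util.List;

public class ProxyInterfaces {
    public interface EchoLike {}     // stand-in for EchoService
    public interface GenericLike {}  // stand-in for GenericService
    public interface Demo {}         // stand-in for the business interface

    /** Collects interfaces in the same order as the listing: main, echo, extras, generic. */
    public static Class<?>[] collect(Class<?> main, String config, boolean generic) {
        List<Class<?>> interfaces = new ArrayList<>();
        interfaces.add(main);
        interfaces.add(EchoLike.class);
        if (config != null && !config.isEmpty()) {
            for (String type : config.split("\\s*[,]+\\s*")) {
                try {
                    interfaces.add(Class.forName(type));
                } catch (ClassNotFoundException e) {
                    throw new IllegalArgumentException("Cannot load interface " + type, e);
                }
            }
        }
        if (generic && !GenericLike.class.isAssignableFrom(main)) {
            interfaces.add(GenericLike.class);
        }
        return interfaces.toArray(new Class<?>[0]);
    }

    public static void main(String[] args) {
        for (Class<?> c : collect(Demo.class, "java.io.Serializable", true)) {
            System.out.println(c.getName());
        }
    }
}
```

Because the echo interface is always added, every generated proxy can be cast to it and used for a liveness check regardless of the business interface.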
JdkProxyFactory
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}
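The two `JdkProxyFactory` methods are mirror images: `getInvoker` wraps a real object so it can be called by method name through reflection, and `getProxy` wraps an invoker so it can be called through a typed interface. The following self-contained sketch shows both directions using only the JDK. `SimpleInvoker`, `Greeter` and `GreeterImpl` are hypothetical names, and the network hop that Dubbo places between the two sides is omitted.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class JdkProxyDemo {
    public interface Greeter {
        String hello(String name);
    }

    public static class GreeterImpl implements Greeter {
        @Override
        public String hello(String name) {
            return "Hello, " + name;
        }
    }

    /** Hypothetical, much reduced stand-in for Dubbo's Invoker. */
    public interface SimpleInvoker {
        Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args) throws Throwable;
    }

    /** Provider side, analogous to getInvoker: look the method up by name and call it reflectively. */
    public static SimpleInvoker toInvoker(Object target) {
        return (methodName, parameterTypes, args) -> {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the business exception, not the reflection wrapper
            }
        };
    }

    /** Consumer side, analogous to getProxy: every interface call is routed to the invoker. */
    @SuppressWarnings("unchecked")
    public static <T> T toProxy(Class<T> type, SimpleInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args); // toString/hashCode/equals stay local
            }
            return invoker.invoke(method.getName(), method.getParameterTypes(), args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    public static void main(String[] args) {
        Greeter greeter = toProxy(Greeter.class, toInvoker(new GreeterImpl()));
        System.out.println(greeter.hello("dubbo"));
    }
}
```

The sketch loads the proxy with the interface's own class loader, whereas the listing uses the thread context class loader; either works here because everything lives in one class path.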