vlambda博客
学习文章列表

阿里面试官:你读过Dubbo的源码,给我说说服务之间是怎么进行消费的?

# 前言


本文接着分析 Dubbo 服务的消费流程。主要从以下几个方面进行分析:注册中心的暴露;通过注册中心进行服务消费通知;直连服务进行消费。服务消费端启动时,将自身的信息注册到注册中心的目录,同时还订阅服务提供方的目录,当服务提供方的 URL 发生更改时,实时获取新的数据。


# 服务消费端流程


下面是一个服务消费的流程图:


阿里面试官:你读过Dubbo的源码,给我说说服务之间是怎么进行消费的?


上图中可以看到,服务消费的流程与服务暴露的流程有点类似逆向的。同样,Dubbo 服务也是分为两个大步骤:第一步就是将远程服务通过 Protocol转换成 Invoker(概念在上篇文章中有解释)。第二步通过动态代理将 Invoker转换成消费服务需要的接口。


org.apache.dubbo.config.ReferenceConfig类是 ReferenceBean的父类,与生产端服务的 ServiceBean一样,存放解析出来的 XML 和注解信息。类关系如下:


阿里面试官:你读过Dubbo的源码,给我说说服务之间是怎么进行消费的?


# 服务初始化中转换的入口


当我们消费端调用本地接口就能实现远程服务的调用,这是怎么实现的呢?根据上面的流程图,来分析消费原理。在消费端进行初始化时 ReferenceConfig#init,会执行 ReferenceConfig#createProxy来完成这一系列操作。以下为 ReferenceConfig#createProxy主要的代码部分:

 
   
   
 
private T createProxy(Map<String, String> map) {// 判断是否为 Jvm 本地引用if (shouldJvmRefer(map)) {// 通过 injvm 协议,获取本地服务 URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); invoker = REF_PROTOCOL.refer(interfaceClass, url);} else { urls.clear();// 判断是否有自定义的直连地址,或注册中心地址if (url != null && url.length() > 0) { String[] us = SEMICOLON_SPLIT_PATTERN.split(url);if (us != null && us.length > 0) {for (String u : us) { URL url = URL.valueOf(u);if (StringUtils.isEmpty(url.getPath())) { url = url.setPath(interfaceName);}if (UrlUtils.isRegistry(url)) {// 如果是注册中心Protocol类型,则向地址中添加 refer 服务消费元数据 urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));} else {// 直连服务提供端 urls.add(ClusterUtils.mergeUrl(url, map));}}}} else {// 组装注册中心的配置if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {// 检查配置中心 checkRegistry();List<URL> us = ConfigValidationUtils.loadRegistries(this, false);if (CollectionUtils.isNotEmpty(us)) {for (URL u : us) { URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);if (monitorUrl != null) {// 监控上报信息 map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));}// 注册中心地址添加 refer 服务消费元数据 urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));}}}}

// 只有一条注册中心数据,即单注册中心if (urls.size() == 1) {// 将远程服务转化成 Invoker invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));} else {// 因为多注册中心就会存在多个 Invoker,这里用保存在 List 中List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null;for (URL url : urls) {// 将每个注册中心转换成 Invoker 数据 invokers.add(REF_PROTOCOL.refer(interfaceClass, url));if (UrlUtils.isRegistry(url)) {// 会覆盖前遍历的注册中心,使用最后一条注册中心数据 registryURL = url;}}if (registryURL != null) {// 默认使用 zone-aware 策略来处理多个订阅 URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);// 将转换后的多个 Invoker 合并成一个 invoker = CLUSTER.join(new StaticDirectory(u, invokers));} else { invoker = CLUSTER.join(new StaticDirectory(invokers));}}}// 利用动态代理,将 Invoker 转换成本地接口代理return (T) PROXY_FACTORY.getProxy(invoker);}


上面转换的过程中,主要可概括为:先分为本地引用和远程引用两类。本地就是以 inJvm 协议的获取本地服务,这不做过多说明;远程引用分为直连服务和通过注册中心。注册中心分为单注册中心和多注册中心的情况,单注册中心好解决,直接使用即可,多注册中心时,将转换后的 Invoker 合并成一个 Invoker。最后通过动态代理将 Invoker 转换成本地接口代理。


# 获取 Invoker 实例


由于本地服务时直接从缓存中获取,这里就注册中心的消费进行分析,上面代码片段中使用的是 REF_PROTOCOL.refer进行转换,该方法代码:

 
   
   
 
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {// 获取服务的注册中心url,里面会设置注册中心的协议和移除 registry 的参数 url = getRegistryUrl(url);// 获取注册中心实例Registry registry = registryFactory.getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}

// 获取服务消费元数据Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));// 从服务消费元数据中获取分组信息String group = qs.get(GROUP_KEY);if (group != null && group.length() > 0) {if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {// 执行 Invoker 转换工作return doRefer(getMergeableCluster(), registry, type, url);}}// 执行 Invoker 转换工作return doRefer(cluster, registry, type, url);}


上面主要是获取服务消费的注册中心实例和进行服务分组,最后调用 doRefer方法进行转换工作,以下为 doRefer的代码:

 
   
   
 
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {// 创建 RegistryDirectory 对象RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);// 设置注册中心 directory.setRegistry(registry);// 设置协议 directory.setProtocol(protocol);// directory.getUrl().getParameters() 是服务消费元数据Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));// 消费消息注册到注册中心 registry.register(directory.getRegisteredConsumerUrl());}

directory.buildRouterChain(subscribeUrl);// 服务消费者订阅:服务提供端,动态配置,路由的通知 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

// 多个Invoker合并为一个Invoker invoker = cluster.join(directory);return invoker;}


上面实现主要是完成创建 RegistryDirectory 对象,将消费服务元数据注册到注册中心,通过 RegistryDirectory 对象里的信息,实现服务提供端,动态配置及路由的订阅相关功能。


RegistryDirectory 这个类实现了 NotifyListener 这个通知监听接口,当订阅的服务,配置或路由发生变化时,会接收到通知,进行相应改变:

 
   
   
 
public synchronized void notify(List<URL> urls) {// 将服务提供方配置,路由配置,服务提供方的服务分别以不同的 key 保存在 Map 中Map<String, List<URL>> categoryUrls = urls.stream().filter(Objects::nonNull).filter(this::isValidCategory).filter(this::isNotCompatibleFor26x).collect(Collectors.groupingBy(url -> {if (UrlUtils.isConfigurator(url)) {return CONFIGURATORS_CATEGORY;} else if (UrlUtils.isRoute(url)) {return ROUTERS_CATEGORY;} else if (UrlUtils.isProvider(url)) {return PROVIDERS_CATEGORY;}return "";}));

// 更新服务提供方配置List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

// 更新路由配置List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this::addRouters);

// 加载服务提供方的服务信息List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());/** * 3.x added for extend URL address */ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);if (supportedListeners != null && !supportedListeners.isEmpty()) {for (AddressListener addressListener : supportedListeners) { providerURLs = addressListener.notify(providerURLs, getUrl(),this);}}// 重新加载 Invoker 实例 refreshOverrideAndInvoker(providerURLs);}


RegistryDirectory#notify里面最后会刷新 Invoker 进行重新加载,下面是核心代码的实现:

 
   
   
 
private void refreshOverrideAndInvoker(List<URL> urls) {// mock zookeeper://xxx?mock=return null overrideDirectoryUrl();// 刷新 invoker refreshInvoker(urls);}

private void refreshInvoker(List<URL> invokerUrls) {Assert.notNull(invokerUrls, "invokerUrls should not be null");

if (invokerUrls.size() == 1&& invokerUrls.get(0) != null&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {

......

} else {// 刷新之前的 InvokerMap<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference// 加载新的 Invoker MapMap<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map// 获取新的 InvokersList<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));// 缓存新的 Invokers routerChain.setInvokers(newInvokers);this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;this.urlInvokerMap = newUrlInvokerMap;

try {// 通过新旧 Invokers 对比,销毁无用的 Invokers destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e);}}}


获取刷新前后的 Invokers,将新的 Invokers 重新缓存起来,通过对比,销毁无用的 Invoker。


上面将 URL 转换 Invoker 是在 RegistryDirectory#toInvokers中进行。

 
   
   
 
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();

Set<String> keys = new HashSet<>();String queryProtocols = this.queryMap.get(PROTOCOL_KEY);for (URL providerUrl : urls) {

// 过滤消费端不匹配的协议,及非法协议......

// 合并服务提供端配置数据 URL url = mergeUrl(providerUrl);// 过滤重复的服务提供端配置数据String key = url.toFullString();if (keys.contains(key)) {continue;} keys.add(key);

// 缓存键是不与使用者端参数合并的url,无论使用者如何合并参数,如果服务器url更改,则再次引用Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local referenceInvoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);

// 缓存无对应 invoker,再次调用 protocol#refer 是否有数据if (invoker == null) {try {boolean enabled = true;if (url.hasParameter(DISABLED_KEY)) { enabled = !url.getParameter(DISABLED_KEY, false);} else { enabled = url.getParameter(ENABLED_KEY, true);}if (enabled) { invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);}} catch (Throwable t) { logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}// 将新的 Invoker 缓存起来if (invoker != null) { // Put new invoker in cache newUrlInvokerMap.put(key, invoker);}} else {// 缓存里有数据,则进行重新覆盖 newUrlInvokerMap.put(key, invoker);}} keys.clear();return newUrlInvokerMap;}


# 总结


我们可以看到,不管是暴露还是消费,Dubbo 都是以 Invoker 为数据交换主体进行,通过对 Invoker 发起调用,实现一个远程或本地的实现。



 
   
   
 

 往期推荐 

🔗




 

点击阅读原文,获得更多精彩内容