vlambda博客
学习文章列表

Dubbo路由功能实现灰度发布及源码分析

关注 “Java艺术”一起来充电吧!

灰度发布是实现新旧版本平滑过渡的一种发布方式,即让一部分服务更新到新版本,如果这部分服务没有什么问题,再将其它旧版本的服务更新。而实现简单的灰度发布我们可以使用版本号控制,每次发布都更新版本号,新更新的服务就不会调用旧的服务提供者。


较复杂的灰度发布场景可以由版本号加路由功能实现。如果服务部署在很多区域,如华南、华北两个区域,就可以通过标签实现分区的隔离。想要每个区域都只更新部服务,只需要更新版本号即可。


路由就是在消费者发起一次RPC调用前根据路由规则过滤目标服务提供者列表,再将过滤后的服务提供者列表作为消费端最终发起RPC调用的备选提供者。发起一次RPC调用都是先经过路由过滤,再到负载均衡选出最终调用的服务提供者发起调用。


标签配置支持两种:

  • 条件路由。支持以服务或Consumer应用为粒度配置路由规则。

  • 标签路由。Provider应用为粒度配置路由规则。


本篇只介绍标签路由的使用及实现

  • dubbo版本:2.7.3


使用路由功能实现区域隔离


以区域隔离为例,介绍Dubbo路由功能的使用。在源码提供的demo基础上,修改代码及配置实现,感兴趣的读者可以下载源码跟着步骤实现。

  • 服务提供者: apache-dubbo-2.7.2-src/dubbo-demo/dubbo-demo-annotation/dubbo-demo-annotation-provider

  • 服务消费者:apache-dubbo-2.7.2-src/dubbo-demo/dubbo-demo-annotation/dubbo-demo-annotation-consumer


1、服务提供端


为看到效果,需要启动两个服务提供者。idea需要配置ProviderApplcation允许多开,即下图勾选Allow parallel run

Dubbo路由功能实现灰度发布及源码分析

启动服务提供者1修改dubbo-provider.properties配置文件,添加标签为guangdong

dubbo.protocol.port=20880
dubbo.provider.tag=gaungdong


启动服务提供者2修改dubbo-provider.properties配置文件,添加标签为guangxi

dubbo.protocol.port=20881
dubbo.provider.tag=gaungxi


服务启动后,到注册中心验证配置是否生效。本例中使用redis作为注册中心,通过redis-cli可以查看注册的两个服务提供者的url都加上了dubbo.tag参数。其中dubborpc协议。


127.0.0.1:6379> hkeys /dubbo/org.apache.dubbo.demo.DemoService/providers
1) "dubbo://192.168.1.9:20881/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService:1.1.0:demo&deprecated=false&dubbo=2.0.2&dubbo.tag=gaungxi&dynamic=true&generic=false&group=demo&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=22417&register=true&release=&revision=1.1.0&side=provider&timestamp=1581320006701&version=1.1.0"
2) "dubbo://192.168.1.9:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService:1.1.0:demo&deprecated=false&dubbo=2.0.2&dubbo.tag=gaungdong&dynamic=true&generic=false&group=demo&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=22414&register=true&release=&revision=1.1.0&side=provider&timestamp=1581319993284&version=1.1.0"


2、服务消费端


服务消费端可通过RpcContext动态配置调用的路由标签。RpcContext就是一个ThreadLocal。在调用接口方法之前,设置标签。


 RpcContext.getContext().setAttachment("dubbo.tag", "gaungdong");


为看效果,将demo改为循环调用。

public static void main(String[] args) throws InterruptedException {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
context.start();
DemoServiceComponent1 service = context.getBean(DemoServiceComponent1.class);
while (!Thread.interrupted()) {
RpcContext.getContext().setAttachment("dubbo.tag", "gaungdong");
String hello = service.sayHello("world");
System.out.println("result :" + hello);
Thread.sleep(10000);
}
}


启动服务消费者,观察一段时间。结果就不展示了,只有服务提供者1会被调用到。


路由功能的实现原理源码分析



路由功能是使用过滤器链实现的,每个接口(服务)对应一个路由器链,与shiro框架实现权限验证类似。路由器通过路由器工厂创建,路由器工厂可以配置多个,通过SPI@Activate注解自动激活,最后将这些路由器封装为一条链,路由器的调用顺序可通过配置@Activaeorder属性指定。每个路由器过滤后返回可调用的服务提供者列表。


源码在dubbo-cluster模块。


核心类介绍:

  • Router路由器,实现具体的路由功能;

  • RouterChain路由器链,封装路由器,链式调用所有路由器;

  • RouterFactory路由器工厂,创建路由器;


路由器


路由器Router的接口定义


public interface Router extends Comparable<Router> {

int DEFAULT_PRIORITY = Integer.MAX_VALUE;

URL getUrl();

/**
* 过滤,返回可用的服务提供者
*/
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

/**
* 订阅到注册中心事件时被调用 (由路由器链RouterChain调用)
*/
default <T> void notify(List<Invoker<T>> invokers) {
}

/**
* 实现排序
*/
@Override
default int compareTo(Router o) {
if (o == null) {
throw new IllegalArgumentException();
}
return Integer.compare(this.getPriority(), o.getPriority());
}
}


本篇只分析标签路由TagRouter,看下标签路由器是如何实现路由功能的。


public class TagRouter extends AbstractRouter implements ConfigurationListener {
public static final String NAME = "TAG_ROUTER";
private static final int TAG_ROUTER_DEFAULT_PRIORITY = 100;
private static final Logger logger = LoggerFactory.getLogger(TagRouter.class);
private static final String RULE_SUFFIX = ".tag-router";

private TagRouterRule tagRouterRule;
private String application;

public TagRouter(DynamicConfiguration configuration, URL url) {
super(configuration, url);
this.priority = TAG_ROUTER_DEFAULT_PRIORITY;
}

// 解析更新路由规则
@Override
public synchronized void process(ConfigChangeEvent event) {
try {
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
this.tagRouterRule = null;
} else {
this.tagRouterRule = TagRuleParser.parse(event.getValue());
}
} catch (Exception e) {
}
}

/**
* 由路由器链RouterChain调用,更新路由规则
* 发送一个ConfigChangeEvent事件,交由process方法更新标签路由规则
*/
@Override
public <T> void notify(List<Invoker<T>> invokers) {
if (CollectionUtils.isEmpty(invokers)) {
return;
}
// 获取服务提供者应用名称
Invoker<T> invoker = invokers.get(0);
URL url = invoker.getUrl();
String providerApplication = url.getParameter(CommonConstants.REMOTE_APPLICATION_KEY);
if (StringUtils.isEmpty(providerApplication)) {
return;
}
synchronized (this) {
if (!providerApplication.equals(application)) {
// 移除当前的路由配置改变监听器
if (!StringUtils.isEmpty(application)) {
configuration.removeListener(application + RULE_SUFFIX, this);
}
String key = providerApplication + RULE_SUFFIX;
// 设置新的路由配置改变监听器
configuration.addListener(key, this);
application = providerApplication;
// 获取规则配置
String rawRule = configuration.getConfig(key);
if (StringUtils.isNotEmpty(rawRule)) {
this.process(new ConfigChangeEvent(key, rawRule));
}
}
}
}


具体的路由器只需要实现Router接口的route方法即可,而标签路由器就是过滤掉没有与当前消费者相同标签的服务提供者。服务提供者标签可以设置多个,多个标签使用‘,’号分割,只要服务提供者的标签中含有与当前消费者相同的标签就是匹配的。


@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
.......
List<Invoker<T>> result = invokers;
// 获取消费端指定的标签
String tag = StringUtils.isEmpty(invocation.getAttachment(Constants.TAG_KEY)) ? url.getParameter(Constants.TAG_KEY) :
invocation.getAttachment(Constants.TAG_KEY);

// 如果请求具有特定标记的服务(tag不为空)
if (StringUtils.isNotEmpty(tag)) {
// 获取标签tag匹配的所有服务提供者的addresse
// 如下这种配置会用到:
// tags:
// - name: tag1
// addresses: ["127.0.0.1:20880"]
// - name: tag2
// addresses: ["127.0.0.1:20881"]
List<String> addresses = tagRouterRuleCopy.getTagnameToAddresses().get(tag);
if (CollectionUtils.isNotEmpty(addresses)) {
result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses));
// 如果result不为空,或为空但force=true,则直接返回result
if (CollectionUtils.isNotEmpty(result) || tagRouterRuleCopy.isForce()) {
return result;
}
} else {
// 从所有服务提供者中,过滤获取url中dubbo.tag参数与tag相同的
result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY)));
}
// 如果过滤后的结果不为空,且强制使用标签,则返回过滤结果
if (CollectionUtils.isNotEmpty(result) || isForceUseTag(invocation)) {
return result;
}
// 返回所有未设置标签的服务提供者
else {
List<Invoker<T>> tmp = filterInvoker(invokers,
invoker -> addressNotMatches(invoker.getUrl(), tagRouterRuleCopy.getAddresses()));
return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY)));
}
}
.......
}


不是直接通过SPI注册路由器的,而是注册路由器工厂,由路由器工厂创建路由器,因此每个路由器都会提供一个对应的创建工厂,如标签路由器的创建工厂是TagRouterFactory


@Activate(order = 100)
public class TagRouterFactory extends CacheableRouterFactory {
public static final String NAME = "tag";
@Override
protected Router createRouter(URL url) {
return new TagRouter(DynamicConfiguration.getDynamicConfiguration(), url);
}
}


路由器链


路由器链RouterChain负载管理所有激活的路由器,按顺序调用路由器,将前一个路由器过滤后的可用服务提供者列表传递给后一个过滤器,直到所有路由器都被调用过,剩下的就是最终可调用的服务提供者。


public List<Invoker<T>> route(URL url, Invocation invocation) {
List<Invoker<T>> finalInvokers = invokers;
// 按排好序的顺序调用路由方法
for (Router router : routers) {
finalInvokers = router.route(finalInvokers, url, invocation);
}
return finalInvokers;
}


route方法中用到的invokers是由RegistryDirectory订阅到注册中心事件时调用RouterChainsetInvokers方法更新的。后面会分析到。


/**
* 由注册目录(RegistryDirectory)调用更新
*
* @param invokers 当前注册在注册中心的所有可用提供者
* Notify router chain of the initial addresses from registry at the first time.
* Notify whenever addresses in registry change.
*/
public void setInvokers(List<Invoker<T>> invokers) {
this.invokers = (invokers == null ? Collections.emptyList() : invokers);
// 所有路由器都要更新路由规则
routers.forEach(router -> router.notify(this.invokers));
}


而路由器集合routers是在RouterChain的构造方法中通过SPI机制初始化的。


private RouterChain(URL url) {
// 获取SPI注册的所有路由工厂,@Activate注解声明的RouterFactory
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, (String[]) null);

// 遍历SPI注册的路由工厂,由路由工厂创建路由器
List<Router> routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());

initWithRouters(routers);
}


排序则在initWithRouters方法中,调用Collections.sort(routers);实现的。因为Router接口继承了Comparable<Router>接口。


RouterChain提供了一个静态方法buildChain,用于创建RouterChain对象,而将构造方法设为private那是在哪里调用buildChain创建RouterChain实例的呢,这就需要回顾前面分析的服务引入与调用流程了。如果你对服务引入过程,以及服务调用过程不熟悉,可以看下我的往期文章。


路由器是什么时候创建的


服务提供者并不需要做什么,只是在服务注册到注册中心时带上路由规则的配置即可,以标签路由为例,服务提供者注册到注册中心的URL会带上标签路由规则属性,如dubbo.tag=tag1


路由与负载均衡都是在服务消费端实现的,在消费者发起rpc调用时生效。



在服务引入时,会创建一个Directory,将多个可调用的服务提供者抽象为一个Directory,上层不必关心都有哪些服务提供者,由Directory订阅注册中心的事件,更新所有可用的服务提供者。调用Directorylist方法可以获取到当前所有可调用的服务提供者。而路由器过滤链RouterChain也是由Directory管理的。


RegistryProtocoldoRefer方法。


private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建RegistryDirectory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 注入注册中心
directory.setRegistry(registry);
// 设置协议(此protocol为DubboProtocol)
directory.setProtocol(protocol);

.......
if (!ANY_VALUE.equals(url.getServiceInterface())
// 是否注册到注册中心
&& url.getParameter(REGISTER_KEY, true)) {
// 注册到注册中心
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}

// 路由,为当前引入的服务提供者创建RouterChain
directory.buildRouterChain(subscribeUrl);

// 服务消费者订阅:提供者、配置、路由改变通知
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

// 将directory转为cluster,交给cluster调度
Invoker invoker = cluster.join(directory);

.......
return invoker;
}


因此,RouterChain是由RegistryDirectory管理的,RegistryDirectory实现了NotifyListener接口,在订阅到注册中心事件时,负责更新RouterChain持有的所有可用Invoker,实现与注册中心的同步。


路由器链是什么时候被调用的


需要回忆一下服务的引入过程。


图为服务引入过程中,DubboInvoker被包装的次数及顺序。由于默认使用的集群容错策略为Failover,因此DubboInvoker还会被FailoverClusterInvoker包装,实现调用失败重试。


从集群容错层开始,一次请求开始到获取到目标服务提供者的调用过程为:

  • Cluster管理DirectoryCluster调用Directorylist方法获取可调用的所有服务提供者;

  • Directory调用RouterChainrouter方法,获取所有路由器过滤后返回的可用服务提供者;

  • Cluster调用负载均衡器的select方法,返回本次调用的服务提供者。


以默认使用的Failover失败重试的集群容错为例,路由器链的调用由FailoverClusterInvokerdoInvoke方法实现。dubbo使用模版方法模式,将路由器链的调用逻辑封装在了抽象类AbstractClusterInvoker,以实现通用逻辑的重用。因为涉及到Cluster,所以也将默认的FailoverCluster简单分析一遍。


AbstractClusterInvoker类的invoke方法:


@Override
public Result invoke(final Invocation invocation) throws RpcException {
.......
// 路由功能,获取路由器过滤后的可调用提供者
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
.....
// 由子类实现
return doInvoke(invocation, invokers, loadbalance);
}


FailoverClusterInvoker类的doInvoke方法:


 public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
........
for (int i = 0; i < len; i++) {
// 重试时重新路由
if (i > 0) {
.....
copyInvokers = list(invocation);
.....
}
// 负载均衡策略选择一个服务提供者
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
.....
try {
// rpc调用
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn();
}
return result;
}
.......
}
throw new RpcException();
}


AbstractClusterInvoker类的list方法


protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
return directory.list(invocation);
}


最终调用到RegistryDirectorydoList方法


@Override
public List<Invoker<T>> doList(Invocation invocation) {
......
List<Invoker<T>> invokers = null;
try {
// 路由,调用RouterChain的route方法。
invokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
}
.......
return invokers == null ? Collections.emptyList() : invokers;
}


总结



在注册中心层引入服务时,通过抽象服务的所有提供者为Directory,调用Clusterjoin方法巧妙的将Directory包装为FailoverClusterInvoker返回给上层调用。Directory订阅注册中心事件获取所有服务提供者,当上层调用到FailoverClusterInvoker时,再由FailoverClusterInvokerDirectory获取当前注册到注册中心的所有可用服务提供者。而集群容错的失败重试、路由、负载均衡,则都是由Cluster调度完成的。


往期原创精选