Dubbo路由功能实现灰度发布及源码分析
灰度发布是实现新旧版本平滑过渡的一种发布方式,即让一部分服务更新到新版本,如果这部分服务没有什么问题,再将其它旧版本的服务更新。而实现简单的灰度发布我们可以使用版本号控制,每次发布都更新版本号,新更新的服务就不会调用旧的服务提供者。
较复杂的灰度发布场景可以由版本号加路由功能实现。如果服务部署在很多区域,如华南、华北两个区域,就可以通过标签实现分区的隔离。想要每个区域都只更新部分服务,只需要更新版本号即可。
路由就是在消费者发起一次RPC
调用前根据路由规则过滤目标服务提供者列表,再将过滤后的服务提供者列表作为消费端最终发起RPC
调用的备选提供者。发起一次RPC
调用都是先经过路由过滤,再到负载均衡选出最终调用的服务提供者发起调用。
标签配置支持两种:
条件路由。支持以服务或
Consumer
应用为粒度配置路由规则。标签路由。以
Provider
应用为粒度配置路由规则。
本篇只介绍标签路由的使用及实现
dubbo
版本:2.7.3
以区域隔离为例,介绍Dubbo
路由功能的使用。在源码提供的demo
基础上,修改代码及配置实现,感兴趣的读者可以下载源码跟着步骤实现。
服务提供者:
apache-dubbo-2.7.2-src/dubbo-demo/dubbo-demo-annotation/dubbo-demo-annotation-provider
服务消费者:
apache-dubbo-2.7.2-src/dubbo-demo/dubbo-demo-annotation/dubbo-demo-annotation-consumer
1、服务提供端
为看到效果,需要启动两个服务提供者。idea
需要配置ProviderApplcation
允许多开,即下图勾选Allow parallel run
。
启动服务提供者1
。修改dubbo-provider.properties
配置文件,添加标签为guangdong
dubbo.protocol.port=20880
dubbo.provider.tag=gaungdong
启动服务提供者2
。修改dubbo-provider.properties
配置文件,添加标签为guangxi
dubbo.protocol.port=20881
dubbo.provider.tag=gaungxi
服务启动后,到注册中心验证配置是否生效。本例中使用redis
作为注册中心,通过redis-cli
可以查看注册的两个服务提供者的url
都加上了dubbo.tag
参数。其中dubbo
是rpc
协议。
127.0.0.1:6379> hkeys /dubbo/org.apache.dubbo.demo.DemoService/providers
1) "dubbo://192.168.1.9:20881/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService:1.1.0:demo&deprecated=false&dubbo=2.0.2&dubbo.tag=gaungxi&dynamic=true&generic=false&group=demo&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=22417®ister=true&release=&revision=1.1.0&side=provider×tamp=1581320006701&version=1.1.0"
2) "dubbo://192.168.1.9:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService:1.1.0:demo&deprecated=false&dubbo=2.0.2&dubbo.tag=gaungdong&dynamic=true&generic=false&group=demo&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=22414®ister=true&release=&revision=1.1.0&side=provider×tamp=1581319993284&version=1.1.0"
2、服务消费端
服务消费端可通过RpcContext
动态配置调用的路由标签。RpcContext
就是一个ThreadLocal
。在调用接口方法之前,设置标签。
RpcContext.getContext().setAttachment("dubbo.tag", "gaungdong");
为看效果,将demo
改为循环调用。
public static void main(String[] args) throws InterruptedException {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
context.start();
DemoServiceComponent1 service = context.getBean(DemoServiceComponent1.class);
while (!Thread.interrupted()) {
RpcContext.getContext().setAttachment("dubbo.tag", "gaungdong");
String hello = service.sayHello("world");
System.out.println("result :" + hello);
Thread.sleep(10000);
}
}
启动服务消费者,观察一段时间。结果就不展示了,只有服务提供者1
会被调用到。
路由功能是使用过滤器链实现的,每个接口(服务)对应一个路由器链,与shiro
框架实现权限验证类似。路由器通过路由器工厂创建,路由器工厂可以配置多个,通过SPI
的@Activate
注解自动激活,最后将这些路由器封装为一条链,路由器的调用顺序可通过配置@Activae
的order
属性指定。每个路由器过滤后返回可调用的服务提供者列表。
源码在dubbo-cluster
模块。
核心类介绍:
Router
:路由器,实现具体的路由功能;RouterChain
:路由器链,封装路由器,链式调用所有路由器;RouterFactory
:路由器工厂,创建路由器;
路由器Router
的接口定义
public interface Router extends Comparable<Router> {
int DEFAULT_PRIORITY = Integer.MAX_VALUE;
URL getUrl();
/**
* 过滤,返回可用的服务提供者
*/
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
/**
* 订阅到注册中心事件时被调用 (由路由器链RouterChain调用)
*/
default <T> void notify(List<Invoker<T>> invokers) {
}
/**
* 实现排序
*/
@Override
default int compareTo(Router o) {
if (o == null) {
throw new IllegalArgumentException();
}
return Integer.compare(this.getPriority(), o.getPriority());
}
}
本篇只分析标签路由TagRouter
,看下标签路由器是如何实现路由功能的。
public class TagRouter extends AbstractRouter implements ConfigurationListener {
public static final String NAME = "TAG_ROUTER";
private static final int TAG_ROUTER_DEFAULT_PRIORITY = 100;
private static final Logger logger = LoggerFactory.getLogger(TagRouter.class);
private static final String RULE_SUFFIX = ".tag-router";
private TagRouterRule tagRouterRule;
private String application;
public TagRouter(DynamicConfiguration configuration, URL url) {
super(configuration, url);
this.priority = TAG_ROUTER_DEFAULT_PRIORITY;
}
// 解析更新路由规则
@Override
public synchronized void process(ConfigChangeEvent event) {
try {
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
this.tagRouterRule = null;
} else {
this.tagRouterRule = TagRuleParser.parse(event.getValue());
}
} catch (Exception e) {
}
}
/**
* 由路由器链RouterChain调用,更新路由规则
* 发送一个ConfigChangeEvent事件,交由process方法更新标签路由规则
*/
@Override
public <T> void notify(List<Invoker<T>> invokers) {
if (CollectionUtils.isEmpty(invokers)) {
return;
}
// 获取服务提供者应用名称
Invoker<T> invoker = invokers.get(0);
URL url = invoker.getUrl();
String providerApplication = url.getParameter(CommonConstants.REMOTE_APPLICATION_KEY);
if (StringUtils.isEmpty(providerApplication)) {
return;
}
synchronized (this) {
if (!providerApplication.equals(application)) {
// 移除当前的路由配置改变监听器
if (!StringUtils.isEmpty(application)) {
configuration.removeListener(application + RULE_SUFFIX, this);
}
String key = providerApplication + RULE_SUFFIX;
// 设置新的路由配置改变监听器
configuration.addListener(key, this);
application = providerApplication;
// 获取规则配置
String rawRule = configuration.getConfig(key);
if (StringUtils.isNotEmpty(rawRule)) {
this.process(new ConfigChangeEvent(key, rawRule));
}
}
}
}
具体的路由器只需要实现Router
接口的route
方法即可,而标签路由器就是过滤掉没有与当前消费者相同标签的服务提供者。服务提供者标签可以设置多个,多个标签使用‘,’
号分割,只要服务提供者的标签中含有与当前消费者相同的标签就是匹配的。
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
.......
List<Invoker<T>> result = invokers;
// 获取消费端指定的标签
String tag = StringUtils.isEmpty(invocation.getAttachment(Constants.TAG_KEY)) ? url.getParameter(Constants.TAG_KEY) :
invocation.getAttachment(Constants.TAG_KEY);
// 如果请求具有特定标记的服务(tag不为空)
if (StringUtils.isNotEmpty(tag)) {
// 获取标签tag匹配的所有服务提供者的addresse
// 如下这种配置会用到:
// tags:
// - name: tag1
// addresses: ["127.0.0.1:20880"]
// - name: tag2
// addresses: ["127.0.0.1:20881"]
List<String> addresses = tagRouterRuleCopy.getTagnameToAddresses().get(tag);
if (CollectionUtils.isNotEmpty(addresses)) {
result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses));
// 如果result不为空,或为空但force=true,则直接返回result
if (CollectionUtils.isNotEmpty(result) || tagRouterRuleCopy.isForce()) {
return result;
}
} else {
// 从所有服务提供者中,过滤获取url中dubbo.tag参数与tag相同的
result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY)));
}
// 如果过滤后的结果不为空,且强制使用标签,则返回过滤结果
if (CollectionUtils.isNotEmpty(result) || isForceUseTag(invocation)) {
return result;
}
// 返回所有未设置标签的服务提供者
else {
List<Invoker<T>> tmp = filterInvoker(invokers,
invoker -> addressNotMatches(invoker.getUrl(), tagRouterRuleCopy.getAddresses()));
return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY)));
}
}
.......
}
不是直接通过SPI
注册路由器的,而是注册路由器工厂,由路由器工厂创建路由器,因此每个路由器都会提供一个对应的创建工厂,如标签路由器的创建工厂是TagRouterFactory
。
@Activate(order = 100)
public class TagRouterFactory extends CacheableRouterFactory {
public static final String NAME = "tag";
@Override
protected Router createRouter(URL url) {
return new TagRouter(DynamicConfiguration.getDynamicConfiguration(), url);
}
}
路由器链RouterChain
负载管理所有激活的路由器,按顺序调用路由器,将前一个路由器过滤后的可用服务提供者列表传递给后一个过滤器,直到所有路由器都被调用过,剩下的就是最终可调用的服务提供者。
public List<Invoker<T>> route(URL url, Invocation invocation) {
List<Invoker<T>> finalInvokers = invokers;
// 按排好序的顺序调用路由方法
for (Router router : routers) {
finalInvokers = router.route(finalInvokers, url, invocation);
}
return finalInvokers;
}
route
方法中用到的invokers
是由RegistryDirectory
订阅到注册中心事件时调用RouterChain
的setInvokers
方法更新的。后面会分析到。
/**
* 由注册目录(RegistryDirectory)调用更新
*
* @param invokers 当前注册在注册中心的所有可用提供者
* Notify router chain of the initial addresses from registry at the first time.
* Notify whenever addresses in registry change.
*/
public void setInvokers(List<Invoker<T>> invokers) {
this.invokers = (invokers == null ? Collections.emptyList() : invokers);
// 所有路由器都要更新路由规则
routers.forEach(router -> router.notify(this.invokers));
}
而路由器集合routers
是在RouterChain
的构造方法中通过SPI
机制初始化的。
private RouterChain(URL url) {
// 获取SPI注册的所有路由工厂,@Activate注解声明的RouterFactory
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, (String[]) null);
// 遍历SPI注册的路由工厂,由路由工厂创建路由器
List<Router> routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());
initWithRouters(routers);
}
排序则在initWithRouters
方法中,调用Collections.sort(routers);
实现的。因为Router
接口继承了Comparable<Router>
接口。
RouterChain
提供了一个静态方法buildChain
,用于创建RouterChain
对象,而将构造方法设为private
。那是在哪里调用buildChain
创建RouterChain
实例的呢,这就需要回顾前面分析的服务引入与调用流程了。如果你对服务引入过程,以及服务调用过程不熟悉,可以看下我的往期文章。
服务提供者并不需要做什么,只是在服务注册到注册中心时带上路由规则的配置即可,以标签路由为例,服务提供者注册到注册中心的URL会带上标签路由规则属性,如dubbo.tag=tag1
。
路由与负载均衡都是在服务消费端实现的,在消费者发起rpc
调用时生效。
在服务引入时,会创建一个Directory
,将多个可调用的服务提供者抽象为一个Directory
,上层不必关心都有哪些服务提供者,由Directory
订阅注册中心的事件,更新所有可用的服务提供者。调用Directory
的list
方法可以获取到当前所有可调用的服务提供者。而路由器过滤链RouterChain
也是由Directory
管理的。
RegistryProtocol
的doRefer
方法。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建RegistryDirectory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 注入注册中心
directory.setRegistry(registry);
// 设置协议(此protocol为DubboProtocol)
directory.setProtocol(protocol);
.......
if (!ANY_VALUE.equals(url.getServiceInterface())
// 是否注册到注册中心
&& url.getParameter(REGISTER_KEY, true)) {
// 注册到注册中心
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
// 路由,为当前引入的服务提供者创建RouterChain
directory.buildRouterChain(subscribeUrl);
// 服务消费者订阅:提供者、配置、路由改变通知
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// 将directory转为cluster,交给cluster调度
Invoker invoker = cluster.join(directory);
.......
return invoker;
}
因此,RouterChain
是由RegistryDirectory
管理的,RegistryDirectory
实现了NotifyListener
接口,在订阅到注册中心事件时,负责更新RouterChain
持有的所有可用Invoker
,实现与注册中心的同步。
需要回忆一下服务的引入过程。
图为服务引入过程中,DubboInvoker
被包装的次数及顺序。由于默认使用的集群容错策略为Failover
,因此DubboInvoker
还会被FailoverClusterInvoker
包装,实现调用失败重试。
从集群容错层开始,一次请求开始到获取到目标服务提供者的调用过程为:
Cluster
管理Directory
,Cluster
调用Directory
的list
方法获取可调用的所有服务提供者;Directory
调用RouterChain
的router
方法,获取所有路由器过滤后返回的可用服务提供者;Cluster
调用负载均衡器的select
方法,返回本次调用的服务提供者。
以默认使用的Failover
失败重试的集群容错为例,路由器链的调用由FailoverClusterInvoker
的doInvoke
方法实现。但dubbo
使用模版方法模式,将路由器链的调用逻辑封装在了抽象类AbstractClusterInvoker
,以实现通用逻辑的重用。因为涉及到Cluster
,所以也将默认的FailoverCluster
简单分析一遍。
AbstractClusterInvoker
类的invoke方法:
@Override
public Result invoke(final Invocation invocation) throws RpcException {
.......
// 路由功能,获取路由器过滤后的可调用提供者
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
.....
// 由子类实现
return doInvoke(invocation, invokers, loadbalance);
}
FailoverClusterInvoker
类的doInvoke方法:
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
........
for (int i = 0; i < len; i++) {
// 重试时重新路由
if (i > 0) {
.....
copyInvokers = list(invocation);
.....
}
// 负载均衡策略选择一个服务提供者
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
.....
try {
// rpc调用
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn();
}
return result;
}
.......
}
throw new RpcException();
}
AbstractClusterInvoker
类的list方法
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
return directory.list(invocation);
}
最终调用到RegistryDirectory
的doList
方法
@Override
public List<Invoker<T>> doList(Invocation invocation) {
......
List<Invoker<T>> invokers = null;
try {
// 路由,调用RouterChain的route方法。
invokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
}
.......
return invokers == null ? Collections.emptyList() : invokers;
}
在注册中心层引入服务时,通过抽象服务的所有提供者为Directory
,调用Cluster
的join
方法巧妙的将Directory
包装为FailoverClusterInvoker
返回给上层调用。由Directory
订阅注册中心事件获取所有服务提供者,当上层调用到FailoverClusterInvoker
时,再由FailoverClusterInvoker
从Directory
获取当前注册到注册中心的所有可用服务提供者。而集群容错的失败重试、路由、负载均衡,则都是由Cluster
调度完成的。