dubbo学习(十)路由和负载均衡
一、集群容错流程
再来回顾下集群容错流程,代码如下:
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
//过滤服务列表List<Invoker<T>> invokers = list(invocation);
//负载均衡选择最终要调用的服务LoadBalance loadbalance = initLoadBalance(invokers, invocation);RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//根据集群容错策略开始远程调用rpcreturn doInvoke(invocation, invokers, loadbalance);
}
总体流程如上所示,主要分为三步:
(1)获取所有可用服务列表。
(2)负载均衡选择服务。
(3)根据集群容错策略远程调用rpc。
这篇文章主要分析步骤一和步骤二的原理。
二、directory的实现
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
首先会调用directory.list方法获取所有可用服务列表。
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());}
//获取动态服务列表,是抽象方法,会进入子类中执行方法
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers;
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {//遍历所有的router,进行invoker的过滤
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
上述通用逻辑代码被封装在AbstractDirectory中,主要干了以下两件事:
(1)调用抽象方法doList获取所有的invoker列表。
(2)遍历所有的router,进行invoker过滤。
(3)最后返回过滤后的invoker列表。
· 抽象方法doList
(1)StaticDirectory
属于Directory静态列表的实现,即将传入的invoker列表封装成静态的Directory对象,里面的列表不会改变。
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
//直接返回invokers
return invokers;
}
(2)RegistryDirectory
属于Directory动态列表的实现,主要区别在于会进行订阅后处理来更新invoker列表、配置信息、路由列表。
public List<Invoker<T>> doList(Invocation invocation) {//服务如果被禁言,直接抛出异常
if (forbidden) {
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
}List<Invoker<T>> invokers = null;
//根据方法名匹配服务列表,methodInvokerMap会在订阅信息更改后刷新
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {//获取方法名
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
//根据方法名+首参数匹配服务列表
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter}
//根据方法名匹配服务列表
if (invokers == null) {
invokers = localMethodInvokerMap.get(methodName);}
//根据‘*’匹配服务列表
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);}
//遍历map,找到第一个invoker列表返回
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}}
//如果仍没有返回空列表
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
上述源码主要是在对服务列表的匹配,主要步骤为:
(1)检查服务是否被禁言。如果禁用,直接抛出异常。
(2)获取map,这个map也会在收到订阅后刷新map,存放了最新的服务列表。
(3)根据方法名和首参数作为key,去map查找服务列表。
(4)根据方法名作为key,去map查找服务列表。
(5)根据"*"作为key,去map查找服务列表。
(6)遍历map,找到第一个invoker列表。
(7)如果找到了服务列表则直接返回,仍没有则返回空列表。
以上为directory的主要原理,directory.list方法执行完毕后,对route进行遍历开始进行invoker的过滤。
三、路由的实现
查看路由的SPI文件,对应的实现有以下三个类。这里主要分析下条件路由的原理。
file=org.apache.dubbo.rpc.cluster.router.file.FileRouterFactory
script=org.apache.dubbo.rpc.cluster.router.script.ScriptRouterFactory
condition=org.apache.dubbo.rpc.cluster.router.condition.ConditionRouterFactory
条件路由 - ConditionRouter
条件路由配置示例:
method = find* => host = 192.168.1.22
• 这条配置说明所有调用find开头的方法都会被路由到IP为192.168.1.22的服务节点上。
•=>之前的部分为消费者匹配条件,将所有参数和消费者的URL进行对比,当消费者 满足匹配条件时,对该消费者执行后面的过滤规则。
• 如果匹配条件为空,则表示应用于所有消费方,如=>host != 192.168.1.22。
• 如果过滤条件为空,则表示禁止访问,如host = 192.168.1.22 =>。
整个规则的表达式支持{protocol等占位符方式,也支持=、!=等条件。值可以支持多个, 用逗号分隔,如host = 192.168.1.22,192.168.1.23;如果以“*”号结尾,则说明是通配符, 如host = 192.168.1.*表示匹配192.168.1.网段下所有的IP。
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
throws RpcException {
if (invokers == null || invokers.isEmpty()) {
return invokers;
}try {
//校验
if (!matchWhen(url, invocation)) {
return invokers;
}
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
if (thenCondition == null) {
logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
return result;}
//遍历列表
for (Invoker<T> invoker : invokers) {
//只有符合条件的才加入到返回列表中
if (matchThen(invoker.getUrl(), url)) {
result.add(invoker);
}
}
if (!result.isEmpty()) {
return result;
} else if (force) {
logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));
return result;
}
} catch (Throwable t) {
logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
}
return invokers;
}
条件路由匹配规则代码就不做阐述了,主要流程如上图所示:
(1)校验。如果规则没有启用,则直接返回。如果返回的invoker列表为空,则直接返回空,如果没有任何的whenRule匹配,则直接返回传入的invoker列表。如果没有匹配上规则的invoker,则返回空。
(2)遍历invoker列表,过滤未匹配的invoker。
(3)返回匹配成功的invoker列表。
四、负载均衡的实现
经历了router路由规则过滤后,然后会进入负载均衡的实现中开始最终的负载均衡过滤选出最终要调用的invoker。
负载均衡SPI实现类:
random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
· AbstractLoadBalance
public abstract class AbstractLoadBalance implements LoadBalance {
//计算权重
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.isEmpty())
return null;
if (invokers.size() == 1)return invokers.get(0);
//负载均衡策略 选择服务return doSelect(invokers, url, invocation);
}
//doSelect抽象方法
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
//获取当前invoker的权重
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
//通过url获取当前invoker设置的权重
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
//获取启动的时间点
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);if (timestamp > 0L) {
//这里的差值为已经预热了多久
int uptime = (int) (System.currentTimeMillis() - timestamp);
//获取设置的总预热值
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);if (uptime > 0 && uptime < warmup) {
//计算最后的权重
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
}
上述中的权重计算逻辑为,由于dubbo考虑了服务刚启动的时候需要有一个预热的过程,因为如果服务刚启动就给100%的流量,可能会让服务崩溃。
计算公式=(启动至今时间/给予的预热总时间)*权重。
例如,我们设置服务的权重是5,让他预热10分钟,那么第一分钟的权重为(1/10)*5=0.5。0.5/5=0.1=10%的流程。表示第一分钟只承担10%的流程,当10分钟后权重会变成100%,承担所有的流量。
负载均衡的核心逻辑在doSelect抽象方法中。按照配置执行对应的负载均衡策略。
负载均衡策略:
(1)RandomLoadBalance
表示随机,也是默认的负载均衡策略。
(2)RoundRobinLoadBalance
表示轮询。
(3)LeastActiveLoadBalance
表示最少活跃调用数,如果活跃数相同则随机调用。
(4)ConsistentHashLoadBalance
表示一致性Hash策略,相同参数的请求总是发到同一提供者。但是当某一台提供者挂时,原本发往该提供者的请求,会平摊到其他提供者,不会引起剧烈变动。
参考资料:
《深入理解Apache Dubbo与实战》 - 第7章 Dubbo集群容错