vlambda博客
学习文章列表

dubbo源码阅读之负载均衡

负载均衡

我们分析了通过监听注册中心可以获取到多个服务提供者,并创建多个Invoker,然后通过集群类如FailoverClusterInvoker将多个Invoker封装在一起,而外部的调用者以这个封装的Invoker为入口调用内部的多个Invoker,但是我们一次调用实际只能调用一个真实的Invoker(这里的真实的Invoker对应一个提供者),所以怎么在多个Invoker中选择出一个Invoker来,使得整体的服务调用的性能最大化,这就是负载均衡策略。另外,除了负载均衡,集群类的一个重要功能就是处理服务调用故障,当一个invoker调用失败后怎么办,根据对这种情况的不同处理策略分为不同的集群类。当然除了以上两个功能,集群类的另一个功能就是粘滞调用,这里不再赘述。
值得一提的是,消费端的Invoker其实是一个分层的结构,我们可以在配置文件中指定多个注册中心,这样就会封装出多个Invoker(每个Invoker对应一个注册中心),对于这多个Invoker再通过集群类的join方法封装为一个总的Invoker(StaticDirectory),所以集群类在执行故障处理,负载均衡策略时实际上也是分层级的。
好了,集群类的回顾就到这里,说这么多主要是想引出一点:负载均衡逻辑的执行是在集群类中,所以分析负载均衡就以集群类为切入点。
dubbo提供了五种负载均衡器,下面我们一一分析。

RandomLoadBalance(随机)

光看名字,会以为真的就是从Invoker列表中随机选择一个,实际上并不是这么简单,dubbo的随机负载均衡考虑到了权重因素,每个服务提供者的每个方法都有一个有一个权重,权重大的意味着被调用的可能性更大。 so, talk is cheap ,show me your code !

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Every invoker has the same weight?
// 表示是否所有的invoker的权重都相等??
boolean sameWeight = true;
// the weight of every invokers
// 用于记录invoker权重的数组
int[] weights = new int[length];
// the first invoker's weight
// 第一个invoker的权重
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;
// The sum of weights
// 所有invoker权重的和
int totalWeight = firstWeight;
// 遍历所有的invoker,获取权重,更新簿记量
for (int i = 1; i < length; i++) {
// 获取权重,默认是100
// 考虑到了服务预热的特性,
// 所谓服务预热是指服务在刚启动时不会一下子被赋予全部的权重,而是缓慢第达到设定的权重值
// 默认权重是100, 默认的预热时间是10分钟
int weight = getWeight(invokers.get(i), invocation);
// save for later use
// 记录权重
weights[i] = weight;
// Sum
// 记录权重和
totalWeight += weight;
// 如果与第一个权重不相等,那么更新簿记量
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
// 如果invoker的权重有区别,那么需要根据权重来随机
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
// 这是根据权重值随机的典型用法
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 如果所有invoker的权重相等,或者权重和为0, 那么随机选择一个invoker
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}

这里指的关注的是所谓的随机并不是完全随机,而是在考虑权重的基础上的随机,这种算法值得我们学习,以后再需要的场景中也可以这样做。
此外,dubbo还有服务预热的特性,就像我在注释中说的,这种策略应该是为了避免在服务刚启动时就涌上来大量的请求,导致服务崩溃,设置一个缓冲时间可以有效避免这种情况,默认的预热时间是10分钟,默认的权重值是100。

RoundRobinLoadBalance(轮询)

接下来,我们分析一下轮询调度算法。

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 方法的key值
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
// 注意,这里使用的是putIfAbsent方法,考虑到并发问题
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
int totalWeight = 0;
// 簿记量,记录当前的最大权重
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
// 被选中的Invoker
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
// 获取该url的唯一标识
String identifyString = invoker.getUrl().toIdentityString();
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
// 获取权重,考虑到预热
int weight = getWeight(invoker, invocation);

if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
// 设置权重
weightedRoundRobin.setWeight(weight);
// 仍然调用putIfAbsent方法
map.putIfAbsent(identifyString, weightedRoundRobin);
// TODO 个人认为这里应该加下面这句
// weightedRoundRobin = map.get(identifyString);
}
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
// 更新权重
weightedRoundRobin.setWeight(weight);
}
// 获取当前的累积权重
long cur = weightedRoundRobin.increaseCurrent();
// 设置更新时间
weightedRoundRobin.setLastUpdate(now);
// 如果累积权重大于当前的最大权重,那么将当前选中的invoker设为这个invoker
// 并更新最大权重值
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
// 累加权重和
totalWeight += weight;
}

// 如果invoker有变化,那么将那些长时间未被选到的WeightedRoundRobin清除出去
// 这里用AtomicBoolean来加锁,cas锁
if (!updateLock.get() && invokers.size() != map.size()) {
if (updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
newMap.putAll(map);
Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
while (it.hasNext()) {
Entry<String, WeightedRoundRobin> item = it.next();
if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
it.remove();
}
}
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
if (selectedInvoker != null) {
// 这一句很重要
// 选中这个invoker之后,将他的累积权重减去这一轮的总权重,
// 相当于将其放到队尾,当然仍然还是考虑到权重的,权重的大的即是减去总权重也还是有较高的累积权重
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
// 逻辑上将,代码是不会到这里的
// 但是由于语法的要求这里必须要有返回语句
return invokers.get(0);
}

这里的轮询仍然是考虑到权重因素的,这里对于轮询的实现很巧妙,使用累积权重的方法,巧妙地将权重因素考虑进去;而当所有的服务提供者的权重都相等时,这种算法就会退化为标准的轮询算法,实际上累积权重在这里还起到了记忆轮询状态的作用。可以这么形象地理解,当调用过某个提供者后,就把这个提供者扔到靠后面的一个位置,权重越小越靠后,这样就实现了轮询的功能。

LeastActiveLoadBalance

// 最小活跃数,考虑权重因素
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
// 用于记录最小活跃数
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
// 相同最小活跃数的invoker数量
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
// 拥有相同的最小活跃数的那些invoker的下标
int[] leastIndexes = new int[length];
// the weight of every invokers
// 每个invoker的权重
int[] weights = new int[length];
// The sum of the warmup weights of all the least active invokes
// 权重和
int totalWeight = 0;
// The weight of the first least active invoke
// 第一个最小活跃数的invoker的权重
int firstWeight = 0;
// Every least active invoker has the same weight value?
// 如果有多个相同最小活跃数的invoker,他们的权重是否相同
boolean sameWeight = true;


// Filter out all the least active invokers
// 这个循环的作用是选出最小活跃数的invoker,如果有多个相同最小活跃数的invoker,全部选择出来
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoke
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoke configuration. The default value is 100.
int afterWarmup = getWeight(invoker, invocation);
// save for later use
weights[i] = afterWarmup;
// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexs
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexs order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// Choose an invoker from all the least active invokers
// 如果最小活跃数的invoker只有一个,那么就不用考虑权重因素
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
// 如果他们的权重不同,那么需要考虑权重的因素
// 这个方法与RandomLoadBalance中类似
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 如果他们的权重相同,那就随机选择一个
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}

这个方法相信大家都能看懂,这里我想重点说一下最小活跃数的维护方法。
最小活跃数是通过RpcStatus类维护的,这个类对于每个服务的每个方法都维护了一个RpcStatus类,可以看出来,dubbo对于负载均衡是细化到方法级别的,粒度还是很细的,从前面轮询算法也可以看出来这点。
RpcStatus类中除了维护最小活跃数,还维护了调用事件,总调用时间,调用次数,失败的调用次数等等统计量,那么问题是:这些统计量是在什么时候更新的呢?你肯定会回答当然是在服务调用的时候更新的。那么在代码中是怎么保证在每次服务调用的时候都更新相应的RpcStatus类呢??答案就是ExtensionLoader的AOP特性,可见dubbo的spi机制真是无处不在,贯穿了框架的各个部分。ExtensionLoader在加载一个接口的实现类的时候会将一些包装类缓存起来,当实例化一个实现类的时候会用这些包装类对实例进行层层包装(通过构造方法),这样就实现了多层代理,每个包装类都会加入一些通用的功能,实际上就是切面的思想。 服务调用的统计其实正是一个通用的功能,很适合作为一个横切面,切入所有的Invoker中。而消费端Invoker的创建是在Protocol.refer方法中,我们看一下Protocol实现类,发现了一些包装类:

  • ProtocolFilterWrapper, 嵌入可用的Filter

  • ProtocolListenerWrapper, 加入监听器

  • QosProtocolWrapper, 启动QOS服务

最小活跃数功能的实现是通过过滤器。ProtocolFilterWrapper.refer方法在创建invoker时会通过层层代理的方式形成一个过滤器链,注意对于注册中心的Invoker是不用添加过滤器链的,只有对真正执行远程调用的那些Invoker才会添加过滤器链。

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

Filter也是一个SPI接口,是通过ExtensionLoader加载的,在加载Filter接口是调用了getActivateExtension方法,与Activate注解有关。
活跃数的功能是通过ActiveLimitFilter过滤器实现的。

ActiveLimitFilter.invoke

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
// 调用的方法名
String methodName = invocation.getMethodName();
// 最大活跃数,默认是0,0表示不限制,这个逻辑在RpcStatus.beginCount中
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
// 获取方法对应的RpcStatus对象
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
// 如果增加活跃数失败,就需要等待,
// 这里的活跃数类似于信号量,作用是控制最大并发量
if (!count.beginCount(url, methodName, max)) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
synchronized (count) {
// 等待活跃数降下来,并试图获取到一个活跃数,知道超时
while (!count.beginCount(url, methodName, max)) {
try {
count.wait(remain);
} catch (InterruptedException e) {
// ignore
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()
+ ". max concurrent invoke limit: " + max);
}
}
}
}

// 到这里说明成功获取到一个活跃数,那么可以继续后面的调用
boolean isSuccess = true;
long begin = System.currentTimeMillis();
try {
return invoker.invoke(invocation);
} catch (RuntimeException t) {
isSuccess = false;
throw t;
} finally {
// 调用结束,释放获取到的活跃数
count.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
// 这里指的思考一下,为什么只有在max>0时才通知
// 因为max<=0时,最大活跃数是Integer.MAX_VALUE, 相当于不设上限,
// 所以在前面的循环中第一次就能获取到活跃数,压根不会进入循环,也就不会获取锁等待,
// 所以这里也就不需要通知了
if (max > 0) {
synchronized (count) {
count.notifyAll();
}
}
}
}

注释已经很清楚,就不再赘述。

ConsistentHashLoadBalance

public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";

private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
// 以方法为粒度
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// invokers的唯一标示,
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 如果invokers列表变了,就重新hash
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}

private static final class ConsistentHashSelector<T> {

private final TreeMap<Long, Invoker<T>> virtualInvokers;

private final int replicaNumber;

private final int identityHashCode;

private final int[] argumentIndex;

ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
// 虚拟节点,用于使请求更均匀
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 虚拟节点个数
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
// 参与生成hash值的参数的序号
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 初始化的时候生成hash环
for (Invoker<T> invoker : invokers) {
// 一个地址(ip:port)生成的hash节点实际上是固定的
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
// 生成的hash值尽量散列
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}


public Invoker<T> select(Invocation invocation) {
// 用指定的参数参与生成hash
String key = toKey(invocation.getArguments());
byte[] digest = md5(key);
// 只要参数相同,那么生成的hash值就是相同的,选择到的invoker就是相同的
// 这也就保证了相同的调用(同一个服务的,并且调用参数相同),总会被分配到固定的提供者
return selectForKey(hash(digest, 0));
}

private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}

private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
// entry == null说明hash比treeMap中最大的hash值还大,
// 根据一致性hash环的算法,这是需要绕回第一个节点,即hash最小的那个节点
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}

// 散列函数,这种方法使生成的hash尽量均匀分散
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}

private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
md5.update(bytes);
return md5.digest();
}
}

}

首先我们要理解一致性hash的算法原理,明白一致性hash算法之后再来看这个类,就简单多了。
值得注意的是,这里使用了TreeMap保存hash环的所有hash值值得借鉴,这样做的好处是查找快速,效率很高。