vlambda博客
学习文章列表

dubbo学习-负载均衡源码剖析

1. 负载均衡介绍

负载均衡是集群服务中非常主要的组成部分,负载均衡可以通过将请求均摊到每台机器,从而避免了某台机器忙不过来,而某台机器闲的一批的状态。负载均衡分为软件负载均衡和硬件负载均衡,硬件负载均衡在日常开发中一般也接触不到,但是软件负载均衡是可以接触到的,比如Nignx。Dubbo中也提供了负载均衡的实现,避免某些服务提供者接收的请求过大,负载过高,从而导致请求超时,因此将请求通过负载均衡到每个提供者上是非常有必要的。

2. 源码分析

Dubbo中提供了四种负载均衡实现。

  • 基于加权的随机算法—RandomLoadbalance。

  • 基于最少活跃调用算法—LeastActiveLoadbalance。

  • 基于一致性哈希算法—ConsistentHashLoadBalance。

  • 基于加权轮询算法—RoundRobinLoadBalance。

这四个负载均衡实现类都继承了AbstractLoadBalance类,并重写了doSelect方法,各个负载均衡算法在这个方法中体现。AbstactLoaderBalace中提供了通用的getWeight方法,用于计算提供者的权重,下面我们来看下getWeight方法的实现。

// 获取提供者的权重,这里考虑了预热时间,如果正常运行时间在预热时间之内,权重将按比例减少protected int getWeight(Invoker<?> invoker, Invocation invocation) {  // 获取配置的权重值 ① int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);  if (weight > 0) {    // 获取timestramp的值,默认为0    long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L); // ② if (timestamp > 0L) {      // 计算当前invoker已经启动了多久      int uptime = (int) (System.currentTimeMillis() - timestamp); // ③      // 预热时间 int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);      if (uptime > 0 && uptime < warmup) { // ④        // 计算权重 weight = calculateWarmupWeight(uptime, warmup, weight); } } } // 返回权重值 ⑤ return weight >= 0 ? weight : 0; }

①: 获取方法配置weight值作为权重值,缺省配置为100。

②: 获取timestamp的值,默认为0。

③: 当前时间减去timestamp的值为服务启动时间。

④: 获取配置的wramup值,作为服务预热时间。启动时间大于0,且启动时间小于预热时间,调用calculateWarmupWeight方法计算权重。

⑤: 返回权重值,权重值最小为0。

getWeight方法第7行代码,在dubbo2.7.4.1版本之前,是有bug的。在2.7.4.1版本之前,invoker的URL信息中是没有remote.timestamp这个key的,所以导致timestamp一直都是0,也就是说getWeight方法一直返回的都是0,相关说明

static int calculateWarmupWeight(int uptime, int warmup, int weight) {  int ww = (int) ((float) uptime / ((float) warmup / (float) weight));  return ww < 1 ? 1 : (ww > weight ? weight : ww);}

根据最后三目运算公式可知返回的权重值在[1, weight]之间,分析下计算过程,首先warmup和weight都是读取配置来的,且都有缺省值,所以warmup和weight都是常量,那么warmup / weight也是个常量。ww = uptime / (warmup / weight), 那么uptime的值越大,ww的值也就越大,ww越大的话,最后返回的结果就越接近于weight值。

2.1 RandomLoadbalance源码实现

RandomLoadbalance是基于加权的随机算法实现,算法的基本思想: 假设有3台服务器 A、B、C,对应的权重为 5、3、 2,权重总和为10。把这个比例映射到[0, 10)的区间,那么[0, 5) 为A,[5, 8)为B,(8, 10]为C。那么生成一个[0, 10)的随机数,这个数落到那个区间,对应就是选择那个服务器。区间跨度越大,随机数落到这个区间的概率也就越大。经过大量的选择之后,各个服务器被选择的次数会约等于各自的比例。

下面看下RandomLoadbalance的源码实现。

public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // Invoker的个数 int length = invokers.size(); // 表示每个invoker是否具有相同的权重 boolean sameWeight = true; // 记录每个invoker的权重 int[] weights = new int[length]; // 选算出第一个Invoker的权重值 int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // 权重之和 int totalWeight = firstWeight; for (int i = 1; i < length; i++) { // 计算权重值 int weight = getWeight(invokers.get(i), invocation); // 将权重值存储在weights数组中 weights[i] = weight; // 求和 totalWeight += weight; // 当前weight != firstWeight,则sameWeight为false if (sameWeight && weight != firstWeight) { sameWeight = false; } } // 只有所有的Invoker的权重不相等,且总权重之和大于零,基于总权重之后随机选择一个Invoker if (totalWeight > 0 && !sameWeight) { // 获取[0, totalWeight)间的随机数 int offset = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { offset -= weights[i]; // offset 减去 weights[i] < 0,说明offset在第i个invoker权重所处的区间 if (offset < 0) { return invokers.get(i); } } } // 如果所有的invoker权重相等,则随便返回那个都可以 return invokers.get(ThreadLocalRandom.current().nextInt(length)); }}

RandomLoadbalance的源码分析到此结束,实现相对比较简单,代码中也加了注释帮助理解。

2.2 LeastActiveLoadbalance源码实现

LeastActiveLoadbalance是基于最小活跃数算法的实现。算法的基本思想: 活跃调用数越小,说明服务提供者效率提高,单位时间内可以处理更多的请求,那么应该优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数active,初始情况下,所有的服务提供者active都为0。每收到一个请求,活跃数加1,完成请求后活跃数减1。在服务运行一段时间之后,性能好的服务提供者处理请求的速度更快,因此活跃数减小的也越快,此时这样的服务提供者应该优先获取到新的请求。

下面看下LeastActiveLoadbalance的源码实现。

public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // invoker的个数 int length = invokers.size(); // 最小活跃调用数的值 int leastActive = -1; // 相同最小活跃调用数的invoker个数 int leastCount = 0; // 记录相同最小活跃调用数invoker所在下标的数组 int[] leastIndexes = new int[length]; // 记录每个invoker权重的数组 int[] weights = new int[length]; // 相同最小活跃调用数invoker的权重和 int totalWeight = 0; // 第一个具有最小活跃调用数的invoker权重 int firstWeight = 0; // 相同最小活跃调用数的invoker权重是否相等 boolean sameWeight = true;
// 过滤出具有最小活跃调用数的Invoker for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 获取invoker的active值 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 获取invoker权重值,默认为100 int afterWarmup = getWeight(invoker, invocation); // 记录权重 weights[i] = afterWarmup; // 如果是第一个invoker,或者active小于当前leastActive if (leastActive == -1 || active < leastActive) { // 更新leastActive为当前invoker的active leastActive = active; // 更新leastCount值为1 leastCount = 1; // 把第一个具有最小活跃调用数的invoke放在数组首位 leastIndexes[0] = i; // 更新相同最小活跃调用数invoker的权重和 totalWeight = afterWarmup; // 更新第一个具有最小活跃调用数的invoker权重 firstWeight = afterWarmup; // 更新相同调用数的invoker是否相等,这里为true,因为是最小的active值,所以肯定是一个active,如果是相等的active走下面的分支 sameWeight = true; } else if (active == leastActive) { // 如果当前invoker的active等于leastActive,则进行累加 // 记录这个invoker的下标 leastIndexes[leastCount++] = i; // 累加具有相同最小活跃调用数invoker的权重值 totalWeight += afterWarmup; // 当前invoker的权重是否等于第一个具有最小活跃调用数Invoker的权重(firstWeight) // 如果不相等,说明相同最小活跃调用数的invoker权重不相等。 if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } // 如果具有最小活跃调用数的invoke只有一个,那么直接返回这个invoker就可以 if (leastCount == 1) { return invokers.get(leastIndexes[0]); } if (!sameWeight && totalWeight > 0) { // 相同最小活跃调用数不具有相同权重,且总权重大于0 // 获取[0, totalWeight)区间的随机数 int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { // 获取第i个具有最小活跃调用数invoker在invokers列表的下标 int leastIndex = leastIndexes[i]; // offset 减去 weights[i] < 0,说明offset在第i个invoker权重所处的区间 offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // 如果相同最小活跃调用的invoker权重都相等,则随便返回哪一个都可以 return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); }}

这个方法的实现,总体分为两步,首先筛选出拥有最小活跃调用数的invoker,可能会有多个,所以用leastIndexes记录这些invoker的下标。第二部分是从具有相同最小活跃调用数的invoker列表中选择一个invoker,如果只有一个,则返回这个invoker即可;如果有多个,且不具有相同的权重,且它们的总权重大于0,则基于总权重选择其中一个;如果权重都一样,则选择这些invoker中的任意一个。这个实现思路和RandomLoadbalance相似。

2.3 ConsistentHashLoaderBalance源码实现

ConsistentHashLoaderBalance是基于一致性hash算法的实现。算法的基本思想: 选择一个参数(ip或者请求参数)为缓存节点生成一个hash,并将这个hash投射到[0, 232-1]的圆环上。当有请求过来时,则为请求中的缓存项的key值生成一个hash,然后查找第一个大于或等于该hash值的缓存节点,这个节点就作为最后的服务节点。如果当前节点挂了,查找下一个大于该hash值的节点即可。效果如图所示:

dubbo学习-负载均衡源码剖析

在dubbo中,引入了虚拟节点的概念,通过引入虚拟节点,将提供者在圆环上分散开来,避免了数据倾斜问题,数据倾斜是指,由于节点不够分散,导致大量请求落到了同一个点,而其它几点值会收到少量请求的情况。

下面开始分析ConsistentHashLoaderBalance的源码。

public class ConsistentHashLoadBalance extends AbstractLoadBalance { public static final String NAME = "consistenthash";
/** * hash节点的名称 */ public static final String HASH_NODES = "hash.nodes";
/** * 进行hash的参数名 */ public static final String HASH_ARGUMENTS = "hash.arguments";
/** * 缓存ConsistentHashSelector */ private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; // 获取invokers的原始hashcode值 int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); // 缓存中没有对应的selector实例,或者invoker列表更新了,新增或者减少了Invoker实例,invokers的hash值也会改变,那么selector.identityHashCode != identityHashCode就成立 if (selector == null || selector.identityHashCode != identityHashCode) { // 创建新的ConsistentHashSelector对象 selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } // 调用ConsistentHashSelector的select方法选择Invoker return selector.select(invocation); }}

该方法主要是创建ConsistentHashSelector实例,首先生成selectors缓存的key,然后获取invokers列表的hashcode值,通过判断缓存中对应的ConsistentHashSelector是否存在,以及判断invokers列表是否变更了, 来决定是否创建ConsistentHashSelector实例。最后通过调用ConsistentHashSelector的select方法选择Invoker实例。所以这个算法的实现逻辑主要实现在ConsistentHashSelector中,ConsistentHashSelector是ConsistentHashLoaderBalance的内部类,首先来看下它的构造方法。

private static final class ConsistentHashSelector<T> { // 存储虚拟节点和invoker实例的映射 private final TreeMap<Long, Invoker<T>> virtualInvokers; // 虚拟接待的个数 private final int replicaNumber; // invokes列表的hashcode值 private final int identityHashCode; // 进行hash计算的参数下标 private final int[] argumentIndex;  ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); // 获取虚拟节点的个数,缺省配置为160个 this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); // 获取进行hash的参数,默认选择第一个参数 String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } // 循环每个invoker,计算出它在hash环上的位置,并将它放入到virtualInvokers缓存 for (Invoker<T> invoker : invokers) { // 获取提供者地址 String address = invoker.getUrl().getAddress(); for (int i = 0; i < replicaNumber / 4; i++) { // 对address+i进行md5运算,得到一个长度为16的数组 byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { // 每次拿数组中的4个byte数进行位运算 long m = hash(digest, h); // 将算出来的hash值作为key,当前invoker作为value,放入virtualInvokers缓存中。 virtualInvokers.put(m, invoker); } } } }}

ConsistentHashSelector的构造函数主要是初始化virtualInvokers。在处理每个invoker,将它映射到虚拟节点的处理中,会获取提供者的值,然后计算replicaNumber / 4address + i的md5值,md5值一个由16个byte数组成的数组。然后每次拿4个byte数进行hash计算,得出一个hash值,作为这个虚拟节点的值,并将这个hash值作为key,invoker作为value放入到virtualInvokers缓冲中。因为生成md5次的次数是replicaNumber / 4,每个md5值会每4个数计算一次hash值,所以最后也会计算出replicaNumber个hash值,相当于一个invoker被投射到了replicaNumber个虚拟节点。

public Invoker<T> select(Invocation invocation) { String key = toKey(invocation.getArguments()); byte[] digest = md5(key); return selectForKey(hash(digest, 0));}

首先将请求的方法参数转换成key,比如sayHello方法的参数为[aaa, bbb],且参与hash计算的参数下标为[0, 1],那么生成的key值为aaabbb。然后会计算这个key值的md5值,还是由16个byte数组成的数组。然后取这个数组的前4个数进行hash计算得到一个hash值,然后将这个hash值作为key调用selectForKey方法选择virtualInvokers中的一个Invoker实例。

private Invoker<T> selectForKey(long hash) { // 返回等于hash值,或者所有key中最小的但是大于hash值的key的值 Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash); if (entry == null) { // 如果上述情况的key不存在,也就是说hash值大于所有的key值,则返回第一个元素 entry = virtualInvokers.firstEntry(); } return entry.getValue();}

selectForKey根据hash值寻找virtualInvokers中符合条件的Invoker实例。

ConsistentHashLoadbalance的实现只和请求的参数值有关系,具有相同的参数值的请求会被分配到同一个服务提供者,因为所有的Invoker实例被映射到虚拟节点的位置是确认的(不是固定不变的,因为如果invokers列表发生变化,相应的ConsistentHashSelector也会重新生成),相同参数的请求生成的hash值也是相同的,所以最后这个请求落到那个虚拟节点也是确定的。这个负载均衡的实现不关系权重。

2.4 RoundRobinLoadBalance源码实现

RoundRobinLoadBalance是基于加权轮询算法的实现。轮询算法的基本思想: 轮询就是将请求轮流分配给每台服务器,例如有[A, B, C]三台提供者,第一个请求分配给A,第二个请求分配给B,第三个请求分配个C,第四个请求分配给A,这就是轮询。轮询是一种无状态负载均衡算法,通常用于每台服务器性能相近的场景。但是实际场景中,并不能保证每台服务器性能均相近,如果将等量的请求分配给性能较差的服务器,这显然是不合理的。因此需要对轮询过程进行加权,以调控每台服务的负载。

下面分析RoundRobinLoadBalance的源码实现。

public class RoundRobinLoadBalance extends AbstractLoadBalance { public static final String NAME = "roundrobin"; // 清除过期数据的周期 private static final int RECYCLE_PERIOD = 60000;
// 封装每个invoker的权重 protected static class WeightedRoundRobin { // invoker配置的权重值 private int weight; // 并发场景下当前invoker会被同时选中,表示该节点被所有线程选中的权重之和 private AtomicLong current = new AtomicLong(0); // 最后一次更新的时间,用户后续缓存超时的判断 private long lastUpdate; public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; current.set(0); } public long increaseCurrent() { return current.addAndGet(weight); } public void sel(int total) { current.addAndGet(-1 * total); } public long getLastUpdate() { return lastUpdate; } public void setLastUpdate(long lastUpdate) { this.lastUpdate = lastUpdate; } } // 存储method权重容器,第一层map的key=接口全限定名 + "." + 方法名,第二层map的key为invoke的url生成的IdentityString private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>(); // 更新锁 private AtomicBoolean updateLock = new AtomicBoolean(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map == null) { methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>()); map = methodWeightMap.get(key); } int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Invoker<T> selectedInvoker = null; WeightedRoundRobin selectedWRR = null; for (Invoker<T> invoker : invokers) { // 从缓存中获取WeightedRoundRobin实例 String identifyString = invoker.getUrl().toIdentityString(); WeightedRoundRobin weightedRoundRobin = map.get(identifyString); // 计算invoker的权重 int weight = getWeight(invoker, invocation);
// 如果weightedRoundRobin,创建weightedRoundRobin实例,并设置权重值 if (weightedRoundRobin == null) { weightedRoundRobin = new WeightedRoundRobin(); weightedRoundRobin.setWeight(weight); map.putIfAbsent(identifyString, weightedRoundRobin); } // 如果计算出来的权重,和weightedRoundRobin中的权重值不相等,说明权重发生了变化,更新 if (weight != weightedRoundRobin.getWeight()) { //weight changed weightedRoundRobin.setWeight(weight); } // 获取当前weightedRoundRobin的总权重值 long cur = weightedRoundRobin.increaseCurrent(); // 更新weightedRoundRobin的lastUpdate为当前时间 weightedRoundRobin.setLastUpdate(now); // 如果weightedRoundRobin的总权重值大于maxCurrent,更新maxCurrent为cur if (cur > maxCurrent) { maxCurrent = cur; // 选中当前invoker selectedInvoker = invoker; // 选中当前weightedRoundRobin实例 selectedWRR = weightedRoundRobin; } // 更新总权重 totalWeight += weight; } // 更新缓存数据,获取updateLock锁并且invoker的size不等于map的size if (!updateLock.get() && invokers.size() != map.size()) { // 通过cas操作,获取锁 if (updateLock.compareAndSet(false, true)) { try { // 定义newMap为新的容器,将原map容器所有元素放入newMap ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>(); newMap.putAll(map); // newMap中的每个Entry,当前时间减去上一次更新时间大于60s的元素,将被清除 Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator(); while (it.hasNext()) { Entry<String, WeightedRoundRobin> item = it.next(); if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) { it.remove(); } } // 根据methodWeightMap中key的value methodWeightMap.put(key, newMap); } finally { // 释放锁 updateLock.set(false); } } } // 最终要选择的invoker,将选中的invoker的current值减掉totalWeight if (selectedInvoker != null) { selectedWRR.sel(totalWeight); return selectedInvoker; } // should not happen here return invokers.get(0); }
}

该方法的逻辑总体分为以下几个步骤:

  • 根据请求方法的key获取ConcurrentMap<String, WeightedRoundRobin>缓存,没有的话,创建。

  • 循环invokers列表

    • 根据url的IdentityString获取WeightedRoundRobin实例,如果没有的话,创建。

    • 如果计算出来的invoker权重不等于WeightedRoundRobin实例的权重,说明权重变化了, 更新。

    • WeightedRoundRobin的current值累加weight值。

    • 更新lastupdated值为当前时间。

    • 选择具有最大current值的invoker和对应的WeightedRoundRobin实例。

    • 更新invokers列表权重之和

  • 更新当前key值对应的ConcurrentMap<String, WeightedRoundRobin>数据。过滤掉长时间没更新的数据,ConcurrentMap<String, WeightedRoundRobin>这个map的key和提供者信息相关,如果该提供者挂掉了, 那么对应的WeightedRoundRobin数据就不能更新, 将会被清除掉,默认时间为60s。

  • 返回选中的invoker实例。

参考资料:

http://dubbo.apache.org/zh/docs/v2.7/dev/source/loadbalance/