dubbo学习-负载均衡源码剖析
1. 负载均衡介绍
负载均衡是集群服务中非常主要的组成部分,负载均衡可以通过将请求均摊到每台机器,从而避免了某台机器忙不过来,而某台机器闲的一批的状态。负载均衡分为软件负载均衡和硬件负载均衡,硬件负载均衡在日常开发中一般也接触不到,但是软件负载均衡是可以接触到的,比如Nignx。Dubbo中也提供了负载均衡的实现,避免某些服务提供者接收的请求过大,负载过高,从而导致请求超时,因此将请求通过负载均衡到每个提供者上是非常有必要的。
2. 源码分析
Dubbo中提供了四种负载均衡实现。
基于加权的随机算法—RandomLoadbalance。
基于最少活跃调用算法—LeastActiveLoadbalance。
基于一致性哈希算法—ConsistentHashLoadBalance。
基于加权轮询算法—RoundRobinLoadBalance。
这四个负载均衡实现类都继承了AbstractLoadBalance类,并重写了doSelect方法,各个负载均衡算法在这个方法中体现。AbstactLoaderBalace中提供了通用的getWeight方法,用于计算提供者的权重,下面我们来看下getWeight方法的实现。
// 获取提供者的权重,这里考虑了预热时间,如果正常运行时间在预热时间之内,权重将按比例减少protected int getWeight(Invoker<?> invoker, Invocation invocation) {// 获取配置的权重值 ①int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);if (weight > 0) {// 获取timestramp的值,默认为0long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L); // ②if (timestamp > 0L) {// 计算当前invoker已经启动了多久int uptime = (int) (System.currentTimeMillis() - timestamp); // ③// 预热时间int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);if (uptime > 0 && uptime < warmup) { // ④// 计算权重weight = calculateWarmupWeight(uptime, warmup, weight);}}}// 返回权重值 ⑤return weight >= 0 ? weight : 0;}
①: 获取方法配置weight值作为权重值,缺省配置为100。
②: 获取timestamp的值,默认为0。
③: 当前时间减去timestamp的值为服务启动时间。
④: 获取配置的wramup值,作为服务预热时间。启动时间大于0,且启动时间小于预热时间,调用calculateWarmupWeight方法计算权重。
⑤: 返回权重值,权重值最小为0。
getWeight方法第7行代码,在dubbo2.7.4.1版本之前,是有bug的。在2.7.4.1版本之前,invoker的URL信息中是没有remote.timestamp这个key的,所以导致timestamp一直都是0,也就是说getWeight方法一直返回的都是0,相关说明。
static int calculateWarmupWeight(int uptime, int warmup, int weight) {int ww = (int) ((float) uptime / ((float) warmup / (float) weight));return ww < 1 ? 1 : (ww > weight ? weight : ww);}
根据最后三目运算公式可知返回的权重值在[1, weight]之间,分析下计算过程,首先warmup和weight都是读取配置来的,且都有缺省值,所以warmup和weight都是常量,那么warmup / weight也是个常量。ww = uptime / (warmup / weight), 那么uptime的值越大,ww的值也就越大,ww越大的话,最后返回的结果就越接近于weight值。
2.1 RandomLoadbalance源码实现
RandomLoadbalance是基于加权的随机算法实现,算法的基本思想: 假设有3台服务器 A、B、C,对应的权重为 5、3、 2,权重总和为10。把这个比例映射到[0, 10)的区间,那么[0, 5) 为A,[5, 8)为B,(8, 10]为C。那么生成一个[0, 10)的随机数,这个数落到那个区间,对应就是选择那个服务器。区间跨度越大,随机数落到这个区间的概率也就越大。经过大量的选择之后,各个服务器被选择的次数会约等于各自的比例。
下面看下RandomLoadbalance的源码实现。
public class RandomLoadBalance extends AbstractLoadBalance {public static final String NAME = "random";protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {// Invoker的个数int length = invokers.size();// 表示每个invoker是否具有相同的权重boolean sameWeight = true;// 记录每个invoker的权重int[] weights = new int[length];// 选算出第一个Invoker的权重值int firstWeight = getWeight(invokers.get(0), invocation);weights[0] = firstWeight;// 权重之和int totalWeight = firstWeight;for (int i = 1; i < length; i++) {// 计算权重值int weight = getWeight(invokers.get(i), invocation);// 将权重值存储在weights数组中weights[i] = weight;// 求和totalWeight += weight;// 当前weight != firstWeight,则sameWeight为falseif (sameWeight && weight != firstWeight) {sameWeight = false;}}// 只有所有的Invoker的权重不相等,且总权重之和大于零,基于总权重之后随机选择一个Invokerif (totalWeight > 0 && !sameWeight) {// 获取[0, totalWeight)间的随机数int offset = ThreadLocalRandom.current().nextInt(totalWeight);// Return a invoker based on the random value.for (int i = 0; i < length; i++) {offset -= weights[i];// offset 减去 weights[i] < 0,说明offset在第i个invoker权重所处的区间if (offset < 0) {return invokers.get(i);}}}// 如果所有的invoker权重相等,则随便返回那个都可以return invokers.get(ThreadLocalRandom.current().nextInt(length));}}
RandomLoadbalance的源码分析到此结束,实现相对比较简单,代码中也加了注释帮助理解。
2.2 LeastActiveLoadbalance源码实现
LeastActiveLoadbalance是基于最小活跃数算法的实现。算法的基本思想: 活跃调用数越小,说明服务提供者效率提高,单位时间内可以处理更多的请求,那么应该优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数active,初始情况下,所有的服务提供者active都为0。每收到一个请求,活跃数加1,完成请求后活跃数减1。在服务运行一段时间之后,性能好的服务提供者处理请求的速度更快,因此活跃数减小的也越快,此时这样的服务提供者应该优先获取到新的请求。
下面看下LeastActiveLoadbalance的源码实现。
public class LeastActiveLoadBalance extends AbstractLoadBalance {public static final String NAME = "leastactive";protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {// invoker的个数int length = invokers.size();// 最小活跃调用数的值int leastActive = -1;// 相同最小活跃调用数的invoker个数int leastCount = 0;// 记录相同最小活跃调用数invoker所在下标的数组int[] leastIndexes = new int[length];// 记录每个invoker权重的数组int[] weights = new int[length];// 相同最小活跃调用数invoker的权重和int totalWeight = 0;// 第一个具有最小活跃调用数的invoker权重int firstWeight = 0;// 相同最小活跃调用数的invoker权重是否相等boolean sameWeight = true;// 过滤出具有最小活跃调用数的Invokerfor (int i = 0; i < length; i++) {Invoker<T> invoker = invokers.get(i);// 获取invoker的active值int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();// 获取invoker权重值,默认为100int afterWarmup = getWeight(invoker, invocation);// 记录权重weights[i] = afterWarmup;// 如果是第一个invoker,或者active小于当前leastActiveif (leastActive == -1 || active < leastActive) {// 更新leastActive为当前invoker的activeleastActive = active;// 更新leastCount值为1leastCount = 1;// 把第一个具有最小活跃调用数的invoke放在数组首位leastIndexes[0] = i;// 更新相同最小活跃调用数invoker的权重和totalWeight = afterWarmup;// 更新第一个具有最小活跃调用数的invoker权重firstWeight = afterWarmup;// 更新相同调用数的invoker是否相等,这里为true,因为是最小的active值,所以肯定是一个active,如果是相等的active走下面的分支sameWeight = true;} else if (active == leastActive) { // 如果当前invoker的active等于leastActive,则进行累加// 记录这个invoker的下标leastIndexes[leastCount++] = i;// 累加具有相同最小活跃调用数invoker的权重值totalWeight += afterWarmup;// 当前invoker的权重是否等于第一个具有最小活跃调用数Invoker的权重(firstWeight)// 如果不相等,说明相同最小活跃调用数的invoker权重不相等。if (sameWeight && i > 0 && afterWarmup != firstWeight) {sameWeight = false;}}}// 如果具有最小活跃调用数的invoke只有一个,那么直接返回这个invoker就可以if (leastCount == 1) {return invokers.get(leastIndexes[0]);}if (!sameWeight && totalWeight > 0) { // 相同最小活跃调用数不具有相同权重,且总权重大于0// 获取[0, totalWeight)区间的随机数int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);// Return a invoker based on the random value.for (int i = 0; i < leastCount; i++) {// 获取第i个具有最小活跃调用数invoker在invokers列表的下标int leastIndex = leastIndexes[i];// offset 减去 weights[i] < 0,说明offset在第i个invoker权重所处的区间offsetWeight -= weights[leastIndex];if (offsetWeight < 0) {return invokers.get(leastIndex);}}}// 如果相同最小活跃调用的invoker权重都相等,则随便返回哪一个都可以return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);}}
这个方法的实现,总体分为两步,首先筛选出拥有最小活跃调用数的invoker,可能会有多个,所以用leastIndexes记录这些invoker的下标。第二部分是从具有相同最小活跃调用数的invoker列表中选择一个invoker,如果只有一个,则返回这个invoker即可;如果有多个,且不具有相同的权重,且它们的总权重大于0,则基于总权重选择其中一个;如果权重都一样,则选择这些invoker中的任意一个。这个实现思路和RandomLoadbalance相似。
2.3 ConsistentHashLoaderBalance源码实现
ConsistentHashLoaderBalance是基于一致性hash算法的实现。算法的基本思想: 选择一个参数(ip或者请求参数)为缓存节点生成一个hash,并将这个hash投射到[0, 232-1]的圆环上。当有请求过来时,则为请求中的缓存项的key值生成一个hash,然后查找第一个大于或等于该hash值的缓存节点,这个节点就作为最后的服务节点。如果当前节点挂了,查找下一个大于该hash值的节点即可。效果如图所示:
在dubbo中,引入了虚拟节点的概念,通过引入虚拟节点,将提供者在圆环上分散开来,避免了数据倾斜问题,数据倾斜是指,由于节点不够分散,导致大量请求落到了同一个节点,而其它几点值会收到少量请求的情况。
下面开始分析ConsistentHashLoaderBalance的源码。
public class ConsistentHashLoadBalance extends AbstractLoadBalance {public static final String NAME = "consistenthash";/*** hash节点的名称*/public static final String HASH_NODES = "hash.nodes";/*** 进行hash的参数名*/public static final String HASH_ARGUMENTS = "hash.arguments";/*** 缓存ConsistentHashSelector*/private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();("unchecked")protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {String methodName = RpcUtils.getMethodName(invocation);String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;// 获取invokers的原始hashcode值int identityHashCode = System.identityHashCode(invokers);ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);// 缓存中没有对应的selector实例,或者invoker列表更新了,新增或者减少了Invoker实例,invokers的hash值也会改变,那么selector.identityHashCode != identityHashCode就成立if (selector == null || selector.identityHashCode != identityHashCode) {// 创建新的ConsistentHashSelector对象selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));selector = (ConsistentHashSelector<T>) selectors.get(key);}// 调用ConsistentHashSelector的select方法选择Invokerreturn selector.select(invocation);}}
该方法主要是创建ConsistentHashSelector实例,首先生成selectors缓存的key,然后获取invokers列表的hashcode值,通过判断缓存中对应的ConsistentHashSelector是否存在,以及判断invokers列表是否变更了, 来决定是否创建ConsistentHashSelector实例。最后通过调用ConsistentHashSelector的select方法选择Invoker实例。所以这个算法的实现逻辑主要实现在ConsistentHashSelector中,ConsistentHashSelector是ConsistentHashLoaderBalance的内部类,首先来看下它的构造方法。
private static final class ConsistentHashSelector<T> {// 存储虚拟节点和invoker实例的映射private final TreeMap<Long, Invoker<T>> virtualInvokers;// 虚拟接待的个数private final int replicaNumber;// invokes列表的hashcode值private final int identityHashCode;// 进行hash计算的参数下标private final int[] argumentIndex;ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {this.virtualInvokers = new TreeMap<Long, Invoker<T>>();this.identityHashCode = identityHashCode;URL url = invokers.get(0).getUrl();// 获取虚拟节点的个数,缺省配置为160个this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);// 获取进行hash的参数,默认选择第一个参数String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));argumentIndex = new int[index.length];for (int i = 0; i < index.length; i++) {argumentIndex[i] = Integer.parseInt(index[i]);}// 循环每个invoker,计算出它在hash环上的位置,并将它放入到virtualInvokers缓存for (Invoker<T> invoker : invokers) {// 获取提供者地址String address = invoker.getUrl().getAddress();for (int i = 0; i < replicaNumber / 4; i++) {// 对address+i进行md5运算,得到一个长度为16的数组byte[] digest = md5(address + i);for (int h = 0; h < 4; h++) {// 每次拿数组中的4个byte数进行位运算long m = hash(digest, h);// 将算出来的hash值作为key,当前invoker作为value,放入virtualInvokers缓存中。virtualInvokers.put(m, invoker);}}}}}
ConsistentHashSelector的构造函数主要是初始化virtualInvokers。在处理每个invoker,将它映射到虚拟节点的处理中,会获取提供者的值,然后计算replicaNumber / 4次address + i的md5值,md5值一个由16个byte数组成的数组。然后每次拿4个byte数进行hash计算,得出一个hash值,作为这个虚拟节点的值,并将这个hash值作为key,invoker作为value放入到virtualInvokers缓冲中。因为生成md5次的次数是replicaNumber / 4,每个md5值会每4个数计算一次hash值,所以最后也会计算出replicaNumber个hash值,相当于一个invoker被投射到了replicaNumber个虚拟节点。
public Invoker<T> select(Invocation invocation) {String key = toKey(invocation.getArguments());byte[] digest = md5(key);return selectForKey(hash(digest, 0));}
首先将请求的方法参数转换成key,比如sayHello方法的参数为[aaa, bbb],且参与hash计算的参数下标为[0, 1],那么生成的key值为aaabbb。然后会计算这个key值的md5值,还是由16个byte数组成的数组。然后取这个数组的前4个数进行hash计算得到一个hash值,然后将这个hash值作为key调用selectForKey方法选择virtualInvokers中的一个Invoker实例。
private Invoker<T> selectForKey(long hash) {// 返回等于hash值,或者所有key中最小的但是大于hash值的key的值Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);if (entry == null) {// 如果上述情况的key不存在,也就是说hash值大于所有的key值,则返回第一个元素entry = virtualInvokers.firstEntry();}return entry.getValue();}
selectForKey根据hash值寻找virtualInvokers中符合条件的Invoker实例。
ConsistentHashLoadbalance的实现只和请求的参数值有关系,具有相同的参数值的请求会被分配到同一个服务提供者,因为所有的Invoker实例被映射到虚拟节点的位置是确认的(不是固定不变的,因为如果invokers列表发生变化,相应的ConsistentHashSelector也会重新生成),相同参数的请求生成的hash值也是相同的,所以最后这个请求落到那个虚拟节点也是确定的。这个负载均衡的实现不关系权重。
2.4 RoundRobinLoadBalance源码实现
RoundRobinLoadBalance是基于加权轮询算法的实现。轮询算法的基本思想: 轮询就是将请求轮流分配给每台服务器,例如有[A, B, C]三台提供者,第一个请求分配给A,第二个请求分配给B,第三个请求分配个C,第四个请求分配给A,这就是轮询。轮询是一种无状态负载均衡算法,通常用于每台服务器性能相近的场景。但是实际场景中,并不能保证每台服务器性能均相近,如果将等量的请求分配给性能较差的服务器,这显然是不合理的。因此需要对轮询过程进行加权,以调控每台服务的负载。
下面分析RoundRobinLoadBalance的源码实现。
public class RoundRobinLoadBalance extends AbstractLoadBalance {public static final String NAME = "roundrobin";// 清除过期数据的周期private static final int RECYCLE_PERIOD = 60000;// 封装每个invoker的权重protected static class WeightedRoundRobin {// invoker配置的权重值private int weight;// 并发场景下当前invoker会被同时选中,表示该节点被所有线程选中的权重之和private AtomicLong current = new AtomicLong(0);// 最后一次更新的时间,用户后续缓存超时的判断private long lastUpdate;public int getWeight() {return weight;}public void setWeight(int weight) {this.weight = weight;current.set(0);}public long increaseCurrent() {return current.addAndGet(weight);}public void sel(int total) {current.addAndGet(-1 * total);}public long getLastUpdate() {return lastUpdate;}public void setLastUpdate(long lastUpdate) {this.lastUpdate = lastUpdate;}}// 存储method权重容器,第一层map的key=接口全限定名 + "." + 方法名,第二层map的key为invoke的url生成的IdentityStringprivate ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();// 更新锁private AtomicBoolean updateLock = new AtomicBoolean();@Overrideprotected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);if (map == null) {methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());map = methodWeightMap.get(key);}int totalWeight = 0;long maxCurrent = Long.MIN_VALUE;long now = System.currentTimeMillis();Invoker<T> selectedInvoker = null;WeightedRoundRobin selectedWRR = null;for (Invoker<T> invoker : invokers) {// 从缓存中获取WeightedRoundRobin实例String identifyString = invoker.getUrl().toIdentityString();WeightedRoundRobin weightedRoundRobin = map.get(identifyString);// 计算invoker的权重int weight = getWeight(invoker, invocation);// 如果weightedRoundRobin,创建weightedRoundRobin实例,并设置权重值if (weightedRoundRobin == null) {weightedRoundRobin = new WeightedRoundRobin();weightedRoundRobin.setWeight(weight);map.putIfAbsent(identifyString, weightedRoundRobin);}// 如果计算出来的权重,和weightedRoundRobin中的权重值不相等,说明权重发生了变化,更新if (weight != weightedRoundRobin.getWeight()) {//weight changedweightedRoundRobin.setWeight(weight);}// 获取当前weightedRoundRobin的总权重值long cur = weightedRoundRobin.increaseCurrent();// 更新weightedRoundRobin的lastUpdate为当前时间weightedRoundRobin.setLastUpdate(now);// 如果weightedRoundRobin的总权重值大于maxCurrent,更新maxCurrent为curif (cur > maxCurrent) {maxCurrent = cur;// 选中当前invokerselectedInvoker = invoker;// 选中当前weightedRoundRobin实例selectedWRR = weightedRoundRobin;}// 更新总权重totalWeight += weight;}// 更新缓存数据,获取updateLock锁并且invoker的size不等于map的sizeif (!updateLock.get() && invokers.size() != map.size()) {// 通过cas操作,获取锁if (updateLock.compareAndSet(false, true)) {try {// 定义newMap为新的容器,将原map容器所有元素放入newMapConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();newMap.putAll(map);// newMap中的每个Entry,当前时间减去上一次更新时间大于60s的元素,将被清除Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();while (it.hasNext()) {Entry<String, WeightedRoundRobin> item = it.next();if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {it.remove();}}// 根据methodWeightMap中key的valuemethodWeightMap.put(key, newMap);} finally {// 释放锁updateLock.set(false);}}}// 最终要选择的invoker,将选中的invoker的current值减掉totalWeightif (selectedInvoker != null) {selectedWRR.sel(totalWeight);return selectedInvoker;}// should not happen herereturn invokers.get(0);}}
该方法的逻辑总体分为以下几个步骤:
根据请求方法的key获取ConcurrentMap<String, WeightedRoundRobin>缓存,没有的话,创建。
循环invokers列表
根据url的IdentityString获取WeightedRoundRobin实例,如果没有的话,创建。
如果计算出来的invoker权重不等于WeightedRoundRobin实例的权重,说明权重变化了, 更新。
WeightedRoundRobin的current值累加weight值。
更新lastupdated值为当前时间。
选择具有最大current值的invoker和对应的WeightedRoundRobin实例。
更新invokers列表权重之和
更新当前key值对应的ConcurrentMap<String, WeightedRoundRobin>数据。过滤掉长时间没更新的数据,ConcurrentMap<String, WeightedRoundRobin>这个map的key和提供者信息相关,如果该提供者挂掉了, 那么对应的WeightedRoundRobin数据就不能更新, 将会被清除掉,默认时间为60s。
返回选中的invoker实例。
参考资料:
http://dubbo.apache.org/zh/docs/v2.7/dev/source/loadbalance/
