dubbo学习-负载均衡源码剖析
1. 负载均衡介绍
负载均衡是集群服务中非常主要的组成部分,负载均衡可以通过将请求均摊到每台机器,从而避免了某台机器忙不过来,而某台机器闲的一批的状态。负载均衡分为软件负载均衡和硬件负载均衡,硬件负载均衡在日常开发中一般也接触不到,但是软件负载均衡是可以接触到的,比如Nignx。Dubbo中也提供了负载均衡的实现,避免某些服务提供者接收的请求过大,负载过高,从而导致请求超时,因此将请求通过负载均衡到每个提供者上是非常有必要的。
2. 源码分析
Dubbo中提供了四种负载均衡实现。
基于加权的随机算法—RandomLoadbalance。
基于最少活跃调用算法—LeastActiveLoadbalance。
基于一致性哈希算法—ConsistentHashLoadBalance。
基于加权轮询算法—RoundRobinLoadBalance。
这四个负载均衡实现类都继承了AbstractLoadBalance类,并重写了doSelect方法,各个负载均衡算法在这个方法中体现。AbstactLoaderBalace中提供了通用的getWeight方法,用于计算提供者的权重,下面我们来看下getWeight方法的实现。
// 获取提供者的权重,这里考虑了预热时间,如果正常运行时间在预热时间之内,权重将按比例减少
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 获取配置的权重值 ①
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
if (weight > 0) {
// 获取timestramp的值,默认为0
long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L); // ②
if (timestamp > 0L) {
// 计算当前invoker已经启动了多久
int uptime = (int) (System.currentTimeMillis() - timestamp); // ③
// 预热时间
int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) { // ④
// 计算权重
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
// 返回权重值 ⑤
return weight >= 0 ? weight : 0;
}
①: 获取方法配置weight值作为权重值,缺省配置为100。
②: 获取timestamp的值,默认为0。
③: 当前时间减去timestamp的值为服务启动时间。
④: 获取配置的wramup值,作为服务预热时间。启动时间大于0,且启动时间小于预热时间,调用calculateWarmupWeight方法计算权重。
⑤: 返回权重值,权重值最小为0。
getWeight方法第7行代码,在dubbo2.7.4.1版本之前,是有bug的。在2.7.4.1版本之前,invoker的URL信息中是没有remote.timestamp这个key的,所以导致timestamp一直都是0,也就是说getWeight方法一直返回的都是0,相关说明。
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
根据最后三目运算公式可知返回的权重值在[1, weight]
之间,分析下计算过程,首先warmup和weight都是读取配置来的,且都有缺省值,所以warmup和weight都是常量,那么warmup / weight
也是个常量。ww = uptime / (warmup / weight)
, 那么uptime的值越大,ww的值也就越大,ww越大的话,最后返回的结果就越接近于weight值。
2.1 RandomLoadbalance源码实现
RandomLoadbalance是基于加权的随机算法实现,算法的基本思想: 假设有3台服务器 A、B、C,对应的权重为 5、3、 2,权重总和为10。把这个比例映射到[0, 10)
的区间,那么[0, 5)
为A,[5, 8)
为B,(8, 10]为C。那么生成一个[0, 10)
的随机数,这个数落到那个区间,对应就是选择那个服务器。区间跨度越大,随机数落到这个区间的概率也就越大。经过大量的选择之后,各个服务器被选择的次数会约等于各自的比例。
下面看下RandomLoadbalance的源码实现。
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Invoker的个数
int length = invokers.size();
// 表示每个invoker是否具有相同的权重
boolean sameWeight = true;
// 记录每个invoker的权重
int[] weights = new int[length];
// 选算出第一个Invoker的权重值
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;
// 权重之和
int totalWeight = firstWeight;
for (int i = 1; i < length; i++) {
// 计算权重值
int weight = getWeight(invokers.get(i), invocation);
// 将权重值存储在weights数组中
weights[i] = weight;
// 求和
totalWeight += weight;
// 当前weight != firstWeight,则sameWeight为false
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
// 只有所有的Invoker的权重不相等,且总权重之和大于零,基于总权重之后随机选择一个Invoker
if (totalWeight > 0 && !sameWeight) {
// 获取[0, totalWeight)间的随机数
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= weights[i];
// offset 减去 weights[i] < 0,说明offset在第i个invoker权重所处的区间
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果所有的invoker权重相等,则随便返回那个都可以
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
}
RandomLoadbalance的源码分析到此结束,实现相对比较简单,代码中也加了注释帮助理解。
2.2 LeastActiveLoadbalance源码实现
LeastActiveLoadbalance是基于最小活跃数算法的实现。算法的基本思想: 活跃调用数越小,说明服务提供者效率提高,单位时间内可以处理更多的请求,那么应该优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数active,初始情况下,所有的服务提供者active都为0。每收到一个请求,活跃数加1,完成请求后活跃数减1。在服务运行一段时间之后,性能好的服务提供者处理请求的速度更快,因此活跃数减小的也越快,此时这样的服务提供者应该优先获取到新的请求。
下面看下LeastActiveLoadbalance的源码实现。
public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// invoker的个数
int length = invokers.size();
// 最小活跃调用数的值
int leastActive = -1;
// 相同最小活跃调用数的invoker个数
int leastCount = 0;
// 记录相同最小活跃调用数invoker所在下标的数组
int[] leastIndexes = new int[length];
// 记录每个invoker权重的数组
int[] weights = new int[length];
// 相同最小活跃调用数invoker的权重和
int totalWeight = 0;
// 第一个具有最小活跃调用数的invoker权重
int firstWeight = 0;
// 相同最小活跃调用数的invoker权重是否相等
boolean sameWeight = true;
// 过滤出具有最小活跃调用数的Invoker
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 获取invoker的active值
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 获取invoker权重值,默认为100
int afterWarmup = getWeight(invoker, invocation);
// 记录权重
weights[i] = afterWarmup;
// 如果是第一个invoker,或者active小于当前leastActive
if (leastActive == -1 || active < leastActive) {
// 更新leastActive为当前invoker的active
leastActive = active;
// 更新leastCount值为1
leastCount = 1;
// 把第一个具有最小活跃调用数的invoke放在数组首位
leastIndexes[0] = i;
// 更新相同最小活跃调用数invoker的权重和
totalWeight = afterWarmup;
// 更新第一个具有最小活跃调用数的invoker权重
firstWeight = afterWarmup;
// 更新相同调用数的invoker是否相等,这里为true,因为是最小的active值,所以肯定是一个active,如果是相等的active走下面的分支
sameWeight = true;
} else if (active == leastActive) { // 如果当前invoker的active等于leastActive,则进行累加
// 记录这个invoker的下标
leastIndexes[leastCount++] = i;
// 累加具有相同最小活跃调用数invoker的权重值
totalWeight += afterWarmup;
// 当前invoker的权重是否等于第一个具有最小活跃调用数Invoker的权重(firstWeight)
// 如果不相等,说明相同最小活跃调用数的invoker权重不相等。
if (sameWeight && i > 0 && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// 如果具有最小活跃调用数的invoke只有一个,那么直接返回这个invoker就可以
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) { // 相同最小活跃调用数不具有相同权重,且总权重大于0
// 获取[0, totalWeight)区间的随机数
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
// 获取第i个具有最小活跃调用数invoker在invokers列表的下标
int leastIndex = leastIndexes[i];
// offset 减去 weights[i] < 0,说明offset在第i个invoker权重所处的区间
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// 如果相同最小活跃调用的invoker权重都相等,则随便返回哪一个都可以
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
这个方法的实现,总体分为两步,首先筛选出拥有最小活跃调用数的invoker,可能会有多个,所以用leastIndexes记录这些invoker的下标。第二部分是从具有相同最小活跃调用数的invoker列表中选择一个invoker,如果只有一个,则返回这个invoker即可;如果有多个,且不具有相同的权重,且它们的总权重大于0,则基于总权重选择其中一个;如果权重都一样,则选择这些invoker中的任意一个。这个实现思路和RandomLoadbalance相似。
2.3 ConsistentHashLoaderBalance源码实现
ConsistentHashLoaderBalance是基于一致性hash算法的实现。算法的基本思想: 选择一个参数(ip或者请求参数)为缓存节点生成一个hash,并将这个hash投射到[0, 232-1]的圆环上。当有请求过来时,则为请求中的缓存项的key值生成一个hash,然后查找第一个大于或等于该hash值的缓存节点,这个节点就作为最后的服务节点。如果当前节点挂了,查找下一个大于该hash值的节点即可。效果如图所示:
在dubbo中,引入了虚拟节点的概念,通过引入虚拟节点,将提供者在圆环上分散开来,避免了数据倾斜问题,数据倾斜是指,由于节点不够分散,导致大量请求落到了同一个节点,而其它几点值会收到少量请求的情况。
下面开始分析ConsistentHashLoaderBalance的源码。
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
/**
* hash节点的名称
*/
public static final String HASH_NODES = "hash.nodes";
/**
* 进行hash的参数名
*/
public static final String HASH_ARGUMENTS = "hash.arguments";
/**
* 缓存ConsistentHashSelector
*/
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
"unchecked") (
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// 获取invokers的原始hashcode值
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 缓存中没有对应的selector实例,或者invoker列表更新了,新增或者减少了Invoker实例,invokers的hash值也会改变,那么selector.identityHashCode != identityHashCode就成立
if (selector == null || selector.identityHashCode != identityHashCode) {
// 创建新的ConsistentHashSelector对象
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 调用ConsistentHashSelector的select方法选择Invoker
return selector.select(invocation);
}
}
该方法主要是创建ConsistentHashSelector实例,首先生成selectors缓存的key,然后获取invokers列表的hashcode值,通过判断缓存中对应的ConsistentHashSelector是否存在,以及判断invokers列表是否变更了, 来决定是否创建ConsistentHashSelector实例。最后通过调用ConsistentHashSelector的select方法选择Invoker实例。所以这个算法的实现逻辑主要实现在ConsistentHashSelector中,ConsistentHashSelector是ConsistentHashLoaderBalance的内部类,首先来看下它的构造方法。
private static final class ConsistentHashSelector<T> {
// 存储虚拟节点和invoker实例的映射
private final TreeMap<Long, Invoker<T>> virtualInvokers;
// 虚拟接待的个数
private final int replicaNumber;
// invokes列表的hashcode值
private final int identityHashCode;
// 进行hash计算的参数下标
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 获取虚拟节点的个数,缺省配置为160个
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
// 获取进行hash的参数,默认选择第一个参数
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 循环每个invoker,计算出它在hash环上的位置,并将它放入到virtualInvokers缓存
for (Invoker<T> invoker : invokers) {
// 获取提供者地址
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
// 对address+i进行md5运算,得到一个长度为16的数组
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
// 每次拿数组中的4个byte数进行位运算
long m = hash(digest, h);
// 将算出来的hash值作为key,当前invoker作为value,放入virtualInvokers缓存中。
virtualInvokers.put(m, invoker);
}
}
}
}
}
ConsistentHashSelector的构造函数主要是初始化virtualInvokers。在处理每个invoker,将它映射到虚拟节点的处理中,会获取提供者的值,然后计算replicaNumber / 4
次address + i
的md5值,md5值一个由16个byte数组成的数组。然后每次拿4个byte数进行hash计算,得出一个hash值,作为这个虚拟节点的值,并将这个hash值作为key,invoker作为value放入到virtualInvokers缓冲中。因为生成md5次的次数是replicaNumber / 4
,每个md5值会每4个数计算一次hash值,所以最后也会计算出replicaNumber个hash值,相当于一个invoker被投射到了replicaNumber个虚拟节点。
public Invoker<T> select(Invocation invocation) {
String key = toKey(invocation.getArguments());
byte[] digest = md5(key);
return selectForKey(hash(digest, 0));
}
首先将请求的方法参数转换成key,比如sayHello方法的参数为[aaa, bbb],且参与hash计算的参数下标为[0, 1],那么生成的key值为aaabbb。然后会计算这个key值的md5值,还是由16个byte数组成的数组。然后取这个数组的前4个数进行hash计算得到一个hash值,然后将这个hash值作为key调用selectForKey方法选择virtualInvokers中的一个Invoker实例。
private Invoker<T> selectForKey(long hash) {
// 返回等于hash值,或者所有key中最小的但是大于hash值的key的值
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
// 如果上述情况的key不存在,也就是说hash值大于所有的key值,则返回第一个元素
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
selectForKey根据hash值寻找virtualInvokers中符合条件的Invoker实例。
ConsistentHashLoadbalance的实现只和请求的参数值有关系,具有相同的参数值的请求会被分配到同一个服务提供者,因为所有的Invoker实例被映射到虚拟节点的位置是确认的(不是固定不变的,因为如果invokers列表发生变化,相应的ConsistentHashSelector也会重新生成),相同参数的请求生成的hash值也是相同的,所以最后这个请求落到那个虚拟节点也是确定的。这个负载均衡的实现不关系权重。
2.4 RoundRobinLoadBalance源码实现
RoundRobinLoadBalance是基于加权轮询算法的实现。轮询算法的基本思想: 轮询就是将请求轮流分配给每台服务器,例如有[A, B, C]三台提供者,第一个请求分配给A,第二个请求分配给B,第三个请求分配个C,第四个请求分配给A,这就是轮询。轮询是一种无状态负载均衡算法,通常用于每台服务器性能相近的场景。但是实际场景中,并不能保证每台服务器性能均相近,如果将等量的请求分配给性能较差的服务器,这显然是不合理的。因此需要对轮询过程进行加权,以调控每台服务的负载。
下面分析RoundRobinLoadBalance的源码实现。
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
// 清除过期数据的周期
private static final int RECYCLE_PERIOD = 60000;
// 封装每个invoker的权重
protected static class WeightedRoundRobin {
// invoker配置的权重值
private int weight;
// 并发场景下当前invoker会被同时选中,表示该节点被所有线程选中的权重之和
private AtomicLong current = new AtomicLong(0);
// 最后一次更新的时间,用户后续缓存超时的判断
private long lastUpdate;
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
public long increaseCurrent() {
return current.addAndGet(weight);
}
public void sel(int total) {
current.addAndGet(-1 * total);
}
public long getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
// 存储method权重容器,第一层map的key=接口全限定名 + "." + 方法名,第二层map的key为invoke的url生成的IdentityString
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
// 更新锁
private AtomicBoolean updateLock = new AtomicBoolean();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
// 从缓存中获取WeightedRoundRobin实例
String identifyString = invoker.getUrl().toIdentityString();
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
// 计算invoker的权重
int weight = getWeight(invoker, invocation);
// 如果weightedRoundRobin,创建weightedRoundRobin实例,并设置权重值
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(identifyString, weightedRoundRobin);
}
// 如果计算出来的权重,和weightedRoundRobin中的权重值不相等,说明权重发生了变化,更新
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
// 获取当前weightedRoundRobin的总权重值
long cur = weightedRoundRobin.increaseCurrent();
// 更新weightedRoundRobin的lastUpdate为当前时间
weightedRoundRobin.setLastUpdate(now);
// 如果weightedRoundRobin的总权重值大于maxCurrent,更新maxCurrent为cur
if (cur > maxCurrent) {
maxCurrent = cur;
// 选中当前invoker
selectedInvoker = invoker;
// 选中当前weightedRoundRobin实例
selectedWRR = weightedRoundRobin;
}
// 更新总权重
totalWeight += weight;
}
// 更新缓存数据,获取updateLock锁并且invoker的size不等于map的size
if (!updateLock.get() && invokers.size() != map.size()) {
// 通过cas操作,获取锁
if (updateLock.compareAndSet(false, true)) {
try {
// 定义newMap为新的容器,将原map容器所有元素放入newMap
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
newMap.putAll(map);
// newMap中的每个Entry,当前时间减去上一次更新时间大于60s的元素,将被清除
Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
while (it.hasNext()) {
Entry<String, WeightedRoundRobin> item = it.next();
if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
it.remove();
}
}
// 根据methodWeightMap中key的value
methodWeightMap.put(key, newMap);
} finally {
// 释放锁
updateLock.set(false);
}
}
}
// 最终要选择的invoker,将选中的invoker的current值减掉totalWeight
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
}
该方法的逻辑总体分为以下几个步骤:
根据请求方法的key获取ConcurrentMap<String, WeightedRoundRobin>缓存,没有的话,创建。
循环invokers列表
根据url的IdentityString获取WeightedRoundRobin实例,如果没有的话,创建。
如果计算出来的invoker权重不等于WeightedRoundRobin实例的权重,说明权重变化了, 更新。
WeightedRoundRobin的current值累加weight值。
更新lastupdated值为当前时间。
选择具有最大current值的invoker和对应的WeightedRoundRobin实例。
更新invokers列表权重之和
更新当前key值对应的ConcurrentMap<String, WeightedRoundRobin>数据。过滤掉长时间没更新的数据,ConcurrentMap<String, WeightedRoundRobin>这个map的key和提供者信息相关,如果该提供者挂掉了, 那么对应的WeightedRoundRobin数据就不能更新, 将会被清除掉,默认时间为60s。
返回选中的invoker实例。
参考资料:
http://dubbo.apache.org/zh/docs/v2.7/dev/source/loadbalance/