负载均衡 一致性Hash算法
从实际出发,研究在开源软件中已实现的算法。本文具体分析负载均衡-一致性Hash算法,基于dubbo的ConsistentHashLoadBalance源码。
一致性Hash算法需要ConsistentHashSelector的数据结构, 内部使用TreeMap来存储Invoker和其对应的HashCode值,为了保证请求的均匀调用,一般都采用一个Invoker映射为多个虚拟节点的方式,好处很多,自行搜索。当有请求到达时,根据请求的参数来计算本次请求的Hash值,可以自行定义选择N个参数来参与Hash值的计算,相同参数的请求可以定位至同一个Invoke上(前提是Invoker列表未发生变化)
10个Invoker在Hash环上的分布情况:
分布可能十分不均匀,造成某些服务器压力过大。
增加多个虚拟Invoker后的分布情况:
仅画出了Invoker1的分布情况
测试代码:
准备多个Invoker, 每个Invoker设置具体的名字,权重,调用次数
启动多个线程,每个线程分别多次调用selectInvoker方法
每次选择完成后,更新调用次数+1
全部调用完成后,输出每个Invoker的调用次数
设置同样的请求参数,打印每次选择的Invoker name
上代码
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
public class ConsistentHashLoadBalance {
public static ThreadLocal<MessageDigest> MD5 = new ThreadLocal<>();
public static final String NAME = "consistenthash";
/**
* Hash nodes name
*/
public static final String HASH_NODES = "hash.nodes";
/**
* Hash arguments name
*/
public static final String HASH_ARGUMENTS = "hash.arguments";
/**
* 参与Hash计算的方法参数的索引值, 只有在其中的索引参数才会参与hash值计算
* 本次测试假设第0个、第2个参数参与Hash值计算
*/
public static final int[] HASH_PARAMETER_INDEX = new int[] {0, 2};
private final ConcurrentMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();
protected Invoker doSelect(List<Invoker> invokers, String methodName, Object[] arguments) {
String key = methodName;
// using the hashcode of list to compute the hash only pay attention to the elements in the list
int invokersHashCode = invokers.hashCode();
ConsistentHashSelector selector = selectors.get(key);
// 对应的ConsistentHashSelector不存在, 或invokers有变化, 则更新ConsistentHashSelector
// 本次实验中invokers是固定的, 所以只有第一次调用会设置ConsistentHashSelector
if (selector == null || selector.identityHashCode != invokersHashCode) {
selectors.put(key, new ConsistentHashSelector(invokers, methodName, invokersHashCode));
selector = selectors.get(key);
}
return selector.select(arguments);
}
/**
* 获取MessageDigest, 注意此类不是线程安全的, 使用ThreadLocal来获取
* @return
*/
public static MessageDigest getMD5() {
MessageDigest md5 = MD5.get();
if (md5 == null) {
try {
md5 = MessageDigest.getInstance("MD5");
MD5.set(md5);
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
}
return md5;
}
private static final class ConsistentHashSelector {
/**
* 使用TreeMap来存储digest+Invoker, 主要是利用cellingEntry方法快速查询Invoker
*/
private final TreeMap<Long, Invoker> virtualInvokers;
/**
* 副本数目, 在这表示虚拟Invoker的数量
*/
private final int replicaNumber = 160;
private final int identityHashCode;
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<>();
this.identityHashCode = identityHashCode;
argumentIndex = HASH_PARAMETER_INDEX;
for (Invoker invoker : invokers) {
// 使用Invoker唯一的名称来替代地址, 作为区分
String address = invoker.name;
// 创建 replicaNumber / 4 * 4 个虚拟节点, 本例中每个Invoker都创建160个虚拟节点
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = getMD5().digest((address + i).getBytes(StandardCharsets.UTF_8));
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
/**
* 因为我们只测试一个方法, 所以直接传递参数来选择Invoker
* @param arguments
* @return
*/
public Invoker select(Object[] arguments) {
String key = toKey(arguments);
byte[] digest = getMD5().digest(key.getBytes(StandardCharsets.UTF_8));
return selectForKey(hash(digest, 0));
}
/**
* 得到HashKey
* @param args
* @return
*/
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
// 在这里我们只选择argumentIndex中的参数作为key参与Hash值的计算
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
// 一般来说, 对应的参数都应该重写toString方法
buf.append(args[i]);
}
}
return buf.toString();
}
/**
* 根据Hash值选择Invoker
* @param hash
* @return
*/
private Invoker selectForKey(long hash) {
// 在虚拟节点对应的TreeMap中选择刚大于hash值的Invoker
Map.Entry<Long, Invoker> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
// 如果不存在, hash值大于最大的虚拟节点的hash值, 则应该选择第一个Invoker
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
/**
* 计算Hash值
* @param digest
* @param number
* @return
*/
private long hash(byte[] digest, int number) {
/**
* 在初始化虚拟Invoker的时候, number分别为 0,1,2,3
* 也就是说分别以 0~3, 4~7, 8~11, 12~15 的byte来计算hash值
* 保证4次计算结果比较分散
*/
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
}
public static void main(String[] args) {
ConsistentHashLoadBalance consistentHashLoadBalance = new ConsistentHashLoadBalance();
// 初始化10个Invoker, 命名 Service i, Weight i; i -> [1, 10]
List<Invoker> invokers = new ArrayList<>(10);
for (int i = 1; i <= 10; i++) {
invokers.add(new Invoker("Service " + i, i));
}
CountDownLatch cdl = new CountDownLatch(10);
// 启动10个线程, 每个线程调用10万次, 共调用100万次
for (int i = 0; i < 10; i++) {
new Thread(() -> {
Object[] arguments = new Object[4];
for (int j = 0; j < 100000; j++) {
// UUID随机生成4个参数
arguments[0] = UUID.randomUUID().toString();
arguments[1] = UUID.randomUUID().toString();
arguments[2] = UUID.randomUUID().toString();
arguments[3] = UUID.randomUUID().toString();
Invoker selected = consistentHashLoadBalance.doSelect(invokers, "defaultMethod", arguments);
// 统计调用次数
selected.invokeCount.incrementAndGet();
}
cdl.countDown();
}).start();
}
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打印调用次数
for (Invoker invoker : invokers) {
System.out.println(invoker.name + ": invokeCount = " + invoker.invokeCount.get());
}
// 测试Hash一致性, 固定第0个、第2个参数, 第1个、第3个参数随机生成, 看是否一直选择同一个Invoker
String firstArg = UUID.randomUUID().toString();
String thirdArg = UUID.randomUUID().toString();
Object[] arguments = new Object[4];
arguments[0] = firstArg;
arguments[2] = thirdArg;
for (int i = 0; i < 20; i++) {
arguments[1] = UUID.randomUUID().toString();
arguments[3] = UUID.randomUUID().toString();
Invoker selected = consistentHashLoadBalance.doSelect(invokers, "defaultMethod", arguments);
System.out.println("当前选择的Invoker = " + selected.name);
}
}
}
输出结果
Service 1: invokeCount = 107767
Service 2: invokeCount = 90664
Service 3: invokeCount = 111901
Service 4: invokeCount = 97779
Service 5: invokeCount = 96242
Service 6: invokeCount = 97307
Service 7: invokeCount = 109512
Service 8: invokeCount = 102675
Service 9: invokeCount = 82314
Service 10: invokeCount = 103839
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
由结果可知,请求大致分配均匀,当增大虚拟节点个数,会更加均匀。
参数0、2不变时,每次请求返回的Invoker都一样。
附dubbo ConsistentHashLoadBalance源码 version 3.0.2
package org.apache.dubbo.rpc.cluster.loadbalance;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.io.Bytes;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
/**
* ConsistentHashLoadBalance
*/
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
/**
* Hash nodes name
*/
public static final String HASH_NODES = "hash.nodes";
/**
* Hash arguments name
*/
public static final String HASH_ARGUMENTS = "hash.arguments";
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
"unchecked") (
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// using the hashcode of list to compute the hash only pay attention to the elements in the list
int invokersHashCode = invokers.hashCode();
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != invokersHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = Bytes.getMD5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
String key = toKey(invocation.getArguments());
byte[] digest = Bytes.getMD5(key);
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
}
}