负载均衡 一致性Hash算法
从实际出发,研究在开源软件中已实现的算法。本文具体分析负载均衡-一致性Hash算法,基于dubbo的ConsistentHashLoadBalance源码。
一致性Hash算法依赖ConsistentHashSelector这一数据结构,其内部使用TreeMap来存储虚拟节点的Hash值及其对应的Invoker。为了保证请求被均匀地分配,一般都采用将一个Invoker映射为多个虚拟节点的方式(好处很多,例如节点在Hash环上的分布更均匀,更多细节可自行搜索)。当有请求到达时,根据请求的参数来计算本次请求的Hash值,可以自行定义选择N个参数来参与Hash值的计算;相同参数的请求可以定位至同一个Invoker上(前提是Invoker列表未发生变化)。
10个Invoker在Hash环上的分布情况:
分布可能十分不均匀,造成某些服务器压力过大。
增加多个虚拟Invoker后的分布情况:
仅画出了Invoker1的分布情况
测试代码:
准备多个Invoker, 每个Invoker设置具体的名字,权重,调用次数
启动多个线程,每个线程分别多次调用doSelect方法
每次选择完成后,更新调用次数+1
全部调用完成后,输出每个Invoker的调用次数
设置同样的请求参数,打印每次选择的Invoker name
上代码
import java.nio.charset.StandardCharsets;import java.security.MessageDigest;import java.security.NoSuchAlgorithmException;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.CountDownLatch;public class ConsistentHashLoadBalance {public static ThreadLocal<MessageDigest> MD5 = new ThreadLocal<>();public static final String NAME = "consistenthash";/*** Hash nodes name*/public static final String HASH_NODES = "hash.nodes";/*** Hash arguments name*/public static final String HASH_ARGUMENTS = "hash.arguments";/*** 参与Hash计算的方法参数的索引值, 只有在其中的索引参数才会参与hash值计算* 本次测试假设第0个、第2个参数参与Hash值计算*/public static final int[] HASH_PARAMETER_INDEX = new int[] {0, 2};private final ConcurrentMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();protected Invoker doSelect(List<Invoker> invokers, String methodName, Object[] arguments) {String key = methodName;// using the hashcode of list to compute the hash only pay attention to the elements in the listint invokersHashCode = invokers.hashCode();ConsistentHashSelector selector = selectors.get(key);// 对应的ConsistentHashSelector不存在, 或invokers有变化, 则更新ConsistentHashSelector// 本次实验中invokers是固定的, 所以只有第一次调用会设置ConsistentHashSelectorif (selector == null || selector.identityHashCode != invokersHashCode) {selectors.put(key, new ConsistentHashSelector(invokers, methodName, invokersHashCode));selector = selectors.get(key);}return selector.select(arguments);}/*** 获取MessageDigest, 注意此类不是线程安全的, 使用ThreadLocal来获取* @return*/public static MessageDigest getMD5() {MessageDigest md5 = MD5.get();if (md5 == null) {try {md5 = MessageDigest.getInstance("MD5");MD5.set(md5);} catch (NoSuchAlgorithmException e) {e.printStackTrace();}}return md5;}private static final class ConsistentHashSelector {/*** 使用TreeMap来存储digest+Invoker, 主要是利用cellingEntry方法快速查询Invoker*/private final TreeMap<Long, Invoker> virtualInvokers;/*** 副本数目, 在这表示虚拟Invoker的数量*/private final int replicaNumber = 
160;private final int identityHashCode;private final int[] argumentIndex;ConsistentHashSelector(List<Invoker> invokers, String methodName, int identityHashCode) {this.virtualInvokers = new TreeMap<>();this.identityHashCode = identityHashCode;argumentIndex = HASH_PARAMETER_INDEX;for (Invoker invoker : invokers) {// 使用Invoker唯一的名称来替代地址, 作为区分String address = invoker.name;// 创建 replicaNumber / 4 * 4 个虚拟节点, 本例中每个Invoker都创建160个虚拟节点for (int i = 0; i < replicaNumber / 4; i++) {byte[] digest = getMD5().digest((address + i).getBytes(StandardCharsets.UTF_8));for (int h = 0; h < 4; h++) {long m = hash(digest, h);virtualInvokers.put(m, invoker);}}}}/*** 因为我们只测试一个方法, 所以直接传递参数来选择Invoker* @param arguments* @return*/public Invoker select(Object[] arguments) {String key = toKey(arguments);byte[] digest = getMD5().digest(key.getBytes(StandardCharsets.UTF_8));return selectForKey(hash(digest, 0));}/*** 得到HashKey* @param args* @return*/private String toKey(Object[] args) {StringBuilder buf = new StringBuilder();// 在这里我们只选择argumentIndex中的参数作为key参与Hash值的计算for (int i : argumentIndex) {if (i >= 0 && i < args.length) {// 一般来说, 对应的参数都应该重写toString方法buf.append(args[i]);}}return buf.toString();}/*** 根据Hash值选择Invoker* @param hash* @return*/private Invoker selectForKey(long hash) {// 在虚拟节点对应的TreeMap中选择刚大于hash值的InvokerMap.Entry<Long, Invoker> entry = virtualInvokers.ceilingEntry(hash);if (entry == null) {// 如果不存在, hash值大于最大的虚拟节点的hash值, 则应该选择第一个Invokerentry = virtualInvokers.firstEntry();}return entry.getValue();}/*** 计算Hash值* @param digest* @param number* @return*/private long hash(byte[] digest, int number) {/*** 在初始化虚拟Invoker的时候, number分别为 0,1,2,3* 也就是说分别以 0~3, 4~7, 8~11, 12~15 的byte来计算hash值* 保证4次计算结果比较分散*/return (((long) (digest[3 + number * 4] & 0xFF) << 24)| ((long) (digest[2 + number * 4] & 0xFF) << 16)| ((long) (digest[1 + number * 4] & 0xFF) << 8)| (digest[number * 4] & 0xFF))& 0xFFFFFFFFL;}}public static void main(String[] args) {ConsistentHashLoadBalance consistentHashLoadBalance = new 
ConsistentHashLoadBalance();// 初始化10个Invoker, 命名 Service i, Weight i; i -> [1, 10]List<Invoker> invokers = new ArrayList<>(10);for (int i = 1; i <= 10; i++) {invokers.add(new Invoker("Service " + i, i));}CountDownLatch cdl = new CountDownLatch(10);// 启动10个线程, 每个线程调用10万次, 共调用100万次for (int i = 0; i < 10; i++) {new Thread(() -> {Object[] arguments = new Object[4];for (int j = 0; j < 100000; j++) {// UUID随机生成4个参数arguments[0] = UUID.randomUUID().toString();arguments[1] = UUID.randomUUID().toString();arguments[2] = UUID.randomUUID().toString();arguments[3] = UUID.randomUUID().toString();Invoker selected = consistentHashLoadBalance.doSelect(invokers, "defaultMethod", arguments);// 统计调用次数selected.invokeCount.incrementAndGet();}cdl.countDown();}).start();}try {cdl.await();} catch (InterruptedException e) {e.printStackTrace();}// 打印调用次数for (Invoker invoker : invokers) {System.out.println(invoker.name + ": invokeCount = " + invoker.invokeCount.get());}// 测试Hash一致性, 固定第0个、第2个参数, 第1个、第3个参数随机生成, 看是否一直选择同一个InvokerString firstArg = UUID.randomUUID().toString();String thirdArg = UUID.randomUUID().toString();Object[] arguments = new Object[4];arguments[0] = firstArg;arguments[2] = thirdArg;for (int i = 0; i < 20; i++) {arguments[1] = UUID.randomUUID().toString();arguments[3] = UUID.randomUUID().toString();Invoker selected = consistentHashLoadBalance.doSelect(invokers, "defaultMethod", arguments);System.out.println("当前选择的Invoker = " + selected.name);}}}
输出结果
Service 1: invokeCount = 107767
Service 2: invokeCount = 90664
Service 3: invokeCount = 111901
Service 4: invokeCount = 97779
Service 5: invokeCount = 96242
Service 6: invokeCount = 97307
Service 7: invokeCount = 109512
Service 8: invokeCount = 102675
Service 9: invokeCount = 82314
Service 10: invokeCount = 103839
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
当前选择的Invoker = Service 8
由结果可知,请求大致分配均匀;增大虚拟节点个数后,分布会更加均匀。
参数0、2不变时,每次请求返回的Invoker都一样。
附dubbo ConsistentHashLoadBalance源码 version 3.0.2
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.io.Bytes;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.support.RpcUtils;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;

/**
 * ConsistentHashLoadBalance
 *
 * <p>Routes an invocation to the invoker owning the hash of selected call arguments.
 * Invokers are placed on a hash ring as virtual nodes held in a {@link TreeMap}.
 */
public class ConsistentHashLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "consistenthash";

    /**
     * Hash nodes name
     */
    public static final String HASH_NODES = "hash.nodes";

    /**
     * Hash arguments name
     */
    public static final String HASH_ARGUMENTS = "hash.arguments";

    /** Selector cache, keyed by "serviceKey.methodName". */
    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    /**
     * Selects an invoker for the invocation.
     *
     * <p>The cached selector for the service/method key is replaced when none exists or
     * when the hash code of {@code invokers} differs from the one it was built with.
     *
     * @param invokers   candidate invokers; element 0 is read for the service key, so the
     *                   list must not be empty
     * @param url        not used by this implementation
     * @param invocation the call being routed; supplies the method name and arguments
     * @return the invoker owning the hash of the selected arguments
     */
    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String methodName = RpcUtils.getMethodName(invocation);
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        // using the hashcode of list to compute the hash only pay attention to the elements in the list
        int invokersHashCode = invokers.hashCode();
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.identityHashCode != invokersHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }

    /**
     * Hash ring built for one fixed list of invokers.
     */
    private static final class ConsistentHashSelector<T> {

        /** Virtual-node hash to invoker. */
        private final TreeMap<Long, Invoker<T>> virtualInvokers;

        /** Number of virtual nodes requested per invoker. */
        private final int replicaNumber;

        /** Hash code of the invoker list this ring was built from. */
        private final int identityHashCode;

        /** Indexes of the arguments that contribute to the hash key. */
        private final int[] argumentIndex;

        /**
         * Builds the ring, reading its settings from the URL of the first invoker.
         *
         * @param invokers         invokers to place on the ring
         * @param methodName       method whose parameters configure the ring
         * @param identityHashCode hash code of {@code invokers}, stored for change detection
         */
        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            // HASH_NODES is looked up with 160 passed as the fallback value.
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
            // HASH_ARGUMENTS is looked up with "0" passed as the fallback value,
            // then split into individual argument indexes.
            String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();
                // Each digest yields 4 hashes, so replicaNumber / 4 digests are computed per invoker.
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = Bytes.getMD5(address + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        /**
         * Selects the invoker for an invocation from the hash of its key arguments.
         *
         * @param invocation the call being routed
         * @return the invoker owning the hash
         */
        public Invoker<T> select(Invocation invocation) {
            String key = toKey(invocation.getArguments());
            byte[] digest = Bytes.getMD5(key);
            return selectForKey(hash(digest, 0));
        }

        /**
         * Concatenates the string forms of the arguments listed in {@code argumentIndex};
         * indexes outside the argument array are skipped.
         *
         * @param args call arguments
         * @return the hash key
         */
        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }

        /**
         * Finds the invoker for a hash value.
         *
         * @param hash request hash
         * @return invoker of the first virtual node whose hash is &gt;= {@code hash},
         *         or of the first node on the ring when no such node exists
         */
        private Invoker<T> selectForKey(long hash) {
            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            if (entry == null) {
                // Beyond the largest virtual node: wrap around to the start of the ring.
                entry = virtualInvokers.firstEntry();
            }
            return entry.getValue();
        }

        /**
         * Derives an unsigned 32-bit hash from four consecutive digest bytes, reading the
         * group that starts at {@code number * 4} with its first byte as the least significant.
         *
         * @param digest digest bytes; at least {@code 4 * number + 4} bytes long
         * @param number index of the 4-byte group to use
         * @return value in the range 0..2^32-1
         */
        private long hash(byte[] digest, int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }
    }
}
