vlambda博客
学习文章列表

源码分析:ConcurrentHashMap—JDK1.7版本

简介

ConcurrentHashMap是从JDK 1.5开始支持一定并发性的哈希表,其中所有的操作都是线程安全的,所以常常会被应用于高并发的场景中。

为什么会出现ConcurrentHashMap?

  1. HashMap 非线程安全,不适用于并发场景

  2. HashTable 线程安全,但是效率低下。

结构分析

Segment

Segment 是CHM中非常重要的一个类,它继承至ReentrantLock,作为CHM分段锁实现的重要组成部分 。

static final class Segment<K,Vextends ReentrantLock implements Serializable{
    ...
  transient volatile HashEntry<K,V>[] table;
  transient int threshold;
  final float loadFactor;
    ...
}

Segment 里面还包含了一个HashEntry数组,而HashEntry就是实际组装代表Hash节点的数据。

HashEntry

HashEntry 是实际的数据节点,是一个单向链表结构。

static final class HashEntry<K,V{
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
}

重要属性

// 默认初始容量 16
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 默认加载因子 0.75f (作用域segment内部)
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 默认并发级别(和默认初始容量一致) 16
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 允许的最大容量 
static final int MAXIMUM_CAPACITY = 1 << 30;
// 每段segment最小容量(必须是2的幂,至少是2)
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 允许的最大segment数
static final int MAX_SEGMENTS = 1 << 16// slightly conservative
// 锁之前,重试次数(默认自旋次数)
static final int RETRIES_BEFORE_LOCK = 2;
// 分段索引的掩码值
final int segmentMask;
// 段内索引的移位值
final int segmentShift;

// segment 数组
final Segment<K,V>[] segments;

transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;

从上面大概可以看出ConcurrentHashMap的结构如下:

方法分析

构造方法

public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel)
 
{
    // 1. 参数校验
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 2. 控制最大并发级别
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 3. 找到2的多少次方才能到并发级别,默认16的话sshift 就是4
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 默认值计算出来segmentShift 就是28;by:https://jinglingwang.cn
    this.segmentShift = 32 - sshift;
    // 默认值计算出来segmentMask 就是15
    this.segmentMask = ssize - 1;
    // 4. 控制最大初始容量
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // 5. 如果是默认的并打级别,ssize就是16,initialCapacity 为64的话,c就为4
    // initialCapacity 为65的话,c计算结果为5
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 默认最小值2,确保插入第一个值的时候不会触发扩容
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        // 计算找到第一个cap比c大于等于的2的幂。如果c是5,cap就是8
        // 确保cap是2的幂
        cap <<= 1;
    // create segments and segments[0]
    // 6. 创建第一个Segment元素
    //  cap * loadFactor:计算扩容阈值
    //  new HashEntry[cap]:初始化第一个segment的HashEntry数组
    Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    // 7. 创建CHM的segments数组,ssize的大小和并发级别有关
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 写入数组第一个元素
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

小结:

  1. 构造方法确定了初始容量,加载因子,并发级别等参数。

  2. Segment数组的大小是由并发级别的大小确定的,且也必须是2的幂,默认是16。

  3. 确定segment里面的HashEntry初始大小cap,且也必须是2的幂,最小为2。

  4. 完成了初始化Segment数组和写入第一个segment元素。

put方法

public V put(K key, V value) {
    Segment<K,V> s;
    // 1.参数校验,不允许value为null
    if (value == null)
        throw new NullPointerException();
    // 2.计算key的hash(32位)
    int hash = hash(key);
    // 3. 默认值时,hash无符号右移28位,保留高4位,然后做位运算 & 15;
    //    最后j的值就是segments数组槽的下标
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null//  in ensureSegment
        // 4. 构造方法只初始化了Segments[0],其他位置第一次插入的时候还是null,需要初始化
        s = ensureSegment(j);
    // 5.往segments中放入值
    return s.put(key, hash, value, false);
}

//  内部类Segment中的实现,真正放segment中添加数据的方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 父类 ReentrantLock.tryLock() 尝试获得锁,没有获得锁会调用scanAndLockForPut,直到返回一个节点并持有锁
    HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    // 旧的值
    V oldValue;
    **try {
        // 当前槽Segment中的HashEntry数组
        HashEntry<K,V>[] tab = table;
        // 1.计算hash所在的索引
        int index = (tab.length - 1) & hash;
        // 定位数组中的HashEntry元素,HashEntry是链表结构,第一个也就是表头
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) { // 遍历hashEntry链表
            if (e != null) { // 存在旧值
                K k;
                // 1.1 校验key是否一致
                if ((k = e.key) == key ||  (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;  // 取出覆盖前的旧值
                    if (!onlyIfAbsent) {
                        e.value = value; // 覆盖旧值
                        ++modCount; // 记录修改次数
                    }
                    break;
                }
                e = e.next; // 继续遍历下一个元素
            } else { // 不存在旧值
                if (node != null)
                    node.setNext(first); // 直接设置表头first
                else
                    // 初始化节点并设置表头first
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1// 元素的数量+1
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    // 触发扩容
                    rehash(node);
                else
                    setEntryAt(tab, index, node); // 将node放到数组tab的index位置,
                ++modCount; // 记录修改次数+1
                count = c;
                oldValue = null// 第一次添加返回null
                break;
            }
        }
    } finally {
        unlock(); // 释放锁
    }
    return oldValue; // 返回旧值
}

put方法小结:

  1. 首先通过key的hash值计算出所在的槽位,也就是segments[index]。

  2. 如果index不是第一个,第一次插入的时候需要初始化对应的Segment,然后再执行put操作。

  3. 获得锁(锁的是一个Segment),通过hash计算元素所在的HashEntry数组索引,完成值的插入或覆盖,记录修改次数,以及扩容检测。

  4. 释放锁,并返回旧值。

初始化Segment:ensureSegment

// 初始化其他Segment(构造方法只初始化了Segments[0],其他位置第一次插入的时候还是null,需要初始化)
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    // 计算原始偏移量
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // 1. 再次确保是没有初始化过的
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 2. 以第一个segment作为模版原型
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        // 取第一个segment的HashEntry数组长度,加载因子等
        // 因为构造方法时已经算过了,后面的都取第一个的初始化参数
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf); // 计算阈值
        // 3.初始化segment的HashEntry数组
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 4. 再次检查确保是没有这个过程中没有被其他线程初始化
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            // 初始化Segment
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // while自旋检测,并将初始化好的segment设置到segments数组对应的索引位置中
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    // 返回初始化好的segment by:https://jinglingwang.cn
    return seg;
}

获取写入锁: scanAndLockForPut

在前面put元素的时候会尝试tryLock获得锁,如果没有获得到锁,会进入到这个方法,直到成功获得锁并返回

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1// negative while locating node
    while (!tryLock()) { // 自旋尝试获得锁,直到获得锁退出循环
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null// speculatively create node
                    // 预先创建节点
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            } else if (key.equals(e.key))
                retries = 0;
            else 
                e = e.next; //链表,下一个
        } else if (++retries > MAX_SCAN_RETRIES) {// 最大重试次数:单核1,多核64
            lock(); // 阻塞线程,直到获得锁
            break;
        } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
            // first节点所在的位置发生了变化,重新赋值first
            e = first = f; // re-traverse if entry changed
            retries = -1// 相当于重新进入scanAndLockForPut方法
        }
    }
    return node;
}

该方法的最终目的是要保证获得锁。

扩容:rehash方法

对Segment中的HashEntry数组进行两倍的扩容,并将入参的节点加入到数组列表中。

只有在put方法时才会触发扩容,调用该方法时已经持有锁。

private void rehash(HashEntry<K,V> node) {

    HashEntry<K,V>[] oldTable = table;
    // 旧的容量
    int oldCapacity = oldTable.length;
    // 扩容,新的值
    int newCapacity = oldCapacity << 1;
    // 计算新的扩容阈值
    threshold = (int)(newCapacity * loadFactor);
    // 新的HashEntry数组
    HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    // 遍历旧的数组元素
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i]; // 旧数组中的值
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)   // 链表,只有一个节点
                newTable[idx] = e; // 直接将旧值放入到新的数组中
            else { // 说明HashEntry数组中的节点是链表
                HashEntry<K,V> lastRun = e; // e是头结点
                int lastIdx = idx;
                // 遍历链表,确定下来一个节点lastRun(重新计算索引后有变化的最后一个节点) 
                for (HashEntry<K,V> last = next; last != null;  last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 将lastRun节点及它身上所处链表赋值到新的数组中
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                // lastRun节点之前的节点,因为重新计算索引有变化,需要赋值到不同的位置
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask; // 重新计算索引,与lastIdx不一样
                    HashEntry<K,V> n = newTable[k];
                    // 重新构造一个新的hashentry节点并赋值到新的数组中,如果不new需要切断链表更麻烦
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n); 
                }
            }
        }
    }
    // 新加入的节点,计算索引
    int nodeIndex = node.hash & sizeMask; // add the new node
    // newTable[nodeIndex] 可能已经存在有值,node节点作为头结点设置
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node; // 重新设置nodeIndex位置
    table = newTable; // 赋值新的数组
}

扩容小结:

  1. 调用扩容之前已经持有锁,每次对HashEntry数组进行两倍扩容。

  2. 扩容之后需要将旧的数组中的数据赋值到新的数组中

  3. 第二个for循环的目的是找到冲突链表上最后一个重新计算索引有变化的节点,之后直接把后面所有的节点赋值到同一个索引位置的新数组中

  4. 最坏的情况就是遍历到了最后一个节点,那么第二个for循环就是多余的。不过作者 Doug Lea 说了,根据统计,如果使用默认阈值,大约只有 1/6 的节点需要克隆。

获取:get方法

public V get(Object key) {
    // 所在的槽
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // 计算hash
    int h = hash(key);
    // 1.计算槽的索引位置
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
        // 2. 计算在HashEntry中的索引位置,遍历链表  
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k))) // 找到元素,并返回值
                return e.value;
        }
    }
    return null// 没有找到,返回null
}

这个方法的实现就简单很多,只要确定如果找到对应的索引位置就行。

  1. 先要确认槽的位置,也就是segment的位置

  2. 然后确定在segment里面的HashEntry数组中的索引位置。

  3. 在链表上找目标key并返回其值

删除:remove

public boolean remove(Object key, Object value) {
    // 计算hash
    int hash = hash(key);
    Segment<K,V> s;
    // segmentForHash: 计算hash所处的槽位,返回对应的segment 
    return value != null && (s = segmentForHash(hash)) != null &&
        s.remove(key, hash, value) != null;
}

// 移除指定值的元素
final V remove(Object key, int hash, Object value) {
    if (!tryLock()) // 先尝试获得锁
        scanAndLock(key, hash); //尝试获得锁或阻塞等待获得锁
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table; // segment的数据table
        int index = (tab.length - 1) & hash; // 计算所处的索引
        HashEntry<K,V> e = entryAt(tab, index); // 取出所处位置的HashEntry
        HashEntry<K,V> pred = null// 记录前置节点
        while (e != null) { // 元素存在
            K k;
            HashEntry<K,V> next = e.next;
            if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { // 找到了目标一直的元素
                V v = e.value;
                if (value == null || value == v || value.equals(v)) { // 并且value 一直
                    if (pred == null// 如果第一个元素就是目标元素
                        setEntryAt(tab, index, next); // 直接将对应位置的数据设置为next值
                    else
                        pred.setNext(next); // 删除链表中间的目标节点
                    ++modCount; // 记录修改次数
                    --count;  // -1
                    oldValue = v; //删除的旧值
                }
                break;
            }
            pred = e;
            e = next; // 继续遍历比对下一个节点
        }
    } finally {
        unlock();
    }
    return oldValue;
}

总结

  1. 1.7版本是通过Segment+HashEntry来实现的

  2. Segments的数量在初始化后不支持扩容

  3. 扩容机制只会对segment中的HashEntry数组进行2倍扩容

  4. 最终的并发性能会受Segments数量的限制而限制