百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

「一文搞懂」ConcurrentHashMap实现原理及源码解析

liuian 2025-05-27 15:53 122 浏览

本章内容

简介

ConcurrentHashMap是Java并发包(JUC)中提供的一种线程安全的并发容器(即:HashMap的线程安全版本),主要用于高并发场景下key-value数据的存储。

实现原理

ConcurrentHashMap的实现原理在JDK的不同版本中存在很大的差异:

  • 在JDK1.7版本中,ConcurrentHashMap通过数组+Segment分段锁+链表的方式实现,通过Segment分段锁(Segment继承了ReentrantLock)来保证并发场景下多线程操作的并发安全性。
  • 在JDK1.8版本中,ConcurrentHashMap通过数组+链表+红黑树的方式实现,通过CAS+synchronized来保证并发场景下多线程操作的并发安全性。

本文主要分享ConcurrentHashMap在JDK1.8版本中的实现原理。

整体结构

ConcurrentHashMap的整体结构,如图所示:

其中:

  • 数组:Node类型的数组,数组中的每个元素相当于一个哈希桶:
    • 如果不发生哈希冲突,则每个元素都存储在数组中。
    • 如果发生哈希冲突,则每个元素存储的是链表的头节点或红黑树的根节点。
  • 链表:Node类型的链表,当发生哈希冲突时,通过链地址法将具有相同哈希值的Node节点组成一个链表。
  • 红黑树:当链表的长度大于8且数组的长度大于等于64时,将链表转换为红黑树进行存储。

重要属性

ConcurrentHashMap的重要属性及常量:

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {
    // Node数组,用于存储ConcurrentHashMap中的数据
    transient volatile Node<K,V>[] table;
    // 扩容时产生的新的Node数组,其容量为table数组的2倍
    private transient volatile Node<K,V>[] nextTable;
    /**
     * sizeCtl值:
     *   0:表示数组未初始化,且数组的初始容量为16
     *   正数:如果数组未初始化,则该值记录的是数组的初始容量;如果数组已经初始化,则该值记录的是数组的扩容阈值(数组初始容量*0.75)
     *   -1:表示数组正在进行初始化
     *   -N:表示数组正在进行扩容,参与扩容的线程数为N-1
     */
    private transient volatile int sizeCtl;
    // Node数组最大容量
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    // Node数组默认容量
    private static final int DEFAULT_CAPACITY = 16;
    // 扩容因子
    private static final float LOAD_FACTOR = 0.75f;
    /**
     * 链表转红黑树阀值:
     *   如果链表长度大于达到阀值,但数组容量小于64,则进行数组扩容。
     *   如果链表长度大于达到阀值,且数组容量大于等于64,则将链表转为红黑树。
     */
    static final int TREEIFY_THRESHOLD = 8;
    // 红黑树转链表阀值,如果红黑树中的节点个数达到阀值,则将红黑树转为链表。
    static final int UNTREEIFY_THRESHOLD = 6;
    // 链表转红黑树数组容量最小值
    static final int MIN_TREEIFY_CAPACITY = 64;
}

重要结构

Node

Node节点主要用于封装key-value键值对,ConcurrentHashMap中的链表由多个相连的Node节点组成。

static class Node<K,V> implements Map.Entry<K,V> {
    // key的hash值
    final int hash;
    // key键
    final K key;
    // value值
    volatile V val;
    // 指向下一个Node节点的指针
    volatile Node<K,V> next;
    ...
}

ForwardingNode

ForwardingNode主要用于数组进行扩容时标识哈希桶对应的头节点,ForwardingNode中不保存key-value键值对,只保存扩容时产生的新的Node数组(nextTable)的引用。ForwardingNode中的hash值为MOVED(-1),它定义了一个find()方法,用于扩容时从nextTable中查找对应的Node节点。

TreeBin

当ConcurrentHashMap中的链表转为红黑树后,数组中保存的引用为TreeBin,TreeBin内部不保存key-value键值对,而是保存TreeNode的List和红黑树Root。

TreeNode

红黑树的节点,TreeNode继承了ConcurrentHashMap.Node。

构造函数

ConcurrentHashMap的构造函数:

// 无参构造函数
public ConcurrentHashMap() {}
// 带初始化容量的构造函数
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    /**
     * 如果数组的初始容量大于(最大容量/2),则初始化容量大小为最大容量。
     * 否则初始化容量大小为大于(initialCapacity+initialCapacity/2+1)的最小2^n值。
     * 假设initialCapacity为8,则大于(8+8/2+1)的最小2^n为16,即:cap = 16
     */
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    // 设置sizeCtl变量的值为数组的初始化容量
    this.sizeCtl = cap;
}
// 带集合的构造函数
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    // 设置sizeCtl变量的值为数组的默认容量
    this.sizeCtl = DEFAULT_CAPACITY;
    // 将集合中所有的key-value键值对添加到ConcurrentHashMap中
    putAll(m);
}
// 带初始化容量和负载因子的构造函数
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}
// 带初始化容量、负载因子以及并发线程数的构造函数
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 如果初始容量小于并发线程数,则初始容量为并发线程数的大小
    if (initialCapacity < concurrencyLevel) 
        initialCapacity = concurrencyLevel; 
    // 数组大小
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    /**
     * 如果数组大小大于等于最大容量,则初始化容量大小为最大容量。
     * 否则初始化容量大小为大于size的最小2^n值。
     */
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    // 设置sizeCtl变量的值为数组的初始化容量
    this.sizeCtl = cap;
}

put方法

实现原理

ConcurrentHashMap通过ConcurrentHashMap#put方法向其中添加key-value键值对。

ConcurrentHashMap#put方法执行流程,如图所示:

处理流程:

  • 1)向ConcurrentHashMap中添加key-value键值对,计算key对应的hash值,自旋遍历table数组并进行判断:
    • 如果table数组还未初始化,则初始化table数组。
    • 如果key的hash值对应的数组下标处不存在Node节点,则根据key的hash值、key、value等值创建Node节点,并将创建的Node节点通过CAS的方式设置到hash值对应的数组下标处,退出自旋。
    • 如果key的hash值对应的数组下标处存在Node节点,且Node节点的hash值为-1(即:ForwardingNode),说明正在进行数组扩容,则当前线程协助数组扩容,直到数组扩容完成,继续自旋。
    • 如果key的hash值对应的数组下标处存在Node节点,且Node节点的hash值不为-1(即:正常的Node节点),则锁定该下标对应的哈希槽,并根据哈希槽中头节点类型进行判断:
      • 如果节点类型为Node,说明哈希槽存储的是链表,则遍历链表并进行判断:
        • 如果存在与key的hash值及key值相等的节点,则根据onlyIfAbsent值进行判断:
          • true:退出链表遍历。
          • false:用传入的value替换旧value,退出链表遍历。
        • 如果不存在与key的hash值及key值相等的节点,则根据key的hash值、key、value等值创建Node节点并追加到链表的末尾(尾插法),退出链表遍历。
      • 如果节点类型为TreeBin,说明哈希槽存储的是红黑树,则遍历红黑树并进行判断:
        • 如果存在与key的hash值及key值相等的节点,则根据onlyIfAbsent值进行判断:
          • true:不做处理。
          • false:用传入的value替换旧value。
        • 如果不存在与key的hash值及key值相等的节点,则根据key的hash值、key、value等值创建TreeNode节点添加到红黑树中。
      • 判断binCount的值是否为0:
        • 0:表示添加失败,继续自旋。
        • 非0:表示添加成功,链表中的节点数是否达到8个:
          • 节点数达到8个:如果table数组大小达到64,则将链表转为红黑树;否则进行数组扩容。
        • 旧value是否为空:
          • 不为空,则返回旧value。
  • 2)增加节点总数,并返回null。

源码解析

ConcurrentHashMap#put方法源码解析:

public V put(K key, V value) {
    // 添加key-value键值对
    return putVal(key, value, false);
}
/**
 * 添加key-value键值对具体实现
 */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 如果key或者value为空,则抛出空指针异常
    if (key == null || value == null) throw new NullPointerException();
    // 计算key的hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 自旋遍历table数组
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果table数组未初始化,则初始化table数组
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果根据key的hash值计算得到的数组下标(i)处不存在对应的Node节点,则根据key的hash值、key、value等值创建新的Node节点,并以CAS的方式将新创建的Node节点添加到数组对应的下标处
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 通过CAS的方式在下标i处创建Node节点
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 如果Node节点的hash值为MOVED(-1),说明正在进行数组扩容,则协助数组扩容,直到数组扩容完成
        else if ((fh = f.hash) == MOVED)
            // 协助数组扩容
            tab = helpTransfer(tab, f);
        else {  // 此处表示根据key的hash值计算得到的数组下标(i)处已经存在对应的Node节点
            V oldVal = null;
            // 锁定该下标对应的哈希槽
            synchronized (f) {
                // 判断数组下标i处的Node节点是否被修改
                if (tabAt(tab, i) == f) {
                    // f节点的hash值大于等于0,说明是正常的Node节点(非转移节点)
                    if (fh >= 0) {
                        // 链表中已遍历的节点数
                        binCount = 1;
                        // 遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            /** 
                             * 如果存在与key的hash值及key值相等的节点,则根据onlyIfAbsent值进行判断:
                             *   true:退出链表遍历,继续自旋。
                             *   false:用传入的value替换旧value,退出链表遍历,继续自旋。
                             */
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                // onlyIfAbsent为false,则用传入的value替换旧value
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 如果不存在与key的hash值及key值相等的节点,则根据key的hash值、key、value等值创建Node节点并追加到链表的末尾(尾插法)
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 如果根据key的hash值计算得到的数组下标(i)处存放的是红黑树,则根据key的hash值、key、value等值创建TreeNode节点并添加到红黑树中
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // binCount不等于0,说明已经添加成功
            if (binCount != 0) {
                // 如果binCount大于等于8,则将链表转化为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

table数组初始化

ConcurrentHashMap中的table数组采用懒加载方式进行初始化。调用ConcurrentHashMap#initTable方法进行table数组初始化时,首先通过自旋来保证初始化成功,然后通过CAS的方式将SizeCtl变量的值设置为-1(-1表示数组正在进行初始化)来保证同一时刻只有一个线程对数组进行初始化,CAS成功后,再次判断当前数组是否已经初始化完成,如果已经初始化完成,则不会再次初始化。通过自旋+CAS+双重检查等手段保证了数组初始化时的线程安全。

ConcurrentHashMap#initTable方法源码解析:

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // 如果table数组未初始化,则自旋进行初始化
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl小于0,说明存在其他线程正在进行初始化,则让出CPU使用权
        if ((sc = sizeCtl) < 0)
            // 让出CPU使用权
            Thread.yield(); // lost initialization race; just spin
        // 通过CAS的方式将sizeCtl的值更新为-1(-1表示数组正在进行初始化)
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // 双重检查,再次判断table数组是否初始化
                if ((tab = table) == null || tab.length == 0) {
                    // table数组容量:如果sc大于0,则设置sc的值为table数组的初始容量,否则设置table数组的初始容量为默认值(16)
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // 创建Node类型的数组,并赋值给table变量
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 设置sc的值为table数组初始容量的3/4(即:75%)
                    sc = n - (n >>> 2);
                }
            } finally {
                // 将sc的值赋给sizeCtl变量,初始化完成
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

根据key的hash值计算数组下标

根据key的hash值计算方法:

static final int spread(int h) {
    // h ^ (h >>> 16):高位运算,即:key.hashCode()的高16位异或低16位
    return (h ^ (h >>> 16)) & HASH_BITS;
}

其中,参数h为key调用hashCode()方法得到的值。

假设数组长度为16,数组下标计算过程,如图所示:

协助扩容

在put方法中,通过遍历table数组中的Node节点向ConcurrentHashMap中添加key-value键值对时,如果key的hash值对应的数组下标处的头节点为ForwardingNode节点(即:节点中的hash值为-1),说明正在进行数组扩容,则通过调用ConcurrentHashMap#helpTransfer方法协助数组扩容,直到数组扩容完成。

ConcurrentHashMap#helpTransfer方法源码解析:

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // 如果table数组不为空且Node节点类型为ForwardingNode节点,并且ForwardingNode节点的nextTable引用不为空,则协助数组扩容
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // 获取扩容标识
        int rs = resizeStamp(tab.length);
        // 自旋,其中sizeCtl<0表示正在进行数组扩容
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            /**
             * 满足以下四个条件,则退出自旋(即:不执行扩容操作)
             * 条件1:当前线程获取的扩容标识是否与通过sc计算出来的扩容标识一致,不一致则退出循环
             * 条件2:默认第一个线程设置为sc==(rs<<16)+2,当第一个线程扩容结束sc的值-1,即:sc==(rs<<16)+1,表示扩容结束
             * 条件3:当前协助扩容的线程数达到最大值
             * 条件4:转移下标小于0,表示扩容完成
             */
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 通过CAS的方式将sizeCtl变量的值+1,更新成功则执行数据转移
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 执行数据转移
                transfer(tab, nextTab);
                break;
            }
        }
        // 返回扩容后的数组
        return nextTab;
    }
    return table;
}

get方法

实现原理

ConcurrentHashMap通过ConcurrentHashMap#get方法获取key对应的value值。

ConcurrentHashMap#get方法执行流程,如图所示:

处理流程:

  • 根据key从ConcurrentHashMap中获取value值,计算key对应的hash值,根据table数组及hash值对应的数组下标处是否存在节点进行判断:
    • 如果数组为空或者hash值对应的下标处不存在节点,则返回null。
    • 如果数组不为空且hash值对应的下标处存在节点:
      • 继续判断头节点的hash值以及key值是否与传入key的hash值以及key值相等:
        • 相等,则返回头节点的value值。
      • 头节点的hash值与传入key的hash值不相等且头节点的hash值小于0:
        • 头节点的hash值为-1:从nextTable数组(扩容后的数组)中查询,并返回查询结果。
        • 头节点的hash值为-2:从红黑树中查询,并返回查询结果。
      • 头节点的hash值与传入key的hash值不相等且头节点的hash值大于等于0,则遍历key的hash值对应的数组下标处的链表:
        • 存在与key值相等的节点,则返回节点的value值。
        • 不存在与key值相等的节点,则返回null。

源码解析

ConcurrentHashMap#get方法源码解析:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算key的hash值
    int h = spread(key.hashCode());
    // table数组不为空且key的hash值对应的数组下标处存在节点
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果头节点的hash值与key的hash相等,则继续判断头节点的key值与传入的key值相等
        if ((eh = e.hash) == h) {
            // 如果头节点的key值与传入key值相等,则返回头节点的value值
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        /**
         * 如果key的hash值对应的数组下标处的头节点hash值小于0,说明正在进行数组扩容,则从nextTable数组(扩容后的数组)中查找key对应的value值
         * 根据eh的值分两种情况:
         *   1)-1:表示ForwardingNode,调用ForwardingNode中的find()方法
         *   2)-2:表示TreeBin,调用TreeBin中的find()方法
         */
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 如果key的hash值对应的数组下标处的头节点的key与key不相等,则遍历链表查找key对应的value值
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    // 没有找到key对应的节点,则返回null
    return null;
}

扩容机制

实现原理

当table数组中保存的数据越来越多,数组中的空位会越来越少,从而导致添加数据时hash冲突增加,最终导致数组中的链表越来越长,降低数据查询效率。

为了防止数组中的链表过长,提高数据查询效率,ConcurrentHashMap引入了扩容机制。扩容操作并不是在数组中没有空位时才进行,当数组中已使用的哈希槽位达到扩容因子设定的比例(默认:0.75)时,就会触发对数组进行扩容操作,扩容后的数组容量为table数组的2倍。

扩容操作通过ConcurrentHashMap#tryPresize方法实现,整体流程如图所示:

处理流程:

  • 1)设置扩容后数组容量为table数组容量的2倍。
  • 2)判断table数组是否初始化:
    • 未初始化,则初始化table数组。
    • 已初始化,则判断table数组已扩容或数组大小达到最大值:
      • 是则不执行扩容操作。
      • 否则执行扩容操作。
  • 3)根据sizeCtl变量值进行判断:
    • sizeCtl变量值小于0,说明其他线程正在进行数组扩容,则以CAS的方式将sizeCtl变量值+1,协助数组扩容。
    • sizeCtl变量值大于等于0,说明当前数组为第一个参与数组扩容的线程。
  • 4)开始数据转移,根据table数据容量及CPU核数设置当前线程负责转移的哈希槽。并根据哈希槽中节点的hash值判断:
    • hash值大于等于0,则转移链表中的数据。
    • hash值小于0,则转移红黑树中的数据。
  • 5)根据节点hash值&数组扩容位的结果(二进制与操作的结果)进行判断:
    • 如果与操作结果为0,则数据转移后在新数组中的位置保持不变。
    • 如果与操作结果为1,则数据转移后在新数组中的位置保持为原来位置+table数组容量。
  • 6)设置哈希槽点头节点为ForwardingNode(表示正在进行数组扩容)。
  • 7)当前线程负责的哈希槽数据转移完成,判断table数组中的所有数据是否转移完成:
    • 否,则通过CAS的方式将sizeCtl变量的值-1,当前线程是否为最后一个扩容线程:
      • 否,退出扩容操作。
      • 是,设置转移完成标志,下次循环执行table数组中的所有数据转移完成操作。
    • 是,则清空nextTable数组,设置table数组为扩容后的数组,设置sizeCtl的值为table数组容量的0.75倍。

扩容时二进制与操作

假设扩容前table数组容量为16,扩容后nextTable数组容量为32。

转移过程如图所示:

图中,table数组容量与nextTable数组容量的区别是二进制值中将第5位的值设置为1,即:将table数组容量向左移动一位得到nextTable数组容量。其中:

  • key1值的第5位为1,扩容后的值为扩容前的值+table数组容量(即:18)。
  • key2值的第5位为0,扩容后的值保持不变(即:5)。

源码解析

ConcurrentHashMap#tryPresize方法源码解析:

private final void tryPresize(int size) {
    // 设置变量c为扩容后的数组容量为table数组的2倍,最大不超过MAXIMUM_CAPACITY值
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    // 如果sizeCtl的值大于等于0,则自旋
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // 如果table数组未初始化,则初始化table数组
        if (tab == null || (n = tab.length) == 0) {
            // 设置n值为sizeCtl和变量c中的较大值
            n = (sc > c) ? sc : c;
            // 以CAS的方式设置sizeCtl的值为-1,表示正在进行数组初始化
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        // 设置变量sc的值为table数组容量的75%
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // 将变量sc的值赋值给sizeCtl变量(即:sizeCtl的值为table数组容量的75%)
                    sizeCtl = sc;
                }
            }
        }
        // 如果数组已经扩容或者table数组的容量达到最大作,则退出自旋(即:不执行扩容操作)
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // 执行扩容操作
        else if (tab == table) {
            // 获取扩容标识
            int rs = resizeStamp(n);
            // 如果sizeCtl的值小于0,说明正在进行数组扩容,则协助数组扩容
            if (sc < 0) {
                Node<K,V>[] nt;
                /**
                 * 满足以下四个条件,则退出自旋(即:不执行扩容操作)
                 * 条件1:当前线程获取的扩容标识是否与通过sc计算出来的扩容标识一致,不一致则退出循环
                 * 条件2:默认第一个线程设置为sc==(rs<<16)+2,当第一个线程扩容结束sc的值-1,即:sc==(rs<<16)+1,表示扩容结束
                 * 条件3:当前协助扩容的线程数达到最大值
                 * 条件4:转移下标小于0,表示扩容完成
                 */
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 通过CAS的方式将sizeCtl变量的值+1,更新成功则执行数据转移
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // 执行数据转移
                    transfer(tab, nt);
            }
            // 此处表示当前线程为第一个扩容线程,则nextTable数组传null(会在数据转移时新建一个容量为table数组2被的数组)
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // 执行数据转移,注意:此处nextTable数组传null
                transfer(tab, null);
        }
    }
}
// 执行数据转移方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    /**
     * stride变量表示每个线程进行数据转移时负责的哈希槽数
     * 计算规则:
     *   如果CPU核数大于1,则每个线程负责的哈希槽数为table数组容量/8
     *   如果CPU核数等于1,则该线程负责整个table数组的所有哈希槽
     *   每个消息负责的哈希槽数最少为16个
     */
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 如果传入的nextTable数组为空,则创建nextTable数组,容量为table数组的2倍
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // 当前线程进行数据转移的哈希槽区间
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // 创建ForwardingNode节点,并设置nextTable属性为nextTable数组的引用
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 标识某个哈希槽中的数据是否完成转移,true-表示转移完成,开启下一个哈希槽转移
    boolean advance = true;
    // 标识table数组中的数据是否完成转移,true-表示转移完成
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 开始转移某个哈希槽中的数据,i-表示当前需要转移的哈希槽,bound-表示需要转移的哈希槽边界
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        /**
         * 根据哈希槽的转移情况设置变量i和bound
         * i:transferIndex-1
         * bound:transferIndex-stride
         */
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 以CAS的方式设置transferIndex = transferIndex - stride
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 当前线程负责的哈希槽数据转移完成
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // table数组的所有数据转移完成
            if (finishing) {
                // 清空nextTable数组
                nextTable = null;
                // 设置table数组为扩容后的数组
                table = nextTab;
                // 设置sizeCtl的值为table数组容量的0.75倍
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 通过CAS的方式将sizeCtl变量的值-1
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 如果当前线程不是本次执行扩容操作的最后一个转移线程,则退出
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 如果当前哈希槽中不存在节点,则设置哈希槽的头节点为ForwardingNode节点
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 如果当前哈希槽中的数据已经转移完成,则设置advance为true
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        // 当前哈希槽中的数据未转移完成,则继续转移数据
        else {
            // 锁定当前哈希槽
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 节点的hash值大于0,说明转移的事链表中的数据
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            // 根据节点的哈希值与n的结果是否为0将链表拆分为两个子链表
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // (ph & n)==0,将链表中的节点设置到ln链表中,ln链表保存转移后在新数组中位置保持不变的节点
                        setTabAt(nextTab, i, ln);
                        // (ph & n)!=0,将链表中的节点设置到hn链表中,hn链表保存转移后在新数组中位置为i+n的节点
                        setTabAt(nextTab, i + n, hn);
                        // 设置哈希槽点头节点为ForwardingNode
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 转移红黑树中的数据
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

常见问题

为什么数组大小必须为2的n次方?

将数组设置为2的n次方主要为了减少哈希碰撞的概率。

假设向ConcurrentHashMap中添加key为16、17、18三个key-value键值对,16、17、18三个key的二进制值为10000、10001、10010:

  • 如果数组大小不是2的n次方,如:数组大小为21,那么数组大小-1为20,其二进制值为10100,分别与16、17、18的二进制值进行&运算后的结果都是10000。
  • 如果数组大小是2的n次方,如:数组大小为2^4,那么数组大小-1为15,其二进制值为1111,分别与16、17、18的二进制值进行&运算后的结果为0000、0001、0010。

通过对比,将数组设置为2的n次方显然减少了哈希碰撞的概率。

为什么查询数组下标时使用按位与(&)操作?

使用按位与操作的原因是按位与操作更高效。如:数组扩容时通过按位与进行数据迁移。

使用示例

/**
 * @author 南秋同学
 * ConcurrentHashMap使用示例
 */
@Slf4j
public class ConcurrentHashMapExample {
    private static ConcurrentHashMap<String, AtomicInteger> count = new ConcurrentHashMap();
    private static CountDownLatch  countDownLatch = new CountDownLatch(3);
    public static void main(String[] args) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                AtomicInteger old;
                for (int i = 0; i < 10; i++) {
                    old = count.get("南秋同学");
                    if (null == old) {
                        AtomicInteger zero = new AtomicInteger(0);
                        old = count.putIfAbsent("南秋同学", zero);
                        if (null == old) {
                            old = zero;
                        }
                    }
                    old.incrementAndGet();
                }
                countDownLatch.countDown();
            }
        };
        new Thread(task).start();
        new Thread(task).start();
        new Thread(task).start();
        try {
            countDownLatch.await();
            log.info("count:{}",count);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

相关推荐

驱动网卡(怎么从新驱动网卡)
驱动网卡(怎么从新驱动网卡)

网卡一般是指为电脑主机提供有线无线网络功能的适配器。而网卡驱动指的就是电脑连接识别这些网卡型号的桥梁。网卡只有打上了网卡驱动才能正常使用。并不是说所有的网卡一插到电脑上面就能进行数据传输了,他都需要里面芯片组的驱动文件才能支持他进行数据传输...

2026-01-30 00:37 liuian

win10更新助手装系统(微软win10更新助手)

1、点击首页“系统升级”的按钮,给出弹框,告诉用户需要上传IMEI码才能使用升级服务。同时给出同意和取消按钮。华为手机助手2、点击同意,则进入到“系统升级”功能华为手机助手华为手机助手3、在检测界面,...

windows11专业版密钥最新(windows11专业版激活码永久)

 Windows11专业版的正版密钥,我们是对windows的激活所必备的工具。该密钥我们可以通过微软商城或者通过计算机的硬件供应商去购买获得。获得了windows11专业版的正版密钥后,我...

手机删过的软件恢复(手机删除过的软件怎么恢复)
手机删过的软件恢复(手机删除过的软件怎么恢复)

操作步骤:1、首先,我们需要先打开手机。然后在许多图标中找到带有[文件管理]文本的图标,然后单击“文件管理”进入页面。2、进入页面后,我们将在顶部看到一行文本:手机,最新信息,文档,视频,图片,音乐,收藏,最后是我们正在寻找的[更多],单击...

2026-01-29 23:55 liuian

一键ghost手动备份系统步骤(一键ghost 备份)

  步骤1、首先把装有一键GHOST装系统的U盘插在电脑上,然后打开电脑马上按F2或DEL键入BIOS界面,然后就选择BOOT打USDHDD模式选择好,然后按F10键保存,电脑就会马上重启。  步骤...

怎么创建局域网(怎么创建局域网打游戏)

  1、购买路由器一台。进入路由器把dhcp功能打开  2、购买一台交换机。从路由器lan端口拉出一条网线查到交换机的任意一个端口上。  3、两台以上电脑。从交换机任意端口拉出网线插到电脑上(电脑设置...

精灵驱动器官方下载(精灵驱动手机版下载)

是的。驱动精灵是一款集驱动管理和硬件检测于一体的、专业级的驱动管理和维护工具。驱动精灵为用户提供驱动备份、恢复、安装、删除、在线更新等实用功能。1、全新驱动精灵2012引擎,大幅提升硬件和驱动辨识能力...

一键还原系统步骤(一键还原系统有哪些)

1、首先需要下载安装一下Windows一键还原程序,在安装程序窗口中,点击“下一步”,弹出“用户许可协议”窗口,选择“我同意该许可协议的条款”,并点击“下一步”。  2、在弹出的“准备安装”窗口中,可...

电脑加速器哪个好(电脑加速器哪款好)

我认为pp加速器最好用,飞速土豆太懒,急速酷六根本不工作。pp加速器什么网页都加速,太任劳任怨了!以上是个人观点,具体性能请自己试。ps:我家电脑性能很好。迅游加速盒子是可以加速电脑的。因为有过之...

任何u盘都可以做启动盘吗(u盘必须做成启动盘才能装系统吗)

是的,需要注意,U盘的大小要在4G以上,最好是8G以上,因为启动盘里面需要装系统,内存小的话,不能用来安装系统。内存卡或者U盘或者移动硬盘都可以用来做启动盘安装系统。普通的U盘就可以,不过最好U盘...

u盘怎么恢复文件(u盘文件恢复的方法)

开360安全卫士,点击上面的“功能大全”。点击文件恢复然后点击“数据”下的“文件恢复”功能。选择驱动接着选择需要恢复的驱动,选择接入的U盘。点击开始扫描选好就点击中间的“开始扫描”,开始扫描U盘数据。...

系统虚拟内存太低怎么办(系统虚拟内存占用过高什么原因)

1.检查系统虚拟内存使用情况,如果发现有大量的空闲内存,可以尝试释放一些不必要的进程,以释放内存空间。2.如果系统虚拟内存使用率较高,可以尝试增加系统虚拟内存的大小,以便更多的应用程序可以使用更多...

剪贴板权限设置方法(剪贴板访问权限)
剪贴板权限设置方法(剪贴板访问权限)

1、首先打开iphone手机,触碰并按住单词或图像直到显示选择选项。2、其次,然后选取“拷贝”或“剪贴板”。3、勾选需要的“权限”,最后选择开启,即可完成苹果剪贴板权限设置。仅参考1.打开苹果手机设置按钮,点击【通用】。2.点击【键盘】,再...

2026-01-29 21:37 liuian

平板系统重装大师(平板重装win系统)

如果你的平板开不了机,但可以连接上电脑,那就能好办,楼主下载安装个平板刷机王到你的个人电脑上,然后连接你的平板,平板刷机王会自动识别你的平板,平板刷机王上有你平板的我刷机包,楼主点击下载一个,下载完成...

联想官网售后服务网点(联想官网售后服务热线)

联想3c服务中心是联想旗下的官方售后,是基于互联网O2O模式开发的全新服务平台。可以为终端用户提供多品牌手机、电脑以及其他3C类产品的维修、保养和保险服务。根据客户需求层次,联想服务针对个人及家庭客户...