jdk 1.8 取消了基于 Segment 的分段锁思想,改用 CAS + synchronized 控制并发操作,在某些方面提升了性能。并且追随 1.8 版本的 HashMap 底层实现,使用数组+链表+红黑树进行数据存储。**

ConcurrentHashMap的属性

  1. // node数组最大容量:2^30=1073741824
  2. private static final int MAXIMUM_CAPACITY = 1 << 30;
  3. // 默认初始值,必须是2的幕数
  4. private static final int DEFAULT_CAPACITY = 16;
  5. //数组可能最大值,需要与toArray()相关方法关联
  6. static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  7. //并发级别,遗留下来的,为兼容以前的版本
  8. private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  9. // 负载因子
  10. private static final float LOAD_FACTOR = 0.75f;
  11. // 链表转红黑树阀值> 8 链表转换为红黑树
  12. static final int TREEIFY_THRESHOLD = 8;
  13. //树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量<=UNTREEIFY_THRESHOLD 则untreeify(lo))
  14. static final int UNTREEIFY_THRESHOLD = 6;
  15. static final int MIN_TREEIFY_CAPACITY = 64;
  16. private static final int MIN_TRANSFER_STRIDE = 16;
  17. private static int RESIZE_STAMP_BITS = 16;
  18. // 2^15-1,help resize的最大线程数
  19. private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
  20. // 32-16=16,sizeCtl中记录size大小的偏移量
  21. private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
  22. // forwarding nodes的hash值
  23. static final int MOVED = -1;
  24. // 树根节点的hash值
  25. static final int TREEBIN = -2;
  26. // ReservationNode的hash值
  27. static final int RESERVED = -3;
  28. // 可用处理器数量
  29. static final int NCPU = Runtime.getRuntime().availableProcessors();
  30. //存放node的数组
  31. transient volatile Node<K,V>[] table;
  32. // 这是一个连接表,用于哈希表扩容,扩容完成后会被重置为 null。
  33. private transient volatile Node<K,V>[] nextTable;
  34. // 该属性保存着整个哈希表中存储的所有的结点的个数总和,有点类似于 HashMap 的 size 属性。
  35. private transient volatile long baseCount;
  36. /*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
  37. *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在进行扩容
  38. *当为0时:代表当时的table还没有被初始化
  39. *当为正数时:表示初始化或者下一次进行扩容的大小
  40. */
  41. private transient volatile int sizeCtl;

Node

static class Node<K,V> implements Map.Entry<K,V> {
    //链表的数据结构
    final int hash;
    final K key;
    //val和next都会在扩容时发生变化,所以加上volatile来保持可见性和禁止重排序
    volatile V val;
    volatile Node<K,V> next;
    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    //不允许更新value 
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }
    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }
    //用于map中的get()方法,子类重写
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

Node数据结构很简单,从上可知,就是一个链表,但是只允许对数据进行查找,不允许进行修改

ForwardingNode

static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        //注意这里
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
 //省略其 find 方法
}

这个节点内部保存了 nextTable 引用,它指向一张 hash 表。在扩容操作中,我们需要对每个桶中的结点进行分离和转移,如果某个桶结点中所有节点都已经迁移完成了(已经被转移到新表 nextTable 中了),那么会在原 table 表的该位置挂上一个 ForwardingNode 结点,说明此桶已经完成迁移。
ForwardingNode 继承自 Node 结点,并且它唯一的构造函数将构建一个键,值,next 都为 null 的结点,反正它就是个标识,无需那些属性。但是 hash 值却为 MOVED。
所以,我们在 putVal 方法中遍历整个 hash 表的桶结点,如果遇到 hash 值等于 MOVED,说明已经有线程正在扩容 rehash 操作,整体上还未完成,不过我们要插入的桶的位置已经完成了所有节点的迁移。

TreeNode

TreeNode继承与Node,但是数据结构换成了二叉树结构,它是红黑树的数据的存储结构,用于红黑树中存储数据,当链表的节点数大于8时会转换成红黑树的结构,他就是通过TreeNode作为存储结构代替Node来转换成黑红树源代码如下

static final class TreeNode<K,V> extends Node<K,V> {
    //树形结构的属性定义
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red; //标志红黑树的红节点
    TreeNode(int hash, K key, V val, Node<K,V> next,TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }
    Node<K,V> find(int h, Object k) {
        return findTreeNode(h, k, null);
    }
    //根据key查找 从根节点开始找出相应的TreeNode,
    final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
        if (k != null) {
            TreeNode<K,V> p = this;
            do  {
                int ph, dir; K pk; TreeNode<K,V> q;
                TreeNode<K,V> pl = p.left, pr = p.right;
                if ((ph = p.hash) > h)
                    p = pl;
                else if (ph < h)
                    p = pr;
                else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                    return p;
                else if (pl == null)
                    p = pr;
                else if (pr == null)
                    p = pl;
                else if ((kc != null ||
                          (kc = comparableClassFor(k)) != null) &&
                         (dir = compareComparables(kc, k, pk)) != 0)
                    p = (dir < 0) ? pl : pr;
                else if ((q = pr.findTreeNode(h, k, kc)) != null)
                    return q;
                else
                    p = pl;
            } while (p != null);
        }
        return null;
    }
}

TreeBin

TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以TreeBin就是封装TreeNode的容器,它提供转换黑红树的一些条件和锁的控制,部分源码结构如下

static final class TreeBin<K,V> extends Node<K,V> {
    //指向TreeNode列表和根节点
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // 读写锁状态
    static final int WRITER = 1; // 获取写锁的状态
    static final int WAITER = 2; // 等待写锁的状态
    static final int READER = 4; // 增加数据时读锁的状态
    /**
     * 初始化红黑树
     */
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        this.first = b;
        TreeNode<K,V> r = null;
        for (TreeNode<K,V> x = b, next; x != null; x = next) {
            next = (TreeNode<K,V>)x.next;
            x.left = x.right = null;
            if (r == null) {
                x.parent = null;
                x.red = false;
                r = x;
            }
            else {
                K k = x.key;
                int h = x.hash;
                Class<?> kc = null;
                for (TreeNode<K,V> p = r;;) {
                    int dir, ph;
                    K pk = p.key;
                    if ((ph = p.hash) > h)
                        dir = -1;
                    else if (ph < h)
                        dir = 1;
                    else if ((kc == null &&
                              (kc = comparableClassFor(k)) == null) ||
                             (dir = compareComparables(kc, k, pk)) == 0)
                        dir = tieBreakOrder(k, pk);
                        TreeNode<K,V> xp = p;
                    if ((p = (dir <= 0) ? p.left : p.right) == null) {
                        x.parent = xp;
                        if (dir <= 0)
                            xp.left = x;
                        else
                            xp.right = x;
                        r = balanceInsertion(r, x);
                        break;
                    }
                }
            }
        }
        this.root = r;
        assert checkInvariants(root);
    }
    ......
}

ConcurrentHashMap的方法

initTable

是一个初始化哈希表的操作,它同时只允许一个线程进行初始化操作。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;

    //如果表为空才进行初始化操作
    while ((tab = table) == null || tab.length == 0) {

         //sizeCtl 小于零说明已经有线程正在进行初始化操作
        //当前线程应该放弃 CPU 的使用
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin

       //否则说明还未有线程对表进行初始化,那么本线程就来做这个工作
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                //保险起见,再次判断下表是否为空
                if ((tab = table) == null || tab.length == 0) {

                    //sc 大于零说明容量已经初始化了,否则使用默认容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;

                   //根据容量构建数组
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;

                    //计算阈值,等效于 n*0.75
                    sc = n - (n >>> 2);
                }
            } finally {
                //设置阈值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

关于 initTable 方法的每一步实现都已经给出注释,该方法的核心思想就是,只允许一个线程对表进行初始化,如果不巧有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度。这样,保证了表同时只会被一个线程初始化。

addCount

当我们成功的添加完成一个结点,最后是需要判断添加操作后是否会导致哈希表达到它的阈值,并针对不同情况决定是否需要进行扩容,还有 CAS 式更新哈希表实际存储的键值对数量。这些操作都封装在 addCount 这个方法中,当然 putVal 方法的最后必然会调用该方法进行处理。下面我们看看该方法的具体实现,该方法主要做两个事情。一是更新 baseCount,二是判断是否需要扩容。

//第一部分,更新 baseCount
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;

    //更新baseCount,table的数量,counterCells表示元素个数的变化
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        //如果多个线程都在执行,则CAS失败,执行fullAddCount,全部加入count
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;

        s = sumCount();
    }

    //####上面部分主要完成的是对 baseCount 的 CAS 更新。

     //判断是否需要扩容
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;

        // s >= sizeCtl 表示需要进行扩容操作
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            //当前线程操作,nextTable=null
            else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

size方法

size的作用是为我们返回哈希表中实际存在的键值对的总数。

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);
}
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

可能你会有所疑问,ConcurrentHashMap 中的 baseCount 属性不就是记录的所有键值对的总数吗?直接返回它不就行了吗?
之所以没有这么做,是因为我们的 addCount 方法用于 CAS 更新 baseCount,但很有可能在高并发的情况下,更新失败,那么这些节点虽然已经被添加到哈希表中了,但是数量却没有被统计。
还好,addCount 方法在更新 baseCount 失败的时候,会调用 fullAddCount 将这些失败的结点包装成一个 CounterCell 对象,保存在 CounterCell 数组中。那么整张表实际的 size 其实是 baseCount 加上 CounterCell 数组中元素的个数。

transfer

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 每核处理的量小于16,则强制赋值16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range

    //刚开始扩容,初始化 nextTab 
    if (nextTab == null) { 
        try {
           //构建一个nextTable对象,其容量为原来容量的两倍
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        //transferIndex 指向最后一个桶,方便从后向前遍历 
        transferIndex = n;
    }

    int nextn = nextTab.length;
    //定义 ForwardingNode 用于标记迁移完成的桶
    // 连接点指针,用于标志位(fwd的hash值为-1,fwd.nextTable=nextTab)
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

   //====上面部分 主要完成的是对单个线程能处理的最少桶结点个数的计算和一些属性的初始化操作===

    //第二部分,并发扩容控制的核心
    // 当advance == true时,表明该节点已经处理过了
    boolean advance = true;
    boolean finishing = false;  

    //i 指向当前桶,bound 指向当前线程需要处理的桶结点的区间下限
    for (int i = 0, bound = 0;;) {

        Node<K,V> f; int fh;
        //这个 while 循环的目的就是通过 --i 遍历当前线程所分配到的桶结点
       //一个桶一个桶的处理
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;

            //transferIndex <= 0 说明已经没有需要迁移的桶了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            //更新 transferIndex
           //为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex)
            else if (U.compareAndSwapInt (
                        this, TRANSFERINDEX, nextIndex,
                        nextBound = (nextIndex > stride ? nextIndex - stride : 0)
                        )
                    ) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }

        //当前线程所有任务完成
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 已经完成所有节点复制了
            if (finishing) {
                nextTable = null;
                table = nextTab;        // table 指向nextTable
                sizeCtl = (n << 1) - (n >>> 1);     // sizeCtl阈值为原来的1.5倍
                return;     // 跳出死循环,
            }
            // CAS 更扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }

        //待迁移桶为空,那么在此位置 CAS 添加 ForwardingNode 结点标识该桶已经被处理过了
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // f.hash == -1 表示遍历到了ForwardingNode节点,意味着该节点已经处理过了
        // 这里是控制并发扩容的核心
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 节点加锁
            synchronized (f) {
                // 节点复制工作
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // fh >= 0 ,表示为链表节点
                    if (fh >= 0) {
                        // 构造两个链表  一个是原链表  另一个是原链表的反序排列
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }

                        //如果fh&n不变的链表的runbit都是0,则nextTab[i]内元素ln前逆序,ln及其之后顺序
                        //否则,nextTab[i+n]内元素全部相对原table逆序
                        //这是通过一个节点一个节点的往nextTab添加
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        //把两条链表整体迁移到nextTab中
                        setTabAt(nextTab, i, ln);
                        // 在nextTable i + n 位置处插上链表
                        setTabAt(nextTab, i + n, hn);
                        // 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
                        setTabAt(tab, i, fwd);
                        // advance = true 可以执行--i动作,遍历节点
                        advance = true;
                    }
                    // 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                 else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 扩容后树节点个数若<=6,将树转链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

每个新参加进来扩容的线程必然先进 while 循环的最后一个判断条件中去领取自己需要迁移的桶的区间。然后 i 指向区间的最后一个位置,表示迁移操作从后往前的做。接下来的几个判断就是实际的迁移结点操作了。

扩容过程有点复杂,这里主要涉及到多线程并发扩容,ForwardingNode的作用就是支持扩容操作,将已处理的节点和空节点置为ForwardingNode,并发处理时多个线程经过ForwardingNode就表示已经遍历了,就往后遍历,下图是多线程合作扩容的过程:
ConcurrentHashMap - 图1
首先,每个线程进来会先领取自己的任务区间,然后开始 —i 来遍历自己的任务区间,对每个桶进行处理。如果遇到桶的头结点是空的,那么使用 ForwardingNode 标识该桶已经被处理完成了。如果遇到已经处理完成的桶,直接跳过进行下一个桶的处理。如果是正常的桶,对桶首节点加锁,正常的迁移即可,迁移结束后依然会将原表的该位置标识位已经处理。
当 i < 0,说明本线程处理速度够快的,整张表的最后一部分已经被它处理完了,现在需要看看是否还有其他线程在自己的区间段还在迁移中。这是退出的逻辑判断部分:
ConcurrentHashMap - 图2
这里写图片描述
finnish 是一个标志,如果为 true 则说明整张表的迁移操作已经全部完成了,我们只需要重置 table 的引用并将 nextTable 赋为空即可。否则,CAS 式的将 sizeCtl 减一,表示当前线程已经完成了任务,退出扩容操作。
如果退出成功,那么需要进一步判断是否还有其他线程仍然在执行任务。

if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
   return;

我们说过 resizeStamp(n) 返回的是对 n 的一个数据校验标识,占 16 位。而 RESIZE_STAMP_SHIFT 的值为 16,那么位运算后,整个表达式必然在右边空出 16 个零。也正如我们所说的,sizeCtl 的高 16 位为数据校验标识,低 16 为表示正在进行扩容的线程数量。
(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示当前只有一个线程正在工作,相对应的,如果 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,说明当前线程就是最后一个还在扩容的线程,那么会将 finishing 标识为 true,并在下一次循环中退出扩容方法。

helpTransfer

/**
 *帮助从旧的table的元素复制到新的table中
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;

    if (tab != null && (f instanceof ForwardingNode) &&

        //新的table nextTba已经存在前提下才能帮助扩容
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {

        //返回一个 16 位长度的扩容校验标识
        int rs = resizeStamp(tab.length);

        while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {

            //sizeCtl 如果处于扩容状态的话
            //前 16 位是数据校验标识,后 16 位是当前正在扩容的线程总数
            //这里判断校验标识是否相等,如果校验符不等或者扩容操作已经完成了,直接退出循环,不用协助它们扩容了
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;

            //否则调用 transfer 帮助它们进行扩容
            //sc + 1 标识增加了一个线程进行扩容
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);//调用扩容方法
                break;
            }
        }
        return nextTab;
    }
    return table;
}

其实helpTransfer()方法的目的就是调用多个工作线程一起帮助进行扩容,这样的效率就会更高,而不是只有检查到要扩容的那个线程进行扩容操作,其他线程就要等待扩容操作完成才能工作
既然这里涉及到扩容的操作,我们也一起来看看扩容方法transfer()

treeifyBin

put流程会调用treeifyBin()方法进行链表转红黑树的过程

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        //如果整个table的数量小于64,就扩容至原来的一倍,不转红黑树了
        //因为这个阈值扩容可以减少hash冲突,不必要去转红黑树
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        //封装成TreeNode
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    //通过TreeBin对象对TreeNode转换成红黑树
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

put操作

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {

    if (key == null || value == null) throw new NullPointerException();

    int hash = spread(key.hashCode()); //两次hash,减少hash冲突,可以均匀分布

    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { //对这个table进行迭代
        Node<K,V> f; int n, i, fh;
        //这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置没有数据,就直接无锁插入
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;  // no lock when adding to empty bin
        }

        //检测到桶结点是 ForwardingNode 类型,协助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);

        //桶结点是普通的结点,锁住该桶头结点并试图在该链表的尾部添加一个节点
        else {
            V oldVal = null;
            //那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
            synchronized (f) {
                if (tabAt(tab, i) == f) {

                    if (fh >= 0) { //表示该节点是链表结构
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //这里涉及到相同的key进行put就会覆盖原先的value
                            if (e.hash == hash &&
                                ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {  //插入链表尾部
                                pred.next = new Node<K,V>(hash, key, alue, null);
                                break;
                            }
                        }
                    }

                    //向红黑树中添加元素,TreeBin 结点的hash值为TREEBIN(-2)
                    else if (f instanceof TreeBin) {//红黑树结构
                        Node<K,V> p;
                        binCount = 2;
                        //红黑树结构旋转插入
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            //binCount != 0 说明向链表或者红黑树中添加或修改一个节点成功
            //binCount  == 0 说明 put 操作将一个新节点添加成为某个桶的首节点
            if (binCount != 0) { 
                //如果链表的长度大于8时就会进行红黑树的转换
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);//统计size,并且检查是否需要扩容
    return null;
}

这个put的过程很清晰,对当前的table进行无条件自循环直到put成功,可以分成以下六步流程来概述

  1. 如果没有初始化就先调用initTable()方法来进行初始化过程
  2. 如果没有hash冲突就直接CAS插入
  3. 如果还在进行扩容操作就先进行扩容
  4. 如果存在hash冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入,
  5. 最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环
  6. 如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容

put的流程现在已经分析完了,你可以从中发现,在并发处理中使用的是乐观锁,当有冲突的时候才进行并发处理,而且流程步骤很清晰,但是细节设计的很复杂,毕竟多线程的场景也复杂
当我们根据 hash 值,找到对应的桶结点,如果发现该结点为 ForwardingNode 结点,表明当前的哈希表正在扩容和 rehash,于是将本线程送进去帮忙扩容。否则如果是普通的桶结点,于是锁住该桶,分链表和红黑树的插入一个节点,具体插入过程类似 HashMap,此处不再赘述。

get操作

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode()); //计算两次hash
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {//读取首节点的Node元素
        if ((eh = e.hash) == h) { //如果该节点就是首节点就返回
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来
        //查找,查找到就返回
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;

        while ((e = e.next) != null) {//既不是首节点也不是ForwardingNode,那就往下遍历
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

ConcurrentHashMap的get操作的流程很简单,也很清晰,可以分为三个步骤来描述

  1. 计算hash值,定位到该table索引位置,如果是首节点符合就返回
  2. 如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回。
  3. 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null

    remove操作

    在我们分析完 put 方法的源码之后,相信 remove 方法对你而言就比较轻松了,无非就是先定位再删除的复合。
    限于篇幅,我们这里简单的描述下 remove 方法的并发删除过程。
    首先遍历整张表的桶结点,如果表还未初始化或者无法根据参数的 hash 值定位到桶结点,那么将返回 null。
    如果定位到的桶结点类型是 ForwardingNode 结点,调用 helpTransfer 协助扩容。
    否则就老老实实的给桶加锁,删除一个节点。
    最后会调用 addCount 方法 CAS 更新 baseCount 的值。

    clear操作

    clear 方法将删除整张哈希表中所有的键值对,删除操作也是一个桶一个桶的进行删除。

    public void clear() {
     long delta = 0L; // negative number of deletions
     int i = 0;
     Node<K,V>[] tab = table;
     while (tab != null && i < tab.length) {
         int fh;
         Node<K,V> f = tabAt(tab, i);
         if (f == null)
             ++i;
         else if ((fh = f.hash) == MOVED) {
             tab = helpTransfer(tab, f);
             i = 0; // restart
         }
         else {
             synchronized (f) {
                 if (tabAt(tab, i) == f) {
                     Node<K,V> p = (fh >= 0 ? f :(f instanceof TreeBin) ?((TreeBin<K,V>)f).first : null);
                         //循环到链表或者红黑树的尾部
                         while (p != null) {
                             --delta;
                             p = p.next;
                         }
                         //首先删除链、树的末尾元素,避免产生大量垃圾  
                         //利用CAS无锁置null  
                         setTabAt(tab, i++, null);
                     }
                 }
             }
         }
         if (delta != 0L)
             addCount(delta, -1);
     }
    

    size操作

    最后我们来看下例子中最后获取size的方式int size = map.size();,现在让我们看下size()方法

    public int size() {
     long n = sumCount();
     return ((n < 0L) ? 0 :
             (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
             (int)n);
    }
    final long sumCount() {
     CounterCell[] as = counterCells; CounterCell a; //变化的数量
     long sum = baseCount;
     if (as != null) {
         for (int i = 0; i < as.length; ++i) {
             if ((a = as[i]) != null)
                 sum += a.value;
         }
     }
     return sum;
    }
    

    在JDK1.8版本中,对于size的计算,在扩容和addCount()方法就已经有处理了,JDK1.7是在调用size()方法才去计算,其实在并发集合中去计算size是没有多大的意义的,因为size是实时在变的,只能计算某一刻的大小,但是某一刻太快了,人的感知是一个时间段,所以并不是很精确。

    总结与思考

    其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,总结如下思考

  4. JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)

  5. JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了
  6. JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档
  7. JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock,我觉得有以下几点
    1. 因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了
    2. JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的关键字比使用API更加自然
    3. 在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据