最近总是在分析源码,感觉源码也不是想象上的那么难,今天我来记录一下我对ConcurrentHashMap的理解。这里只敢说记录,不敢说分析,因为ConcurrentHashMap的代码确实有点难以理解,本文不对代码进行死磕,也就是说,可能不会对某一行代码进行死磕,而是对整个ConcurrentHashMap类进行整体的理解。
本文参考资料:
1.ConcurrentHashMap源码分析(JDK8版本)
2. 死磕 Java 并发:J.U.C 之 Java 并发容器:ConcurrentHashMap
3.深入浅出ConcurrentHashMap1.8
4.深入分析ConcurrentHashMap1.8的扩容实现
5.方腾飞、魏鹏、程晓明的《Java 并发编程的艺术》
1.为什么要使用ConcurrentHashMap?
其实,楼主都觉得这个问题问的非常傻逼,为什么使用ConcurrentHashMap呢?当然是为了线程安全呗。其实这个回答是比较笼统的,这里面还有很多的问题没有涉及到,比如说,使用最多的HashMap在多线程模型下的缺点,传统线程安全的HashTable的问题等等。
(1).线程不安全的HashMap
我们才学习HashMap的时候,就知道HashMap是线程不安全的,但是不知道HashMap由于多线程会导致什么问题。下面,我们来看一段代码:
public class Demo { public static void main(String[] args) { final Map<String, String> map = new HashMap<>(2); Thread t = new Thread(new Runnable() { @Override public void run() { for(int i = 0; i < 100000; i++){ new Thread(new Runnable() { @Override public void run() { map.put(UUID.randomUUID().toString(), ""); } }).start(); } } }); t.start(); try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }}
HashMap在并发执行put操作是会引起死循环,是因为多线程会导致HashMap的Entry链表形成数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
(2).效率低下的HashTable
HashTable使用synchronized关键字来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法,会进入阻塞或者轮询状态。如果线程1在使用put进行元素添加,线程2不但不能使用put方法添加元素也不能使用get方法来获取元素,所以竞争越激烈效率越低。
(3).ConcurrentHashMap
相较于笨重的HashTable,ConcurrentHashMap就显得非常高效。ConcurrentHashMap降低了锁的粒度,其中在1.7中,设置了Segment数组,来表示不同数据段,在1.8中,取消了Segment数组,进一步降低了锁的粒度。由于本文是分析1.8的ConcurrentHashMap,所以不对1.7的版本过多的解释。
2.ConcurrentHashMap的模型图
1.8在1.7的基础上进一步的优化,使得ConcurrentHashMap的锁的粒度进一步降低。我们还是先来看看ConcurrentHashMap的结构图:

我们看到的是,在1.8的内部,维持了一个Node数组,用来存储数据。实际上,ConcurrentHashMap结构与HashMap的结构类似,基本结构都是数组+链表+红黑树。但是在ConcurrentHashMap里面,红黑树在Node数组内部存储的不是一个TreeNode对象,而是一个TreeBin对象,TreeBin内部维持着一个红黑树。
3.ConcurrentHashMap的内部类
简单的看过ConcurrentHashMap的结构图之后,现在来了解一下ConcurrentHashMap的内部类,这个在后面我们理解源码有一定的帮助。
(1).Node类
Node类在ConcurrentHashMap的内部类是最基本的类,其中桶里面装的就是Node元素,还有就是TreeBin、TreeNode等都继承于Node类。我们先来看看Node的代码:
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } /** * Virtualized support for map.get(); overridden in subclasses. */ Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }
我们来看一下Node成员变量:
final int hash; final K key; volatile V val; volatile Node<K,V> next;
hash用来存储该key的hash值,但是这里面有三个状态需要注意:
名称 | 值 | 描述 |
---|---|---|
MOVED | -1 | hash值为-1的Node,表示当前节点已经被处理过了,这中情况将出现多线程扩容的情况下,后面会详细的解释 |
TREEBIN | -2 | hash值为-2的Node,表示当前的节点是TreeBin |
RESERVED | -3 | 保留的hash值,用在ReservationNode上面 |
key用来存储的key值,val用来存储value值,这个就不用多说。next字段表示下一个Node,这个在出现哈希冲突时,将定位在相同位置上的Node使用链表连接起来。
我们还需要注意一点是,我们不能够通过调用setValue方法来改变Node的值。
public final V setValue(V value) { throw new UnsupportedOperationException(); }
(2).TreeNode类和TreeBin类
TreeNode类表示的是红黑树上的每个节点。当一个链表上的节点数量超过了指定的值,会将这个链表变为红黑树,当然每个节点就转换为TreeNode。不像HashMap,ConcurrentHashMap在桶里面直接存储的不是TreeNode,而是一个TreeBin,在TreeBin内部维护一个红黑树,也就是说TreeNode在TreeBin内部使用的。
(3).ForwardingNode类
这个是辅助类,通常在扩容时用到。此时如果一个线程在进行扩容操作,另一个线程put一个元素进来,如果看到对应的位置上面是ForwardingNode对象的话,那么就参与扩容的操作队列来。其中ForwardingNode对象来源有两个:1.原来的位置为null,但是此时复制操作已经到当前位置的后面了,会将这个原来的桶的这个位置置为ForwardingNode对象;2.原来位置不为null,但是已经操作过这个位置了。
我们来来ForwardingNode的源码:
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } //省略了find方法代码 }
在ForwardingNode里面,我们发现了一个nextTable对象,这个对象表示扩容之后的数组,当一个线程访问到了一个ForwardingNode对象,就知道当前正在进行扩容操作,当前这个线程会帮助扩容(扩容分为两步:1.数组容量增大2倍;2.将原数组的元素拷贝到新数组里面去),将原数组的元素复制到这个nextTable里面去。
4.基本成员变量
我们在分析ConcurrentHashMap的源码时,还是先来看看它基本的成员变量。
变量名 | 描述 |
---|---|
table | 桶数组,用来存储Node元素的。默认为null,只在第一次put操作的进行初始化,该数组的长度永远为2的n次方。 |
nextTable | 默认为null,当不为null,表示当前正在进行扩容操作,这个数组就是扩容之后的数组,长度为原数组的两倍。 |
baseCount | 记录map中元素个数,由于是多线程操作,baseCount记录的不准确,所以要结合counterCells 来使用保证记录的正确性。 |
sizeCtl | 表初始化和扩容的控制位。其中,当为-1时,表示当前table数组正在被初始化;当为-N时,表示有N-1个线程在进行扩容操作;当为0(默认值)时,表示当前table还未使用,此时table为null;当为其余正整数时,表示table的容量,默认是table大小的0.75倍,在代码中使用的是(n - (n>>>2))的方式来计算0.75 |
5.构造方法
在ConcurrentHashMap里面,我觉得有两个构造方法需要我们关注,一个是无参数的构造方法,这个构造方法非常的简单,是一个空的构造方法,还有一个就是带初始容量的构造方法:
public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
在这个构造方法里面最主要的是,初始化sizeCtl。在这里保证sizeCtl必须是2的n次方,所以这里调用tableSizeFor方法的目的就是调整sizeCtl大小为2的n次方。
private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }
假设给出大小为100,调用这个方法之后,会将大小调整为256。
6.table数组的初始化
在前面,对table数组的初始化也有所提及。table不会在构造方法里面进行初始化,而是在第一次put操作时,进行初始化。我们来看看:
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //发现table数组尚未初始化,调用initTable方法来对table数组进行初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //省略了其余代码 }
在调用put方法时,如果发现table尚未被初始化,会调用initTable方法来初始化。我们来看看initTable方法的代码:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //sizeCtl小于0,表示已经有线程在对table数组进行初始化,现在这个线程要做的就是让出cpu时间, //全力支持table初始化 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //此时有一个线程在对table数组进行是初始化,如果使用CAS算法对sizeCtl字段更新成功, //表示当前线程可以对table数组进行初始化。 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
在initTable方法中,我们发现table数组只能被一个线程初始化。如果一个线程在初始化table数组,会将sizeCtl字段更新为-1,其他线程看到sizeCtl为-1时,就知道当前已经有线程在初始化table数组,他们需要做的是,全力支持那个线程初始化table数组--让出cpu占用时间。
同时,我们看到当table数组更新成功之后,sizeCtl更新为table数组大小的0.75倍:
sc = n - (n >>> 2);
7.put操作
我们使用HashMap最多的操作就是put操作和get操作。这里先对put进行分析。我们先总体看一下put的代码:
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
这里,我先将整个put方法的代码贴出,让大家有一个总体上概念,然后再来一一解释,整个put操作的流程。
整个put操作分为4种情况:
1.如果当前table没有被初始化,会先初始化,重新进行put操作(循环操作,保证put成功)。
2.通过hash计算,定位位置,如果位置上为null,表示可以直接放进去。
3.如果对应位置的元素已经被标记为MOVED,即Node的hash为MOVED,那么表示当前table数组在进行扩容操作,此时,当前的线程会帮助扩容。
4.如果通过hash计算之后的位置不为null,表示出现了哈希冲突,此时会锁住当前这个位置上的Node对象,然后将新添加的Node添加这个位置链表或者红黑树上。
(1).hash计算
put操作的第一步就是hash计算,这里的hash计算,我认为分为两步:
1.调用spread方法将key的hashcode再次hash。
2.通过(n - 1) & hash方法来计算该元素定位的位置。
我们先来看看spread方法:
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
我先来解释一下>>>运算符的含义:
>>>的基本操作就是左移,然后高位补0,这里的左移表示连符号位都要跟着左移;而>>只是左移数值位,不移动符号位。
h ^ (h >>> 16)与运算过程如下图:

异或操作完成之后,然后跟HASH_BITS取与操作,我认为是为了消除符号位的影响。
至于(n - 1) & hash计算,其实就是hash % n的计算,具体的解释,大家可以参考我的Java 源码分析-ThreadLocal,这篇文章里面有相关的解释。
(2).元素放入数组
由于初始化table数组在前面已经说了,所以这里直接说赋值的情况。在这种情况下,还有分为两种情况:1.对应位置为null;2.对应位置不为null。这里将将两种情况分别讨论一下。
A.对应位置为null
这种情况比较简单,直接调用了casTabAt方法进行赋值:
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }
这里使用Unsafe类来对变量进行CAS更新,具体的实现原理,楼主也不是很清楚,实在抱歉!
A.对应位置不为null
这种情况下,就比较复杂了,我们先来看看代码:
V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
这种情况下,新添加的Node肯定会添加到这个位置后面,当然为了保证线程安全,肯定锁住当前的Node,这样将锁的粒度降到了每个Node上,一个Node的put操作不会影响其他Node的操作。
锁住了之后,当然就要添加Node。这里分为两种情况,一种原来的Node不是TreeBin,也就是说原来就是一个链表,这样直接添加到链表的内部就行了;还有就是原来的Node本身就是一个TreeBin,那么我们通过调用TreeBin内部的方法来添加一个Node,至于红黑树的平衡性,让TreeBin自己来维持,我们外部不需要关注。
添加完成之后,还需要一个操作,那就是如果当前链表达到了阈值,需要将链表变为红黑树。这一步的解释在后面在详细的说。
8.更新baseCount
在putVal方法的循环执行完毕之后,一个Node肯定放入进去了,此时就需要调用addCount方法来更新baseCount变量:
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //更新baseCount if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //检测是否需要扩容 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
addCount方法里面分为主要两个操作:1.更新baseCount;2.检测是否扩容。这里由于在讨论更新baseCount,所以不对扩容操作进行详细的解释,之后会做详细的解释。
更新baseCount的操作分成了两步:1.尝试更新baseCount变量;2.如果更新失败,或者counterCells为null会调用fullAddCount方法进行循环更新。
我们来看看fullAddCount的代码:
private final void fullAddCount(long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; if ((as = counterCells) != null && (n = as.length) > 0) { // 如果counterCells数组对应位置上为null,创建一个cell, //放在这个位置上。 if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell CounterCell r = new CounterCell(x); // Optimistic create if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //如果对应位置上不为null,尝试更新value值 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale else if (!collide) collide = true; //如果对应不为null,并且更新失败,表示此时counterCells数组的容量过小, //此时需要扩容。 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h); } //初始化counterCells数组 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }
这个方法的代码比较长,理解起来可能麻烦,我来说明整个方法的作用和执行的过程。
在解释这个方法之前,先解释一下baseCount和counterCells含义。两个都是用记录元素个数,只是记录的时机不同的。当要更新元素个数时,优先更新baseCount,如果baseCount更新成功的话,表示更新元素个数的操作已经完成了;如果更新失败的话,此时会考虑更新counterCells数组中某一个(随机的)cell的value值。因此,map的元素个数 = baseCount + 所有的cell的value值。
在调用fullAddCount方法之前,在addCount方法里面进行了两次尝试:1.尝试更新baseCount;2.尝试更新counterCells数组随机位置上的一个cell的value。如果这两次尝试都失败的话,则需要调用fullAddCount来保证元素个数正确的更新。
而这个fullAddCount方法的作用就是更新cell的value值。
整个fullAddCount方法的执行步骤:
1.如果counterCells为null,先初始化counterCells数组,默认大小为0。
2.如果counterCells不为null,根据产生的随机,通过(n - 1) & h找到一个位置,如果这个位置为null的话,创建一个cell对象放在这个位置上面。
3.如果这个位置上面不为null,首先会尝试更新这个cell的value值,如果更新成功的话,表示更新操作完成,否则执行第4步。
4.如果第3步的更新操作失败,表示此时counterCells数组的容量可能不足以应付这么多的线程了。所以此时counterCells数组需要扩容。
9.扩容操作
当我们put一个元素之后,如果此时元素已经达到了扩容的阈值了,就需要扩容。我们先来看看addCount方里面的扩容部分:
if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); //表示已经有线程在进行扩容操作 if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } }
addcount方法扩容部分有两种情况:1. sc<0情况下,表示当前已经有线程在进行扩容操作了,此时当前这个线程需要参与帮助扩容的任务中来;2. 另一个情况,当前线程只有一个线程准备扩容操作。第一种情况为什么不调用helperTransfer方法来表示帮助扩容的含义,这个原因,我也不是很懂,但是比较了第一种情况的代码与helperTransfer方法的代码,两者其实比较类似。
现在我们来看看transfer方法究竟为我们做了什么。由于transfer方法代码过长,所以这里不完全展出来,这里使用分段形式来解释。
首先,如果是第一个线程调用transfer方法进行扩容操作,那么nextTable肯定为null。所以transfer方法的第一步就是创建新数组,新数组的容量是旧数组的两倍:
if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; }
扩容之后,此时需要将原来的数组进行复制操作了。这个过程支持多线程操作。在看这个过程之前,我们先来看看几个变量的含义。
变量名 | 类型 | 描述 |
---|---|---|
fwd | ForwardingNode | 用来作为占位符,表示当前位置已经被处理过 |
advance | boolean | advance为true,表示当前这个节点已经被处理,这个与fwd区别在于,advance为false表示当前节点(i位置)可以被处理 |
finishing | boolean | finishing为true,表示当前扩容操作已经操作完毕 |
接下来,我们来分析一下transfer方法的执行过程。大体的流程是,遍历这个原来的桶数组,然后复制到新数组里面去。
首先,必须找到一个可以被处理的Node节点,这个节点可以为null,也可以不为null,但是不能为fwd对象,因为如果是fwd对象,表示当前已经被处理过,没必要再去处理。
while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } }
上面的代码中,只要advance为false,就退出while循环,表示当前尝试着处理这个位置上的Node。这里说的是尝试着处理,意思就是,可能不会被处理,比如说,已经有现在在处理了;或者这个位置已经被处理,已经为fwd了。
首先,如果这个位置上为null的话,那么直接将这个位置设置成fwd。
else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd);
如果当前这个节点已经被处理了,直接将advance设置为true,进行下一次位置的选择。
else if ((fh = f.hash) == MOVED) advance = true; // already processed
其他情况下,表示可以处理这个节点,这里处理的意思,是可以将这个位置上的所有节点拷贝到新数组里面去。这里分为两种情况,我们来一一的分析。
首先,第一种情况就是节点本身为一个链表结构。
Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //将原来链表上的一部分就放在原位置 setTabAt(nextTab, i, ln); //将原来链表上的另一部分放在 i + n的位置上 setTabAt(nextTab, i + n, hn); //处理完毕,将这个位置上设置为fwd,后面的线程看到之后,知道这个节点已经被处理了 setTabAt(tab, i, fwd); advance = true; }
整个过程是非常简单,就是将原来的链表分为了两部分,一部分在原来的 i 位置上,一部分在 i+ n的位置上。这里涉及到了喜闻乐见的反转链表,这里不对反转链表做解释,后面会详细的解释反转链表的原因,到时候会回来详细的解释这段代码。
现在来看看复制操作的第二种类型。如果当前位置上的Node为TreeBin类型,表示为红黑树,此时要做的操作跟链表的操作也是非常类似,将树分为两个部分,一个部分在原来的i位置上,另一个部分在i + n的位置上。
TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //如果此时节点数量小于阈值,那么将红黑树变为链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true;
最后,如果所有的节点被赋值完毕之后,表示扩容操作已经完成了,此时finishing为true。
if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //将nextTable置为null,以便下次扩容 nextTable = null; table = nextTab; //将sizeCtl的值调整为长度的0.75 sizeCtl = (n << 1) - (n >>> 1); return; } //sizeCtl减一,表示当前有个线程参与扩容 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } }
10.反转链表的原因
楼主的思路来自于深入分析ConcurrentHashMap1.8的扩容实现,如果大家可以参考一下。刚刚,我们对transfer方法的复制操作有了一个大概的理解,这里将对其详细分析。
在解释这段代码之前,我们先对这段代码里面几个变量有一个初步的理解。
变量名 | 类型 | 描述 |
---|---|---|
runBit | int | 位置为每个节点的hash值与n进行与操作,经过第一次循环,runBit记录的是最后一个hash值变化的Node的hash值。可能这里拗口,待会举一个详细例子。 |
lastRun | Node | 默认值为当前位置第一个Node,经过第一次循环,lastRun记录的是最后一个hash值变化的Node。 |
ln | Node | 默认值为null,是放在新数组 i 位置上的链表头结点。 |
hn | Node | 默认值为null,是放在新数组 i + n 位置上的链表头结点 |
在上面的表格中,提到最后一个hash值变化的Node,这句话可能比较拗口,这里举一个简单的例子来表达意思。

上图中是一个链表的模型,其中,我将这个链表中的节点分为两类:1.一类是节点的hash值相同的;2.第二类是跟第一类的hash值不同的。所以,从这个链表中我们可以得到,lastRun为节点6,runBit也是节点6与n进行与操作得到的值。为什么是节点6呢?因为在6之后,节点类型没有再变了。
Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //将原来链表上的一部分就放在原位置 setTabAt(nextTab, i, ln); //将原来链表上的另一部分放在 i + n的位置上 setTabAt(nextTab, i + n, hn); //处理完毕,将这个位置上设置为fwd,后面的线程看到之后,知道这个节点已经被处理了 setTabAt(tab, i, fwd); advance = true; }
结合源码来分析,runBit默认为节点1的hash和n与操作的值。后面依次记录与上一次runBit的不同值,最后通过runBit == 0将所有的节点分为了两类。
现在我们结合上面的图做一个假设。我们假设紫色节点的hash值&n为0,那么蓝色节点的hash值&n就不为0。经过第一次循环,得出 ln = lastRun = 节点6,hn = null。
此时我们来分析第二次循环,先来看看紫色节点构成ln链表的过程。

我看到ln链表只是部分反转了,默认部分是没有反转了。其中得到的ln链表放在新数组的i位置上。
然后我们再来看看hn链表的构成过程。

我们发现hn链表是完全反转的。从而,我们得出一个结论,哪个链表默认为null,那个链表就是完全反转的。
最终,我们终于知道了,一个反转链表得出的原因,其实不是故意为之,而是加快复制操作速度,因为lastRun之后的节点进行复制操作不需要创建新节点。
11.get操作
前面详细的说了put操作和扩容操作,这里简单的分析一下get操作,get操作相比于前面的操作就非常的简单。先来看看源码:
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //匹配成功 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //当节点为TreeBin或者ForwardingNode else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //遍历链表 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
整个get操作分为三种情况:1.table数组指定位置的第一个节点就匹配成功,直接返回;2.table数组指定位置上的hash值小于0,此时当前位置可能已经被其他在扩容时处理过,或者当前位置的Node为一个TreeBin,不管是那种类型的Node,调用的都是find方法来获取Node节点;3.其余情况下,直接遍历链表查找。
12.size操作
size操作里面,主要有两个方法提供,一个是传统的size方法,一个是ConcurrentHashMap比较提倡的mappingCount方法。我们先来看看size方法:
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }
size方法里面调用sumCount方法,sumCount方法才是真正计算元素个数的方法。我们来看看sumCount方法:
final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
可能大家对这个方法计算方式不太好理解,其实这种计算方式取决于前面的设计方式。在前面我说到过,map的元素个数 = baseCount + 所有的cell的value值。这个结论是因为多线程更新个数时,如果只在baseCount里面记录,一个原因是多线程更新可能会出错,另一个原因是竞争性太大了。所以,设计理念就变成了,当baseCount更新失败时,表示baseCount变量当前忙碌,可以将这个增量添加到一个不忙的cell里面去,这样竞争性就降低了。
我们再来看看mappingCount方法:
public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values }
达到的效果感觉跟size都差不多,只不过是一个返回int,一个返回long而已。
原著是一个有趣的人,若有侵权,请通知删除
还没有人抢沙发呢~