【Java容器】ConcurrentHashMap
本文参考了Java3y公众号
重要属性
一些简单的属性就略过了,比如ConcurrentHashMap和HashMap一样初始容量位16,容量也必须是2的倍数,Load_Factor也是0.75f,也使用了懒加载。
/** 它是一个控制标识符,在不同的地方有不同用途,而且它的取值不同,也代表不同的含义。 负数代表正在进行初始化或扩容操作: -1代表正在初始化; -N 表示有N-1个线程正在进行扩容操作; 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。 后面还可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。 */ private transient volatile int sizeCtl; // 以下两个是用来控制扩容的时候 单线程进入的变量 /** * The number of bits used for generation stamp in sizeCtl. * Must be at least 6 for 32bit arrays. */ private static int RESIZE_STAMP_BITS = 16; /** * The bit shift for recording size stamp in sizeCtl. */ private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; /* * Encodings for Node hash fields. See above for explanation. */ static final int MOVED = -1; // hash值是-1,表示这是一个forwardNode节点 static final int TREEBIN = -2; // hash值是-2 表示这是一个TreeBin节点
重要内部类
1. Node
Node是最核心的内部类,它包装了key-value键值对,所有插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义很相似,但是但是有一些差别它对value和next属性设置了volatile同步锁,它不允许调用setValue方法直接改变Node的value域(直接抛出异常),它增加了find方法辅助map.get()方法。
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val;//带有同步锁的value volatile Node<K,V> next;//带有同步锁的next指针 Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } //不允许直接改变value的值 public final V setValue(V value) { throw new UnsupportedOperationException(); } public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } /** * Virtualized support for map.get(); overridden in subclasses. */ Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }
小总结:
- 这个Node内部类与HashMap中定义的Node类很相似,但是有一些差别
- 它对value和next属性设置了volatile同步锁
- 它不允许调用setValue方法直接改变Node的value域
- 它增加了find方法辅助map.get()方法
2. TreeNode
树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry<K,V>类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问。
3. TreeBin
这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。
这里仅贴出它的构造方法。可以看到在构造TreeBin节点时,仅仅指定了它的hash值为TREEBIN常量,这也就是个标识。同时也看到我们熟悉的红黑树构造方法。
/** * Creates bin with initial set of nodes headed by b. */ TreeBin(TreeNode<K,V> b) { super(TREEBIN, null, null, null); this.first = b; TreeNode<K,V> r = null; for (TreeNode<K,V> x = b, next; x != null; x = next) { next = (TreeNode<K,V>)x.next; x.left = x.right = null; if (r == null) { x.parent = null; x.red = false; r = x; } else { K k = x.key; int h = x.hash; Class<?> kc = null; for (TreeNode<K,V> p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode<K,V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); break; } } } } this.root = r; assert checkInvariants(root); }
4. ForwardingNode
Node的子类,一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1。 这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找。这个类的作用是在并发扩容的时候表明该节点已经被处理过,线程会进入下一个节点。
/** * A node inserted at head of bins during transfer operations. */ static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
Unsafe类
在ConcurrentHashMap源码最下面有如下一段static变量声明和一段静态代码块。
// Unsafe mechanics private static final sun.misc.Unsafe U; private static final long SIZECTL; private static final long TRANSFERINDEX; private static final long BASECOUNT; private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } }
其中比较重要的是Unsafe的一个引用,通过它(也就是U),在ConcurrentHashMap中大量使用了U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。
ConcurrentHashMap定义了三个原子操作,他们都使用了Unsafe类的方法,用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。
@SuppressWarnings("unchecked") //获得在i位置上的Node节点 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少 //在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改 //因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果 有点类似于SVN static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //利用volatile方法设置节点位置的值 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
初始化方法initTable
对于ConcurrentHashMap来说,调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null。
初始化方法主要应用了关键属性sizeCtl 如果这个值<0,表示其他线程正在进行初始化,就放弃这个操作。在这也可以看出ConcurrentHashMap的初始化只能由一个线程完成。如果获得了初始化权限,就用CAS方法将sizeCtl置为-1,防止其他线程进入。初始化数组后,将sizeCtl的值改为0.75*n。
/** * Initializes table, using the size recorded in sizeCtl. */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //sizeCtl表示有其他线程正在进行初始化操作,把线程挂起。对于table的初始化工作,只能有一个线程在进行。 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2);//相当于0.75*n 设置一个扩容的阈值 } } finally { sizeCtl = sc; } break; } } return tab; }
扩容方法transfer
当ConcurrentHashMap容量不足的时候,需要对table进行扩容。这个方法的基本思想跟HashMap是很像的,但是由于它是支持并发扩容的,所以要复杂的多。原因是它支持多线程进行扩容操作,而并没有加锁。我想这样做的目的不仅仅是为了满足concurrent的要求,而是希望利用并发处理去减少扩容带来的时间影响。因为在扩容的时候,总是会涉及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作能够并发进行,那真真是极好的了。
整个扩容操作分为两个部分:
第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过RESIZE_STAMP_SHIFT这个常量经过一次运算来保证的,这个地方在后面会有提到;
第二个部分就是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。
先来看一下单线程是如何完成的:
它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素:
如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;
如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上
如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上
遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。
再看一下多线程是如何完成的:
给每个线程分配16个桶去扩容,小于16的按16算。在代码的69行有一个判断,如果遍历到的节点是forward节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后遍历。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。
奉上源码:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // 将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。 // 这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range 细分范围 stridea:TODO // 新的 table 尚未初始化 if (nextTab == null) { // initiating try { // 扩容 2 倍 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 更新 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME // 扩容失败, sizeCtl 使用 int 最大值。 sizeCtl = Integer.MAX_VALUE; return;// 结束 } // 更新成员变量 nextTable = nextTab; // 更新转移下标,就是 老的 tab 的 length transferIndex = n; } // 新 tab 的 length int nextn = nextTab.length; // 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进 boolean advance = true; // 完成状态,如果是 true,就结束此方法。 boolean finishing = false; // to ensure sweep before committing nextTab // 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 如果当前线程可以向后推进;这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要转移的桶的区间 while (advance) { int nextIndex, nextBound; // 对 i 减一,判断是否大于等于 bound (正常情况下,如果大于 bound 不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务) // 如果对 i 减一大于等于 bound(还需要继续做任务),或者完成了,修改推进状态为 false,不能推进了。任务成功后修改推进状态为 true。 // 通常,第一次进入循环,i-- 这个判断会无法通过,从而走下面的 nextIndex 赋值操作(获取最新的转移下标)。其余情况都是:如果可以推进,将 i 减一,然后修改成不可推进。如果 i 对应的桶处理成功了,改成可以推进。 if (--i >= bound || finishing) advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进 // 这里的目的是:1. 当一个线程进入时,会选取最新的转移下标。2. 当一个线程处理完自己的区间时,如果还有剩余区间的没有别的线程处理。再次获取区间。 else if ((nextIndex = transferIndex) <= 0) { // 如果小于等于0,说明没有区间了 ,i 改成 -1,推进状态变成 false,不再推进,表示,扩容结束了,当前线程可以退出了 // 这个 -1 会在下面的 if 块里判断,从而进入完成状态判断 i = -1; advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进 }// CAS 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound;// 这个值就是当前线程可以处理的最小当前区间最小下标 i = nextIndex - 1; // 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标 advance = false; // 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶。下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况。 } }// 如果 i 小于0 (不在 tab 下标内,按照上面的判断,领取最后一段区间的线程扩容结束) // 如果 i >= tab.length(不知道为什么这么判断) // 如果 i + tab.length >= nextTable.length (不知道为什么这么判断) if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { // 如果完成了扩容 nextTable = null;// 删除成员变量 table = nextTab;// 更新 table sizeCtl = (n << 1) - (n >>> 1); // 更新阈值 return;// 结束方法。 }// 如果没完成 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 尝试将 sc -1. 表示这个线程结束帮助扩容了,将 sc 的低 16 位减一。 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)// 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。 return;// 不相等,说明没结束,当前线程结束方法。 finishing = advance = true;// 如果相等,扩容结束了,更新 finising 变量 i = n; // 再次循环检查一下整张表 } } else if ((f = tabAt(tab, i)) == null) // 获取老 tab i 下标位置的变量,如果是 null,就使用 fwd 占位。 advance = casTabAt(tab, i, null, fwd);// 如果成功写入 fwd 占位,再次推进一个下标 else if ((fh = f.hash) == MOVED)// 如果不是 null 且 hash 值是 MOVED。 advance = true; // already processed // 说明别的线程已经处理过了,再次推进一个下标 else {// 到这里,说明这个位置有实际值了,且不是占位符。对这个节点上锁。为什么上锁,防止 putVal 的时候向链表插入数据 synchronized (f) { // 判断 i 下标处的桶节点是否和 f 相同 if (tabAt(tab, i) == f) { Node<K,V> ln, hn;// low, height 高位桶,低位桶 // 如果 f 的 hash 值大于 0 。TreeBin 的 hash 是 -2 if (fh >= 0) { // 对老长度进行与运算(第一个操作数的的第n位于第二个操作数的第n位如果都是1,那么结果的第n为也为1,否则为0) // 由于 Map 的长度都是 2 的次方(000001000 这类的数字),那么取于 length 只有 2 种结果,一种是 0,一种是1 // 如果是结果是0 ,Doug Lea 将其放在低位,反之放在高位,目的是将链表重新 hash,放到对应的位置上,让新的取于算法能够击中他。 int runBit = fh & n; Node<K,V> lastRun = f; // 尾节点,且和头节点的 hash 值取于不相等 // 遍历这个桶 for (Node<K,V> p = f.next; p != null; p = p.next) { // 取于桶中每个节点的 hash 值 int b = p.hash & n; // 如果节点的 hash 值和首节点的 hash 值取于结果不同 if (b != runBit) { runBit = b; // 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。 lastRun = p; // 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环 } } if (runBit == 0) {// 如果最后更新的 runBit 是 0 ,设置低位节点 ln = lastRun; hn = null; } else { hn = lastRun; // 如果最后更新的 runBit 是 1, 设置高位节点 ln = null; }// 再次循环,生成两个链表,lastRun 作为停止条件,这样就是避免无谓的循环(lastRun 后面都是相同的取于结果) for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; // 如果与运算结果是 0,那么就还在低位 if ((ph & n) == 0) // 如果是0 ,那么创建低位节点 ln = new Node<K,V>(ph, pk, pv, ln); else // 1 则创建高位 hn = new Node<K,V>(ph, pk, pv, hn); } // 其实这里类似 hashMap // 设置低位链表放在新链表的 i setTabAt(nextTab, i, ln); // 设置高位链表,在原有长度上加 n setTabAt(nextTab, i + n, hn); // 将旧的链表设置成占位符 setTabAt(tab, i, fwd); // 继续向后推进 advance = true; }// 如果是红黑树 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // 遍历 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); // 和链表相同的判断,与运算 == 0 的放在低位 if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } // 不是 0 的放在高位 else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 低位树 setTabAt(nextTab, i, ln); // 高位数 setTabAt(nextTab, i + n, hn); // 旧的设置成占位符 setTabAt(tab, i, fwd); // 继续向后推进 advance = true; } } } } } }
put方法
前面的所有的介绍其实都为这个方法做铺垫。ConcurrentHashMap最常用的就是put和get两个方法。现在来介绍put方法,这个put方法依然沿用HashMap的put方法的思想,根据hash值计算这个新插入的点在table中的位置i,如果i位置是空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾(尾插法)。ConcurrentHashMap中依然沿用这个思想,有一个最重要的不同点就是ConcurrentHashMap不允许key或value为null值。另外由于涉及到多线程,put方法就要复杂一点。在多线程中可能有以下4个情况:
如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容;
如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比hashTable的synchronized要好得多。
整体流程就是首先定义不允许key或value为null的情况放入 对于每一个放入的值,首先利用spread方法对key的hashcode进行一次hash计算,由此来确定这个值在table中的位置。
如果这个位置是空的,那么直接使用CAS放入桶中,而且不需要加锁操作。
如果这个位置存在结点,说明发生了hash碰撞,首先判断这个节点的类型。如果是链表节点(fh>0),则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况,则只需要更新value值即可。否则依次向后遍历,直到链表尾插入这个结点。如果加入这个节点以后链表长度大于8,就把这个链表转换成红黑树。如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。
来看源码:
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { //不允许 key或value为null if (key == null || value == null) throw new NullPointerException(); //计算hash值 int hash = spread(key.hashCode()); int binCount = 0; //死循环 何时插入成功 何时跳出 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //如果table为空的话,初始化table if (tab == null || (n = tab.length) == 0) tab = initTable(); //根据hash值计算出在table里面的位置 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果这个位置没有值 ,直接放进去,不需要加锁 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //当遇到表连接点时,需要进行整合表的操作 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //结点上锁 这里的结点可以理解为hash值相同组成的链表的头结点 synchronized (f) { if (tabAt(tab, i) == f) { //fh〉0 说明这个节点是一个链表的节点 不是树的节点 if (fh >= 0) { binCount = 1; //在这里遍历链表所有的结点 for (Node<K,V> e = f;; ++binCount) { K ek; //如果hash值和key值相同 则修改对应结点的value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //如果这个节点是树节点,就按照树的方式插入值 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果链表长度已经达到临界值8 就需要把链表转换为树结构 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //将当前ConcurrentHashMap的元素数量+1 addCount(1L, binCount); return null; }
helpTransfer方法
这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用transfer方法。回看上面的transfer方法可以看到,当本线程进入扩容方法的时候会直接进入复制阶段。
/** * Helps transfer if a resize is in progress. */ final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length);//计算一个操作校验码 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
treeifyBin方法
这个方法用于将过长的链表转换为TreeBin对象。但是他并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才链表的结构抓换为TreeBin ,这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode.
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//如果table.length<64 就扩大一倍 返回 tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; //构造了一个TreeBin对象 把所有Node节点包装成TreeNode放进去 for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);//这里只是利用了TreeNode封装 而没有利用TreeNode的next域和parent域 if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } //在原来index的位置 用TreeBin替换掉原来的Node对象 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
get方法
get方法比较简单,给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //计算hash值 int h = spread(key.hashCode()); //根据hash值确定节点位置 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //如果eh<0 说明这个节点在树上 直接寻找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //否则遍历链表 找到对应的值并返回 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
size相关方法
对于ConcurrentHashMap来说,这个table里到底装了多少东西其实是个不确定的数量,因为不可能在调用size()方法的时候像GC的“stop the world”一样让其他线程都停下来让你去统计,因此只能说这个数量是个估计值。对于这个估计值,ConcurrentHashMap也是大费周章才计算出来的。
辅助定义
为了统计元素个数,ConcurrentHashMap定义了一些变量和一个内部类。
/** * A padded cell for distributing counts. Adapted from LongAdder * and Striped64. See their internal docs for explanation. */ @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } /******************************************/ /** * 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新 但它并不用返回当前hashmap的元素个数 */ private transient volatile long baseCount; /** * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. */ private transient volatile int cellsBusy; /** * Table of counter cells. When non-null, size is a power of 2. */ private transient volatile CounterCell[] counterCells;
mappingCount与Size方法
mappingCount与size方法的类似 从Java工程师给出的注释来看,应该使用mappingCount代替size方法 两个方法都没有直接返回basecount 而是统计一次这个值,而这个值其实也是一个大概的数值,因此可能在统计的时候有其他线程正在执行插入或删除操作。
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } /** * Returns the number of mappings. This method should be used * instead of {@link #size} because a ConcurrentHashMap may * contain more mappings than can be represented as an int. The * value returned is an estimate; the actual count may differ if * there are concurrent insertions or removals. * * @return the number of mappings * @since 1.8 */ public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value;//所有counter的值求和 } } return sum; }
addCount方法
在put方法结尾处调用了addCount方法,把当前ConcurrentHashMap的元素个数+1这个方法一共做了两件事,更新baseCount的值,检测是否进行扩容。
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //利用CAS方法更新baseCount的值 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //如果check值大于等于0 则需要检验是否需要进行扩容操作 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //如果已经有其他线程在执行扩容操作 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
总结
何时会加锁?put时当前节点不为空,以及transfer时都会锁定当前的节点。
为何容量一定是2的指数?计算桶位置时,使用%性能会比较差,然而在n为2的指数时,这个过程等效为e.hash & (n - 1)
,性能就好了。
允许null为key吗?不。
为何使用(e.hash & n) == 0
能判断当前节点需不需要rehash?(e.hash & n) == 0
说明e的哈希值显然是小于旧数组容量或者大于新数组容量的,举个例子就知道了,比如假设容量为16,在第二个桶(i = 1)的位置有如下节点的hash:1、17、33。那么对于这3个节点,如何进行rehash呢,首先,1 & 16 = 0,那么他在新数组的位置不变;然后,17 & 16 = 16,那么他在新数组的位置就是当前位置1 + 旧数组长度16,那么就是17处;最后,33 & 16 = 0,那么他的位置也不动。