关于文章中的疑问:为什么要构造一个反序链表,放在nextTable的i n的位置上呢,在《深入分析ConcurrentHashMap1.8的扩容实现 》一文中进行了详细分析。 前言HashMap是我们平时开发过程中用的比较多的集合,但它是非线程安全的,在涉及到多线程并发的情况,进行get操作有可能会引起死循环,导致CPU利用率接近100%。 final HashMap<String, String> map = new HashMap<String, String>(2);for (int i = 0; i < 10000; i ) {new Thread(new Runnable() {@Overridepublic void run() {
map.put(UUID.randomUUID().toString(), '');
}
}).start();
} 解决方案有Hashtable和Collections.synchronizedMap(hashMap),不过这两个方案基本上是对读写进行加锁操作,一个线程在读写元素,其余线程必须等待,性能可想而知。 所以,Doug Lea给我们带来了并发安全的ConcurrentHashMap,它的实现是依赖于 Java 内存模型,所以我们在了解 ConcurrentHashMap 的之前必须了解一些底层的知识: 本文源码是JDK8的版本,与之前的版本有较大差异。 JDK1.7分析ConcurrentHashMap采用 分段锁的机制,实现并发的更新操作,底层采用数组 链表的存储结构。
一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组,下面我们通过一个图来演示一下 ConcurrentHashMap 的结构: ConcurrentHashMap存储结构.png JDK1.8分析1.8的实现已经抛弃了Segment分段锁机制,利用CAS Synchronized来保证并发更新的安全,底层采用数组 链表 红黑树的存储结构。 Paste_Image.png 重要概念在开始之前,有些重要的概念需要介绍一下:
其中value和next都用volatile修饰,保证并发的可见性。
final class ForwardingNode<K,V> extends Node<K,V> {final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {super(MOVED, null, null, null);this.nextTable = tab;
}
} 只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或则已经被移动。 实例初始化实例化ConcurrentHashMap时带参数时,会根据参数调整table的大小,假设参数为100,最终会调整成256,确保table的大小总是2的幂次方,算法如下:
注意,ConcurrentHashMap在构造函数中只会初始化sizeCtl值,并不会直接初始化table,而是延缓到第一次put操作。 table初始化前面已经提到过,table初始化操作会延缓到第一次put行为。但是put是可以并发执行的,Doug Lea是如何实现table只初始化一次的?让我们来看看源码的实现。 private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {//如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spinelse if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings('unchecked')
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}break;
}
}return tab;
} sizeCtl默认为0,如果ConcurrentHashMap实例化时有传参数,sizeCtl会是一个2的幂次方的值。所以执行第一次put操作的线程会执行Unsafe.compareAndSwapInt方法修改sizeCtl为-1,有且只有一个线程能够修改成功,其它线程通过Thread.yield()让出CPU时间片等待table初始化完成。 put操作假设table已经初始化完成,put操作采用CAS synchronized实现并发插入或更新操作,具体实现如下。
static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;}
synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {
binCount = 1;for (Node<K,V> e = f;; binCount) {
K ek;if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;if (!onlyIfAbsent)
e.val = value;break;
}
Node<K,V> pred = e;if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);break;
}
}
}else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;if (!onlyIfAbsent)
p.val = value;
}
}
}
} 在节点f上进行同步,节点插入之前,再次利用tabAt(tab, i) == f判断,防止被其它线程修改。
table扩容当table容量不足的时候,即table的元素数量达到容量阈值sizeCtl,需要对table进行扩容。
这两个过程在单线程下实现很简单,但是ConcurrentHashMap是支持并发插入的,扩容操作自然也会有并发的出现,这种情况下,第二步可以支持节点的并发复制,这样性能自然提升不少,但实现的复杂度也上升了一个台阶。 先看第一步,构建nextTable,毫无疑问,这个过程只能只有单个线程进行nextTable的初始化,具体实现如下:
通过Unsafe.compareAndSwapInt修改sizeCtl值,保证只有一个线程能够初始化nextTable,扩容后的数组长度为原来的两倍,但是容量是原来的1.5。 节点从table移动到nextTable,大体思想是遍历、复制的过程。
遍历过所有的节点以后就完成了复制工作,把table指向nextTable,并更新sizeCtl为新数组大小的0.75倍 ,扩容完成。 红黑树构造注意:如果链表结构中元素超过TREEIFY_THRESHOLD阈值,默认为8个,则把链表转化为红黑树,提高遍历查询效率。 if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);if (oldVal != null)return oldVal;break;
} 接下来我们看看如何构造树结构,代码如下:
可以看出,生成树节点的代码块是同步的,进入同步代码块之后,再次验证table中index位置元素是否被修改过。 TreeBin(TreeNode<K,V> b) {super(TREEBIN, null, null, null);this.first = b;
TreeNode<K,V> r = null;for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;if (r == null) {
x.parent = null;
x.red = false;
r = x;
}else {
K k = x.key;int h = x.hash;
Class<?> kc = null;for (TreeNode<K,V> p = r;;) {int dir, ph;
K pk = p.key;if ((ph = p.hash) > h)
dir = -1;else if (ph < h)
dir = 1;else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;if (dir <= 0)
xp.left = x;elsexp.right = x;
r = balanceInsertion(r, x);break;
}
}
}
}this.root = r;assert checkInvariants(root);
} 主要根据Node节点的hash值大小构建二叉树。这个红黑树的构造过程实在有点复杂,感兴趣的同学可以看看源码。 get操作get操作和put操作相比,显得简单了许多。
总结ConcurrentHashMap 是一个并发散列映射表的实现,它允许完全并发的读取,并且支持给定数量的并发更新。相比于 HashTable 和同步包装器包装的 HashMap,使用一个全局的锁来同步不同线程间的并发访问,同一时间点,只能有一个线程持有锁,也就是说在同一时间点,只能有一个线程能访问容器,这虽然保证多线程间的安全并发访问,但同时也导致对容器的访问变成串行化的了。 END。 |
|
来自: 永恒clek4xeu0r > 《编程》