最近要研究缓存,不可避免的大量使用HashMap,又专门研究了下并发下的ConcurrentHashMap 关于volatile的HB原则,可以参考:并发总结2--volatile、CAS、HB CHM结构 将一个大的锁分成多个锁,减少竞争 1 初始化public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments // 找到合适的段数,比如传入,27,那最后会得到32 即2^5 // sshift表示5段,可以用来确定每段里面的元素最大的数量 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } //segmentShift 移动的位数,因为hashcode都是int 32位的,比如segment的数量为2^5次即32个段,那如果通过int&(32-1)即可确定一个32以内的值,但是如果想取高5位来确定在哪个段的话,就要先右移(32-5)27位 segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize); //确定总容量 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; //确定每段里面的数量(初始容量/段数) int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; //将数量转成2^n次的形式,作为每个Segment的构造函数 int cap = 1; while (cap < c) cap <<= 1; for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor); } 2 Segment段的结构和HashEntry static final class Segment<K,V> extends ReentrantLock implements Serializable { //当前数量,volatile,每次增加/删除都要写count值 transient volatile int count; /* 统计改变的次数,对多个段遍历时用来检测某个段是否改变过,只增不减 */ transient int modCount; //阀值,扩容用 transient int threshold; //元素 transient volatile HashEntry<K,V>[] table; //装填因子,确定阀值 final float loadFactor; } Segment直接继承了ReentrantLock,相当于每个段都是一把锁,里面的元素是一个HashEntry数组。 大部分字段都是vloatile的,确保了happends-before语义,“多volatile字段的写入操作happends-befire于每一个后续的同一个字段的读操作” static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; } HashEntry的结构和HashMap的Entry结构相同 3 Get操作相比较于HashTable,get操作首先多了一步定位segment /** * 这个hash函数的作用主要是为了防止有些不好的hash函数引起聚集,所以再进行hash一次 */ private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); } public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); } final Segment<K,V> segmentFor(int hash) { //为什么使用segmentMask,参看hashtable里的,为了让更多的位参与运算 //利用hashcode的前几位定位segment return segments[(hash >>> segmentShift) & segmentMask]; } HashEntry<K,V> getFirst(int hash) { //利用hashcode的后几位定位HashEntry //没有都是用后几位或者前几位,可能是怕segment长度和hashentry数组长度相当的时候就会引起聚集,比如定位到segment是15,那HashEntry里的位子就永远都是15 HashEntry<K,V>[] tab = table; return tab[hash & (tab.length - 1)]; } V get(Object key, int hash) { //先读count字段,根据happend-before原则,确保在另外一个线程count的修改被读到 if (count != 0) { // 有可能读到过期的数据,比如定位到头节点,之后马上执行执行了删除操作,并更新了头节点,这就导致getFirst返回的头节点不是最新的,这是可以容忍的,通过对count的协调,get能读取到几乎最新的数据,要读取最新的,只有完全同步 HashEntry<K,V> e = getFirst(hash); while (e != null) { //遍历操作不需要加锁,因为next节点是final的 if (e.hash == hash && key.equals(e.key)) { V v = e.value; //value是volatile的,保证能取到最新址 //一般没理由是null,put的时候也不可能是null,除非在put的时候,发生重排序,tab[index] = new HashEntry<K,V>(key, hash, first, value) HashEntry对value的赋值和tab[index]可能发生重排序,可能导致节点为空 则可能这个value正在put中,加锁获取 if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; } 4 Put操作V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; //扩容 if (c++ > threshold) // ensure capacity rehash(); HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; //找到则直接更新value if (e != null) { oldValue = e.value; if (!onlyIfAbsent) e.value = value; } else { oldValue = null; ++modCount; //不然生产一个新的元素 tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // write-volatile //先写modCount再写count,确保被其他线程可见 } return oldValue; } finally { unlock(); } } 5 Remove操作 V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; //将volatile赋给局部变量tab,因为table是volatile变量,读写volatile的开销比较大,编译器也不能对volatile变量的读写做任何优化;直接多次访问非volatile实例没有太大影响,编译器会做相应优化 HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // 定位到要删除的节点,e, //HashEntry的next都是final的,所以是不能修改的,如果有删除,要将e前面的节点复制一遍,e后面的节点不需要复制,可以重用 ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } } 6 全局操作 size/containsValue每个segement的count虽然是volatile的,但是可能获得以后又会有变化,最安全的方法是把put,clar,remove都锁住,但是这样性能又太低,所以CHM使用的方法是先直接统计count,再通过mdCount判断是否有修改,如果有的话,再加锁计算 public int size() { final Segment<K,V>[] segments = this.segments; long sum = 0; long check = 0; int[] mc = new int[segments.length]; // 锁住之前尝试的次数 for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { check = 0; sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { sum += segments[i].count; mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { check += segments[i].count; if (mc[i] != segments[i].modCount) { check = -1; // force retry break; } } } if (check == sum) break; } //顺序加锁,防止死锁 if (check != sum) { // Resort to locking all segments sum = 0; for (int i = 0; i < segments.length; ++i) segments[i].lock(); for (int i = 0; i < segments.length; ++i) sum += segments[i].count; for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } if (sum > Integer.MAX_VALUE) return Integer.MAX_VALUE; else return (int)sum; } public boolean containsValue(Object value) { ..... for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { int sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { //访问count,虽然C变量完全没用,但是根据happen before原则,count之前的修改,包括modCount,都对这里可见 int c = segments[i].count; mcsum += mc[i] = segments[i].modCount; if (segments[i].containsValue(value)) return true; } boolean cleanSweep = true; if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { int c = segments[i].count; if (mc[i] != segments[i].modCount) { cleanSweep = false; break; } } } if (cleanSweep) return false; } ..... } 可以看到大量利用了hb原则,虽然count字段没用,但还是先访问一下先count,因为hb原则有传递性 7 遍历比较简单,把每个段连起来就好了 |
|