分享

ConcurrentHashMap的实现分析

 碧海山城 2012-12-23
最近要研究缓存,不可避免的大量使用HashMap,又专门研究了下并发下的ConcurrentHashMap
关于volatile的HB原则,可以参考:并发总结2--volatile、CAS、HB

CHM结构

 

 
 

将一个大的锁分成多个锁,减少竞争

初始化

public ConcurrentHashMap(int initialCapacity,

                             float loadFactor, int concurrencyLevel) {

        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)

            throw new IllegalArgumentException();

        if (concurrencyLevel > MAX_SEGMENTS)

            concurrencyLevel = MAX_SEGMENTS;

        // Find power-of-two sizes best matching arguments

// 找到合适的段数,比如传入,27,那最后会得到32 2^5

// sshift表示5段,可以用来确定每段里面的元素最大的数量

        int sshift = 0;

        int ssize = 1;

        while (ssize < concurrencyLevel) {

            ++sshift;

            ssize <<= 1;

        }

//segmentShift 移动的位数,因为hashcode都是int 32位的,比如segment的数量为2^5次即32个段,那如果通过int&(32-1)即可确定一个32以内的值,但是如果想取高5位来确定在哪个段的话,就要先右移(32-527

        segmentShift = 32 - sshift;

        segmentMask = ssize - 1;

        this.segments = Segment.newArray(ssize);

//确定总容量

        if (initialCapacity > MAXIMUM_CAPACITY)

            initialCapacity = MAXIMUM_CAPACITY;

//确定每段里面的数量(初始容量/段数)

int c = initialCapacity / ssize;

        if (c * ssize < initialCapacity)

            ++c;

//将数量转成2^n次的形式,作为每个Segment的构造函数

        int cap = 1;

        while (cap < c)

            cap <<= 1;

        for (int i = 0; i < this.segments.length; ++i)

            this.segments[i] = new Segment<K,V>(cap, loadFactor);

    }

Segment段的结构和HashEntry

 static final class Segment<K,V> extends ReentrantLock implements Serializable {

//当前数量,volatile,每次增加/删除都要写count

transient volatile int count;

     /* 统计改变的次数,对多个段遍历时用来检测某个段是否改变过,只增不减 */

     transient int modCount;

     //阀值,扩容用

    transient int threshold;

      //元素 

transient volatile HashEntry<K,V>[] table;

 //装填因子,确定阀值

     final float loadFactor;

}

Segment直接继承了ReentrantLock,相当于每个段都是一把锁,里面的元素是一个HashEntry数组。

大部分字段都是vloatile的,确保了happends-before语义,“多volatile字段的写入操作happends-befire于每一个后续的同一个字段的读操作”

static final class HashEntry<K,V> {

        final K key;

        final int hash;

        volatile V value;

        final HashEntry<K,V> next;

}

HashEntry的结构和HashMapEntry结构相同

Get操作

相比较于HashTableget操作首先多了一步定位segment

/**

     * 这个hash函数的作用主要是为了防止有些不好的hash函数引起聚集,所以再进行hash一次

     */

    private static int hash(int h) {

        // Spread bits to regularize both segment and index locations,

        // using variant of single-word Wang/Jenkins hash.

        h += (h <<  15) ^ 0xffffcd7d;

        h ^= (h >>> 10);

        h += (h <<   3);

        h ^= (h >>>  6);

        h += (h <<   2) + (h << 14);

        return h ^ (h >>> 16);

    }

public V get(Object key) {

        int hash = hash(key.hashCode());

        return segmentFor(hash).get(key, hash);

}

final Segment<K,V> segmentFor(int hash) {

//为什么使用segmentMask,参看hashtable里的,为了让更多的位参与运算

//利用hashcode的前几位定位segment

    return segments[(hash >>> segmentShift) & segmentMask];

}

 HashEntry<K,V> getFirst(int hash) {

//利用hashcode的后几位定位HashEntry

//没有都是用后几位或者前几位,可能是怕segment长度和hashentry数组长度相当的时候就会引起聚集,比如定位到segment15,那HashEntry里的位子就永远都是15

            HashEntry<K,V>[] tab = table;

            return tab[hash & (tab.length - 1)];

        }

get(Object key, int hash) {

//先读count字段,根据happend-before原则,确保在另外一个线程count的修改被读到

            if (count != 0) { 

// 有可能读到过期的数据,比如定位到头节点,之后马上执行执行了删除操作,并更新了头节点,这就导致getFirst返回的头节点不是最新的,这是可以容忍的,通过对count的协调,get能读取到几乎最新的数据,要读取最新的,只有完全同步

                HashEntry<K,V> e = getFirst(hash);

                while (e != null) {

//遍历操作不需要加锁,因为next节点是final的

                    if (e.hash == hash && key.equals(e.key)) {

                        V v = e.value;

//value是volatile的,保证能取到最新址

//一般没理由是null,put的时候也不可能是null,除非在put的时候,发生重排序,tab[index] = new  HashEntry<K,V>(key, hash, first, value)

HashEntry对value的赋值和tab[index]可能发生重排序,可能导致节点为空

则可能这个value正在put中,加锁获取

                        if (v != null)

                            return v;

                        return readValueUnderLock(e); // recheck

                    }

                    e = e.next;

                }

            }

            return null;

}

Put操作

V put(K key, int hash, V value, boolean onlyIfAbsent) {

            lock();

            try {

                int c = count;

                //扩容

                if (c++ > threshold// ensure capacity

                    rehash();

                HashEntry<K,V>[] tab = table;

                int index = hash & (tab.length - 1);

                HashEntry<K,V> first = tab[index];

                HashEntry<K,V> e = first;

                while (e != null && (e.hash != hash || !key.equals(e.key)))

                    e = e.next;

                V oldValue;

  //找到则直接更新value

                if (e != null) {

                    oldValue = e.value;

                    if (!onlyIfAbsent)

                        e.value = value;

                }

                else {

                    oldValue = null;

                    ++modCount;

//不然生产一个新的元素

                    tab[index] = new HashEntry<K,V>(key, hash, first, value);

                    count = c; // write-volatile

//先写modCount再写count,确保被其他线程可见

                }

                return oldValue;

            } finally {

                unlock();

            }

        }

Remove操作

 V remove(Object key, int hash, Object value) {

            lock();

            try {

                int c = count - 1;

//volatile赋给局部变量tab,因为tablevolatile变量,读写volatile的开销比较大,编译器也不能对volatile变量的读写做任何优化;直接多次访问非volatile实例没有太大影响,编译器会做相应优化

                HashEntry<K,V>[] tab = table;

                int index = hash & (tab.length - 1);

                HashEntry<K,V> first = tab[index];

                HashEntry<K,V> e = first;

                while (e != null && (e.hash != hash || !key.equals(e.key)))

                    e = e.next;

                V oldValue = null;

                if (e != null) {

                    V v = e.value;

                    if (value == null || value.equals(v)) {

                        oldValue = v;

                        // 定位到要删除的节点,e

//HashEntrynext都是final的,所以是不能修改的,如果有删除,要将e前面的节点复制一遍,e后面的节点不需要复制,可以重用

                        ++modCount;

                        HashEntry<K,V> newFirst = e.next;

                        for (HashEntry<K,V> p = first; p != e; p = p.next)

                            newFirst = new HashEntry<K,V>(p.key, p.hash,

                                                          newFirst, p.value);

                        tab[index] = newFirst;

                        count = c; // write-volatile

                    }

                }

                return oldValue;

            } finally {

                unlock();

            }

        }

全局操作 size/containsValue

每个segementcount虽然是volatile的,但是可能获得以后又会有变化,最安全的方法是把putclarremove都锁住,但是这样性能又太低,所以CHM使用的方法是先直接统计count,再通过mdCount判断是否有修改,如果有的话,再加锁计算

public int size() {

        final Segment<K,V>[] segments = this.segments;

        long sum = 0;

        long check = 0;

        int[] mc = new int[segments.length];

        // 锁住之前尝试的次数

        for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {

            check = 0;

            sum = 0;

            int mcsum = 0;

            for (int i = 0; i < segments.length; ++i) {

                sum += segments[i].count;

                mcsum += mc[i] = segments[i].modCount;

            }

            if (mcsum != 0) {

                for (int i = 0; i < segments.length; ++i) {

                    check += segments[i].count;

                    if (mc[i] != segments[i].modCount) {

                        check = -1; // force retry

                        break;

                    }

                }

            }

            if (check == sum)

                break;

        }

//顺序加锁,防止死锁

        if (check != sum) { // Resort to locking all segments

            sum = 0;

            for (int i = 0; i < segments.length; ++i)

                segments[i].lock();

            for (int i = 0; i < segments.length; ++i)

                sum += segments[i].count;

            for (int i = 0; i < segments.length; ++i)

                segments[i].unlock();

        }

        if (sum > Integer.MAX_VALUE)

            return Integer.MAX_VALUE;

        else

            return (int)sum;

}

public boolean containsValue(Object value) {

        .....

        for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {

            int sum = 0;

            int mcsum = 0;

            for (int i = 0; i < segments.length; ++i) {

//访问count,虽然C变量完全没用,但是根据happen before原则,count之前的修改,包括modCount,都对这里可见

                int c = segments[i].count;

                mcsum += mc[i] = segments[i].modCount;

                if (segments[i].containsValue(value))

                    return true;

            }

            boolean cleanSweep = true;

            if (mcsum != 0) {

                for (int i = 0; i < segments.length; ++i) {

                    int c = segments[i].count;

                    if (mc[i] != segments[i].modCount) {

                        cleanSweep = false;

                        break;

                    }

                }

            }

            if (cleanSweep)

                return false;

        }

       .....

 }

可以看到大量利用了hb原则,虽然count字段没用,但还是先访问一下先count,因为hb原则有传递性

比较简单,把每个段连起来就好了





    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多