配色: 字号:
Java并发编程与技术内幕
2016-10-12 | 阅:  转:  |  分享 
  
Java并发编程与技术内幕:ConcurrentHashMap源码解析



ConcurrentHashMap是Java并发包中非常有用的一个类,在高并发场景用得非常多,它是线程安全的。要注意到虽然HashTable虽然也是线程安全的,但是它的性能在高并发场景下完全比不上ConcurrentHashMap,这也是由它们的结构所决定的,可以认为ConcurrentHashMap是HashTable的加强版,不过这加强版和原来的HashTable有非常大的区别,不仅是在结构上,而且在方法上也有差别。

下面先来简单说说它们的区别吧HashMap、HashTable、ConcurrentHashMap的区别吧

1、HashMap是非线程安全,HashTable、ConcurrentHashMap都是线程安全,而且ConcurrentHashMap、Hashtable不能传入nul的key或value,HashMap可以。

2、Hashtable是将数据放入到一个Entrty数组或者它Entrty数组上一个Entrty的链表节点。而ConcurrentHashMap是由Segment数组组成,每一个Segment可以看成是一个单独的Map.然后每个Segment里又有一个HashEntrty数组用来存放数据。

3、HashTable的get/put/remove方法都是基于同步的synchronized方法,而ConcurrentHashMap是基本锁的机制,并且每次不是锁全表,而是锁单独的一个Segment。所以ConcurrentHashMap的性能比HashTable好。

4、如果不考虑线程安全因素,推荐使用HashMap,因为它性能最好。



首先来看看它包含的结构吧!

[java]viewplaincopy在CODE上查看代码片派生到我的代码片

publicclassConcurrentHashMapextendsAbstractMap

implementsConcurrentMap,Serializable{

privatestaticfinallongserialVersionUID=7249069246763182397L;



//默认数组容量大小

staticfinalintDEFAULT_INITIAL_CAPACITY=16;



//默认装载因子

staticfinalfloatDEFAULT_LOAD_FACTOR=0.75f;



//默认层级

staticfinalintDEFAULT_CONCURRENCY_LEVEL=16;



//最大的每一个数组容量大小

staticfinalintMAXIMUM_CAPACITY=1<<30;



//最大的分组数目

staticfinalintMAX_SEGMENTS=1<<16;//slightlyconservative



//调用remove/contain/replace方法时不加锁的情况下操作重试次数

staticfinalintRETRIES_BEFORE_LOCK=2;



//segments数组索引相关

finalintsegmentMask;



//segments数组偏移相关

finalintsegmentShift;



//segments数组,每个segments单独就可以认为是一个map

finalSegment[]segments;

这里看到了一个Segment[]segments;数组。下面再来看看Segment这个类。在下面可以看到Segment这个类继承了ReentrantLock锁。所以它也是一个锁。然后它里面还有一个HashEntry[]table。这是真正用来存放数据的结构。

[java]viewplaincopy在CODE上查看代码片派生到我的代码片

/

Segment内部类,注意它也是一个锁!可以认为它是一个带有锁方法的map

/

staticfinalclassSegmentextendsReentrantLockimplementsSerializable{



privatestaticfinallongserialVersionUID=2249069246763182397L;



//元素个数

transientvolatileintcount;

//修改次数

transientintmodCount;

//阈值,超过这个数会重新reSize

transientintthreshold;

//注意,这里又一个数组,这个是真正存放数据元素的地方

transientvolatileHashEntry[]table;

//装载因子,用来计算threshold

finalfloatloadFactor;



HashEntry它的结构很简单:

[java]viewplaincopy在CODE上查看代码片派生到我的代码片

staticfinalclassHashEntry{

finalKkey;

finalinthash;//用来保存Segment索引的信息

volatileVvalue;

finalHashEntrynext;



HashEntry(Kkey,inthash,HashEntrynext,Vvalue){

this.key=key;

this.hash=hash;

this.next=next;

this.value=value;

}



@SuppressWarnings("unchecked")

staticfinalHashEntry[]newArray(inti){

returnnewHashEntry[i];

}

}



经过上面的分析:可以得出如下的ConcurrentHashMap结构图



全部源码分析:

[java]viewplaincopy在CODE上查看代码片派生到我的代码片

packagejava.util.concurrent;

importjava.util.concurrent.locks.;

importjava.util.;

importjava.io.Serializable;

importjava.io.IOException;

importjava.io.ObjectInputStream;

importjava.io.ObjectOutputStream;



publicclassConcurrentHashMapextendsAbstractMap

implementsConcurrentMap,Serializable{

privatestaticfinallongserialVersionUID=7249069246763182397L;



//默认数组容量大小

staticfinalintDEFAULT_INITIAL_CAPACITY=16;



//默认装载因子

staticfinalfloatDEFAULT_LOAD_FACTOR=0.75f;



//默认层级

staticfinalintDEFAULT_CONCURRENCY_LEVEL=16;



//最大的每一个数组容量大小

staticfinalintMAXIMUM_CAPACITY=1<<30;



//最大的分组数目

staticfinalintMAX_SEGMENTS=1<<16;//slightlyconservative



//调用remove/contain/replace方法时不加锁的情况下操作重试次数

staticfinalintRETRIES_BEFORE_LOCK=2;



//segments数组索引相关

finalintsegmentMask;



//segments数组偏移相关

finalintsegmentShift;



//segments数组,每个segments单独就可以认为是一个map

finalSegment[]segments;



/

哈希算法

/

privatestaticinthash(inth){

//Spreadbitstoregularizebothsegmentandindexlocations,

//usingvariantofsingle-wordWang/Jenkinshash.

h+=(h<<15)^0xffffcd7d;

h^=(h>>>10);

h+=(h<<3);

h^=(h>>>6);

h+=(h<<2)+(h<<14);

returnh^(h>>>16);

}



/

根据哈希值计算应该落在哪个segments上

/

finalSegmentsegmentFor(inthash){

returnsegments[(hash>>>segmentShift)&segmentMask];

}





/

内部类,每个HashEntry都会存入到一个Segment中去

/

staticfinalclassHashEntry{

finalKkey;//关键字

finalinthash;//哈希值

volatileVvalue;//值

finalHashEntrynext;//不同的关键字,相再的哈希值时会组成一个链表



HashEntry(Kkey,inthash,HashEntrynext,Vvalue){

this.key=key;

this.hash=hash;

this.next=next;

this.value=value;

}



@SuppressWarnings("unchecked")

staticfinalHashEntry[]newArray(inti){

returnnewHashEntry[i];

}

}



/

Segment内部类,注意它也是一个锁!可以认为它是一个带有锁方法的map

/

staticfinalclassSegmentextendsReentrantLockimplementsSerializable{



privatestaticfinallongserialVersionUID=2249069246763182397L;



//元素个数

transientvolatileintcount;

//修改次数

transientintmodCount;

//阈值,超过这个数会重新reSize

transientintthreshold;

//注意,这里又一个数组,这个是真正存放数据元素的地方

transientvolatileHashEntry[]table;

//装载因子,用来计算threshold

finalfloatloadFactor;



//构造函数,由initialCapacity确定table的大小

Segment(intinitialCapacity,floatlf){

loadFactor=lf;

setTable(HashEntry.newArray(initialCapacity));

}



@SuppressWarnings("unchecked")

staticfinalSegment[]newArray(inti){

returnnewSegment[i];

}



//设置threshold、table

voidsetTable(HashEntry[]newTable){

threshold=(int)(newTable.lengthloadFactor);//注意,当table的元素个数超过这个时,会触发reSize;

table=newTable;

}



//取得头一个

HashEntrygetFirst(inthash){

HashEntry[]tab=table;

returntab[hash&(tab.length-1)];

}



//在加锁情况下读数据,注意这个类继续了锁的方法

VreadValueUnderLock(HashEntrye){

lock();

try{

returne.value;

}finally{

unlock();

}

}





//取元素

Vget(Objectkey,inthash){

if(count!=0){//注意,没有加锁

HashEntrye=getFirst(hash);//取得头一个

while(e!=null){//依次从table中取出元素判断

if(e.hash==hash&&key.equals(e.key)){//hash和key同时相等才表示存在

Vv=e.value;

if(v!=null)//有可能在这里时,运行了删除元素导致为Null,一般发生比较少

returnv;

returnreadValueUnderLock(e);//重新在加锁情况下读数据

}

e=e.next;

}

}

returnnull;

}

//是否包含一个元素

booleancontainsKey(Objectkey,inthash){

if(count!=0){//

HashEntrye=getFirst(hash);

while(e!=null){

if(e.hash==hash&&key.equals(e.key))

returntrue;

e=e.next;

}

}

returnfalse;

}

//是否包含一个元素

booleancontainsValue(Objectvalue){

if(count!=0){//read-volatile

HashEntry[]tab=table;

intlen=tab.length;

for(inti=0;i
for(HashEntrye=tab[i];e!=null;e=e.next){//table数组循环读数

Vv=e.value;

if(v==null)//recheck

v=readValueUnderLock(e);

if(value.equals(v))

returntrue;

}

}

}

returnfalse;

}

//替换时要加锁

booleanreplace(Kkey,inthash,VoldValue,VnewValue){

lock();

try{

HashEntrye=getFirst(hash);

while(e!=null&&(e.hash!=hash||!key.equals(e.key)))//hash和key要同时相等才表示是找到了这个元素

e=e.next;



booleanreplaced=false;

if(e!=null&&oldValue.equals(e.value)){//判断是否要进行替换

replaced=true;

e.value=newValue;

}

returnreplaced;

}finally{

unlock();

}

}

//替换时要加锁

Vreplace(Kkey,inthash,VnewValue){

lock();

try{

HashEntrye=getFirst(hash);

while(e!=null&&(e.hash!=hash||!key.equals(e.key)))

e=e.next;



VoldValue=null;

if(e!=null){

oldValue=e.value;

e.value=newValue;

}

returnoldValue;

}finally{

unlock();

}

}



//放入一个元素,onlyIfAbsent如果有false表示替换原来的旧值

Vput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){

lock();

try{

intc=count;

if(c++>threshold)//table数组里的元素超过threshold。触发rehash,其实也就是扩大table

rehash();

HashEntry[]tab=table;

intindex=hash&(tab.length-1);

HashEntryfirst=tab[index];//头一个

HashEntrye=first;

while(e!=null&&(e.hash!=hash||!key.equals(e.key)))//一直不断判断不重复才停止

e=e.next;



VoldValue;

if(e!=null){//这个key、hash已经存在,修改原来的

oldValue=e.value;

if(!onlyIfAbsent)

e.value=value;//替换原来的旧值

}

else{//这个key、hash已经不存在,加入一个新的

oldValue=null;

++modCount;

tab[index]=newHashEntry(key,hash,first,value);//加入一个新的元素

count=c;//个数变化

}

returnoldValue;

}finally{

unlock();

}

}



//重新哈希

voidrehash(){

HashEntry[]oldTable=table;

intoldCapacity=oldTable.length;//旧容量

if(oldCapacity>=MAXIMUM_CAPACITY)//超过默认的最大容量时就退出了

return;



HashEntry[]newTable=HashEntry.newArray(oldCapacity<<1);//这里直接在原来的基础上扩大1倍

threshold=(int)(newTable.lengthloadFactor);//重新计算新的阈值

intsizeMask=newTable.length-1;

for(inti=0;i
HashEntrye=oldTable[i];



if(e!=null){//旧table上该处有数据

HashEntrynext=e.next;

intidx=e.hash&sizeMask;



//单个节点key-value

if(next==null)

newTable[idx]=e;



else{//链表节点key-value

HashEntrylastRun=e;

intlastIdx=idx;

for(HashEntrylast=next;

last!=null;

last=last.next){//这里重新计算了链表上最后一个节点的位置

intk=last.hash&sizeMask;

if(k!=lastIdx){

lastIdx=k;

lastRun=last;

}

}

newTable[lastIdx]=lastRun;//将原table上对应的链表上的最后一个元素放在新table对应链表的首位置



for(HashEntryp=e;p!=lastRun;p=p.next){//for循环依次拷贝链表上的数据,注意最后整个链表相对原来会倒序排列

intk=p.hash&sizeMask;

HashEntryn=newTable[k];

newTable[k]=newHashEntry(p.key,p.hash,

n,p.value);//新table数据赋值

}

}

}

}

table=newTable;

}



//删除一个元素

Vremove(Objectkey,inthash,Objectvalue){

lock();//删除要加锁

try{

intc=count-1;

HashEntry[]tab=table;

intindex=hash&(tab.length-1);

HashEntryfirst=tab[index];

HashEntrye=first;

while(e!=null&&(e.hash!=hash||!key.equals(e.key)))

e=e.next;



VoldValue=null;

if(e!=null){///找到元素

Vv=e.value;

if(value==null||value.equals(v)){//为null或者value相等时才删除

oldValue=v;

++modCount;

HashEntrynewFirst=e.next;

for(HashEntryp=first;p!=e;p=p.next)

newFirst=newHashEntry(p.key,p.hash,

newFirst,p.value);//注意它这里会倒换原来链表的位置

tab[index]=newFirst;

count=c;//记录数减去1

}

}

returnoldValue;

}finally{

unlock();

}

}

//清空整个map

voidclear(){

if(count!=0){

lock();

try{

HashEntry[]tab=table;

for(inti=0;i
tab[i]=null;//直接赋值为null

++modCount;

count=0;//write-volatile

}finally{

unlock();

}

}

}

}







//构造函数

publicConcurrentHashMap(intinitialCapacity,

floatloadFactor,intconcurrencyLevel){

if(!(loadFactor>0)||initialCapacity<0||concurrencyLevel<=0)//loadFactor和initialCapacity都得大于0

thrownewIllegalArgumentException();



if(concurrencyLevel>MAX_SEGMENTS)

concurrencyLevel=MAX_SEGMENTS;



//Findpower-of-twosizesbestmatchingarguments

intsshift=0;

intssize=1;

while(ssize
++sshift;

ssize<<=1;

}

segmentShift=32-sshift;

segmentMask=ssize-1;

this.segments=Segment.newArray(ssize);



if(initialCapacity>MAXIMUM_CAPACITY)

initialCapacity=MAXIMUM_CAPACITY;

intc=initialCapacity/ssize;

if(cssize
++c;

intcap=1;

while(cap
cap<<=1;



for(inti=0;i
this.segments[i]=newSegment(cap,loadFactor);//为每个segments初始化其里面的数组

}



//构造函数

publicConcurrentHashMap(intinitialCapacity,floatloadFactor){

this(initialCapacity,loadFactor,DEFAULT_CONCURRENCY_LEVEL);

}





//构造函数

publicConcurrentHashMap(intinitialCapacity){

this(initialCapacity,DEFAULT_LOAD_FACTOR,DEFAULT_CONCURRENCY_LEVEL);

}





//默认构造函数

publicConcurrentHashMap(){

this(DEFAULT_INITIAL_CAPACITY,DEFAULT_LOAD_FACTOR,DEFAULT_CONCURRENCY_LEVEL);

}



//构造函数

publicConcurrentHashMap(Mapm){

this(Math.max((int)(m.size()/DEFAULT_LOAD_FACTOR)+1,

DEFAULT_INITIAL_CAPACITY),

DEFAULT_LOAD_FACTOR,DEFAULT_CONCURRENCY_LEVEL);

putAll(m);

}



//判断是否为空

publicbooleanisEmpty(){

finalSegment[]segments=this.segments;//取得整个Segment数组



int[]mc=newint[segments.length];

intmcsum=0;

for(inti=0;i
if(segments[i].count!=0)//有一个segments数组元素个数不为0,那么整个map肯定不为空

returnfalse;

else

mcsum+=mc[i]=segments[i].modCount;//累加总的修改次数

}



if(mcsum!=0){

for(inti=0;i
if(segments[i].count!=0||

mc[i]!=segments[i].modCount)//这里又做了一次mc[i]!=segments[i].modCount判断,因为segments[i].count=0时才会跳到这里,不相等那么肯定是又有元素加入

returnfalse;

}

}

returntrue;

}



//整个mapr的大小,这里的大小指的是存放元素的个数

publicintsize(){

finalSegment[]segments=this.segments;

longsum=0;

longcheck=0;

int[]mc=newint[segments.length];



//这是里的for循环是尝试在不加锁的情况下来获取整个map的元素个数

for(intk=0;k
check=0;

sum=0;

intmcsum=0;

for(inti=0;i
sum+=segments[i].count;//累加每一个segments上的count

mcsum+=mc[i]=segments[i].modCount;//累加每一个segments上的modCount

}

if(mcsum!=0){//修改次数不为0,要再做一次判断前后两次的modCount,count的累加

for(inti=0;i
check+=segments[i].count;

if(mc[i]!=segments[i].modCount){//前后两次数据发生了变化

check=-1;//前后两次取的个数不一到,注意sum还是之前的

break;

}

}

}

if(check==sum)//前后两次取的元素个数一样,直接跳出循环

break;

}



//这里会尝试在加锁的情况下来获取整个map的元素个数

if(check!=sum){//这里一般check会等于-1才发生

sum=0;//重新置0

for(inti=0;i
segments[i].lock();//每一个segments上锁

for(inti=0;i
sum+=segments[i].count;//重新累加

for(inti=0;i
segments[i].unlock();//依次释放锁

}

if(sum>Integer.MAX_VALUE)

returnInteger.MAX_VALUE;//如果大于最大值,返回最大值

else

return(int)sum;

}



//取得一个元素,先是不加锁情况下去读

publicVget(Objectkey){

inthash=hash(key.hashCode());

returnsegmentFor(hash).get(key,hash);//具体看上面的代码注释

}



//是否包含一个元素,根据key来获取

publicbooleancontainsKey(Objectkey){

inthash=hash(key.hashCode());

returnsegmentFor(hash).containsKey(key,hash);

}



//是否包含一个元素,根据value来获取

publicbooleancontainsValue(Objectvalue){

if(value==null)

thrownewNullPointerException();

//取得整个Segment的内容

finalSegment[]segments=this.segments;

int[]mc=newint[segments.length];



//尝试在不加锁的情况下做判断

for(intk=0;k
intsum=0;

intmcsum=0;

for(inti=0;i
intc=segments[i].count;//累加个数

mcsum+=mc[i]=segments[i].modCount;//累加修改次数

if(segments[i].containsValue(value))//判断是否包含

returntrue;

}

booleancleanSweep=true;

if(mcsum!=0){//成立说明发生了改变

for(inti=0;i
intc=segments[i].count;//累加第二次循环得到的count

if(mc[i]!=segments[i].modCount){//如果有一个segments前后两次不一样,那么它的元素肯定发生了变化

cleanSweep=false;//

break;//跳出

}

}

}

if(cleanSweep)//为ture表示经过上面的两次判断还是无法找到

returnfalse;

}



//cleanSweepo为false时,进行下面。注意,这里是在加锁情况下

for(inti=0;i
segments[i].lock();//取得每一个segments的锁

booleanfound=false;

try{

for(inti=0;i
if(segments[i].containsValue(value)){

found=true;

break;

}

}

}finally{

for(inti=0;i
segments[i].unlock();

}

returnfound;

}



//是否包含

publicbooleancontains(Objectvalue){

returncontainsValue(value);

}



//放入一个元素

publicVput(Kkey,Vvalue){

if(value==null)

thrownewNullPointerException();

inthash=hash(key.hashCode());

returnsegmentFor(hash).put(key,hash,value,false);//放入时先根据key的hash值找到存放的segments,再调用其put方法

}



//放入一个元素,如果key或value为null,那么为招出一个异常

publicVputIfAbsent(Kkey,Vvalue){

if(value==null)

thrownewNullPointerException();

inthash=hash(key.www.baiyuewang.nethashCode());

returnsegmentFor(hash).put(key,hash,value,true);

}



//放入一个map

publicvoidputAll(Mapm){

for(Map.Entrye:m.entrySet())//遍历entrySet取出再放入

put(e.getKey(),e.getValue());

}



//删除一个元素,根据key

publicVremove(Objectkey){

inthash=hash(key.hashCode());

returnsegmentFor(hash).remove(key,hash,null);//根据key的hash值找到存放的segments,再调用其remove方法

}



//删除一个元素,根据key-value

publicbooleanremove(Objectkey,Objectvalue){

inthash=hash(key.hashCode());

if(value==null)

returnfalse;

returnsegmentFor(hash).remove(key,hash,value)!=null;//根据key的hash值找到存放的segments,再调用其remove方法

}



//替换元素

publicbooleanreplace(Kkey,VoldValue,VnewValue){

if(oldValue==null||newValue==null)

thrownewNullPointerException();

inthash=hash(key.hashCode());

returnsegmentFor(hash).replace(key,hash,oldValue,newValue);//根据key的hash值找到存放的segments,再调用其replace方法

}



//替换元素

publicVreplace(Kkey,Vvalue){

if(value==null)

thrownewNullPointerException();

inthash=hash(key.hashCode());

returnsegmentFor(hash).replace(key,hash,value);

}



//清空

publicvoidclear(){

for(inti=0;i
segments[i].clear();

}





//序列化方法

privatevoidwriteObject(java.io.ObjectOutputStreams)throwsIOException{

s.defaultWriteObject();



for(intk=0;k
Segmentseg=segments[k];

seg.lock();//注意加锁了

try{

HashEntry[]tab=seg.table;

for(inti=0;i
for(HashEntrye=tab[i];e!=null;e=e.next){

s.writeObject(e.key);

s.writeObject(e.value);

}

}

}finally{

seg.unlock();

}

}

s.writeObject(null);

s.writeObject(null);

}



//反序列化方法

privatevoidreadObject(java.io.ObjectInputStreams)

throwsIOException,ClassNotFoundException{

s.defaultReadObject();



for(inti=0;i
segments[i].setTable(newHashEntry[1]);

}



for(;;){

Kkey=(K)s.readObject();

Vvalue=(V)s.readObject();

if(key==null)

break;

put(key,value);

}

}

}



献花(0)
+1
(本文系thedust79首藏)