Java并发编程与技术内幕:ConcurrentHashMap源码解析
ConcurrentHashMap是Java并发包中非常有用的一个类,在高并发场景用得非常多,它是线程安全的。要注意到虽然HashTable虽然也是线程安全的,但是它的性能在高并发场景下完全比不上ConcurrentHashMap,这也是由它们的结构所决定的,可以认为ConcurrentHashMap是HashTable的加强版,不过这加强版和原来的HashTable有非常大的区别,不仅是在结构上,而且在方法上也有差别。
下面先来简单说说它们的区别吧HashMap、HashTable、ConcurrentHashMap的区别吧
1、HashMap是非线程安全,HashTable、ConcurrentHashMap都是线程安全,而且ConcurrentHashMap、Hashtable不能传入nul的key或value,HashMap可以。
2、Hashtable是将数据放入到一个Entrty数组或者它Entrty数组上一个Entrty的链表节点。而ConcurrentHashMap是由Segment数组组成,每一个Segment可以看成是一个单独的Map.然后每个Segment里又有一个HashEntrty数组用来存放数据。
3、HashTable的get/put/remove方法都是基于同步的synchronized方法,而ConcurrentHashMap是基本锁的机制,并且每次不是锁全表,而是锁单独的一个Segment。所以ConcurrentHashMap的性能比HashTable好。
4、如果不考虑线程安全因素,推荐使用HashMap,因为它性能最好。
首先来看看它包含的结构吧!
[java]viewplaincopy在CODE上查看代码片派生到我的代码片
publicclassConcurrentHashMapextendsAbstractMap
implementsConcurrentMap,Serializable{
privatestaticfinallongserialVersionUID=7249069246763182397L;
//默认数组容量大小
staticfinalintDEFAULT_INITIAL_CAPACITY=16;
//默认装载因子
staticfinalfloatDEFAULT_LOAD_FACTOR=0.75f;
//默认层级
staticfinalintDEFAULT_CONCURRENCY_LEVEL=16;
//最大的每一个数组容量大小
staticfinalintMAXIMUM_CAPACITY=1<<30;
//最大的分组数目
staticfinalintMAX_SEGMENTS=1<<16;//slightlyconservative
//调用remove/contain/replace方法时不加锁的情况下操作重试次数
staticfinalintRETRIES_BEFORE_LOCK=2;
//segments数组索引相关
finalintsegmentMask;
//segments数组偏移相关
finalintsegmentShift;
//segments数组,每个segments单独就可以认为是一个map
finalSegment[]segments;
这里看到了一个Segment[]segments;数组。下面再来看看Segment这个类。在下面可以看到Segment这个类继承了ReentrantLock锁。所以它也是一个锁。然后它里面还有一个HashEntry[]table。这是真正用来存放数据的结构。
[java]viewplaincopy在CODE上查看代码片派生到我的代码片
/
Segment内部类,注意它也是一个锁!可以认为它是一个带有锁方法的map
/
staticfinalclassSegmentextendsReentrantLockimplementsSerializable{
privatestaticfinallongserialVersionUID=2249069246763182397L;
//元素个数
transientvolatileintcount;
//修改次数
transientintmodCount;
//阈值,超过这个数会重新reSize
transientintthreshold;
//注意,这里又一个数组,这个是真正存放数据元素的地方
transientvolatileHashEntry[]table;
//装载因子,用来计算threshold
finalfloatloadFactor;
HashEntry它的结构很简单:
[java]viewplaincopy在CODE上查看代码片派生到我的代码片
staticfinalclassHashEntry{
finalKkey;
finalinthash;//用来保存Segment索引的信息
volatileVvalue;
finalHashEntrynext;
HashEntry(Kkey,inthash,HashEntrynext,Vvalue){
this.key=key;
this.hash=hash;
this.next=next;
this.value=value;
}
@SuppressWarnings("unchecked")
staticfinalHashEntry[]newArray(inti){
returnnewHashEntry[i];
}
}
经过上面的分析:可以得出如下的ConcurrentHashMap结构图
全部源码分析:
[java]viewplaincopy在CODE上查看代码片派生到我的代码片
packagejava.util.concurrent;
importjava.util.concurrent.locks.;
importjava.util.;
importjava.io.Serializable;
importjava.io.IOException;
importjava.io.ObjectInputStream;
importjava.io.ObjectOutputStream;
publicclassConcurrentHashMapextendsAbstractMap
implementsConcurrentMap,Serializable{
privatestaticfinallongserialVersionUID=7249069246763182397L;
//默认数组容量大小
staticfinalintDEFAULT_INITIAL_CAPACITY=16;
//默认装载因子
staticfinalfloatDEFAULT_LOAD_FACTOR=0.75f;
//默认层级
staticfinalintDEFAULT_CONCURRENCY_LEVEL=16;
//最大的每一个数组容量大小
staticfinalintMAXIMUM_CAPACITY=1<<30;
//最大的分组数目
staticfinalintMAX_SEGMENTS=1<<16;//slightlyconservative
//调用remove/contain/replace方法时不加锁的情况下操作重试次数
staticfinalintRETRIES_BEFORE_LOCK=2;
//segments数组索引相关
finalintsegmentMask;
//segments数组偏移相关
finalintsegmentShift;
//segments数组,每个segments单独就可以认为是一个map
finalSegment[]segments;
/
哈希算法
/
privatestaticinthash(inth){
//Spreadbitstoregularizebothsegmentandindexlocations,
//usingvariantofsingle-wordWang/Jenkinshash.
h+=(h<<15)^0xffffcd7d;
h^=(h>>>10);
h+=(h<<3);
h^=(h>>>6);
h+=(h<<2)+(h<<14);
returnh^(h>>>16);
}
/
根据哈希值计算应该落在哪个segments上
/
finalSegmentsegmentFor(inthash){
returnsegments[(hash>>>segmentShift)&segmentMask];
}
/
内部类,每个HashEntry都会存入到一个Segment中去
/
staticfinalclassHashEntry{
finalKkey;//关键字
finalinthash;//哈希值
volatileVvalue;//值
finalHashEntrynext;//不同的关键字,相再的哈希值时会组成一个链表
HashEntry(Kkey,inthash,HashEntrynext,Vvalue){
this.key=key;
this.hash=hash;
this.next=next;
this.value=value;
}
@SuppressWarnings("unchecked")
staticfinalHashEntry[]newArray(inti){
returnnewHashEntry[i];
}
}
/
Segment内部类,注意它也是一个锁!可以认为它是一个带有锁方法的map
/
staticfinalclassSegmentextendsReentrantLockimplementsSerializable{
privatestaticfinallongserialVersionUID=2249069246763182397L;
//元素个数
transientvolatileintcount;
//修改次数
transientintmodCount;
//阈值,超过这个数会重新reSize
transientintthreshold;
//注意,这里又一个数组,这个是真正存放数据元素的地方
transientvolatileHashEntry[]table;
//装载因子,用来计算threshold
finalfloatloadFactor;
//构造函数,由initialCapacity确定table的大小
Segment(intinitialCapacity,floatlf){
loadFactor=lf;
setTable(HashEntry.newArray(initialCapacity));
}
@SuppressWarnings("unchecked")
staticfinalSegment[]newArray(inti){
returnnewSegment[i];
}
//设置threshold、table
voidsetTable(HashEntry[]newTable){
threshold=(int)(newTable.lengthloadFactor);//注意,当table的元素个数超过这个时,会触发reSize;
table=newTable;
}
//取得头一个
HashEntrygetFirst(inthash){
HashEntry[]tab=table;
returntab[hash&(tab.length-1)];
}
//在加锁情况下读数据,注意这个类继续了锁的方法
VreadValueUnderLock(HashEntrye){
lock();
try{
returne.value;
}finally{
unlock();
}
}
//取元素
Vget(Objectkey,inthash){
if(count!=0){//注意,没有加锁
HashEntrye=getFirst(hash);//取得头一个
while(e!=null){//依次从table中取出元素判断
if(e.hash==hash&&key.equals(e.key)){//hash和key同时相等才表示存在
Vv=e.value;
if(v!=null)//有可能在这里时,运行了删除元素导致为Null,一般发生比较少
returnv;
returnreadValueUnderLock(e);//重新在加锁情况下读数据
}
e=e.next;
}
}
returnnull;
}
//是否包含一个元素
booleancontainsKey(Objectkey,inthash){
if(count!=0){//
HashEntrye=getFirst(hash);
while(e!=null){
if(e.hash==hash&&key.equals(e.key))
returntrue;
e=e.next;
}
}
returnfalse;
}
//是否包含一个元素
booleancontainsValue(Objectvalue){
if(count!=0){//read-volatile
HashEntry[]tab=table;
intlen=tab.length;
for(inti=0;i for(HashEntrye=tab[i];e!=null;e=e.next){//table数组循环读数
Vv=e.value;
if(v==null)//recheck
v=readValueUnderLock(e);
if(value.equals(v))
returntrue;
}
}
}
returnfalse;
}
//替换时要加锁
booleanreplace(Kkey,inthash,VoldValue,VnewValue){
lock();
try{
HashEntrye=getFirst(hash);
while(e!=null&&(e.hash!=hash||!key.equals(e.key)))//hash和key要同时相等才表示是找到了这个元素
e=e.next;
booleanreplaced=false;
if(e!=null&&oldValue.equals(e.value)){//判断是否要进行替换
replaced=true;
e.value=newValue;
}
returnreplaced;
}finally{
unlock();
}
}
//替换时要加锁
Vreplace(Kkey,inthash,VnewValue){
lock();
try{
HashEntrye=getFirst(hash);
while(e!=null&&(e.hash!=hash||!key.equals(e.key)))
e=e.next;
VoldValue=null;
if(e!=null){
oldValue=e.value;
e.value=newValue;
}
returnoldValue;
}finally{
unlock();
}
}
//放入一个元素,onlyIfAbsent如果有false表示替换原来的旧值
Vput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){
lock();
try{
intc=count;
if(c++>threshold)//table数组里的元素超过threshold。触发rehash,其实也就是扩大table
rehash();
HashEntry[]tab=table;
intindex=hash&(tab.length-1);
HashEntryfirst=tab[index];//头一个
HashEntrye=first;
while(e!=null&&(e.hash!=hash||!key.equals(e.key)))//一直不断判断不重复才停止
e=e.next;
VoldValue;
if(e!=null){//这个key、hash已经存在,修改原来的
oldValue=e.value;
if(!onlyIfAbsent)
e.value=value;//替换原来的旧值
}
else{//这个key、hash已经不存在,加入一个新的
oldValue=null;
++modCount;
tab[index]=newHashEntry(key,hash,first,value);//加入一个新的元素
count=c;//个数变化
}
returnoldValue;
}finally{
unlock();
}
}
//重新哈希
voidrehash(){
HashEntry[]oldTable=table;
intoldCapacity=oldTable.length;//旧容量
if(oldCapacity>=MAXIMUM_CAPACITY)//超过默认的最大容量时就退出了
return;
HashEntry[]newTable=HashEntry.newArray(oldCapacity<<1);//这里直接在原来的基础上扩大1倍
threshold=(int)(newTable.lengthloadFactor);//重新计算新的阈值
intsizeMask=newTable.length-1;
for(inti=0;i HashEntrye=oldTable[i];
if(e!=null){//旧table上该处有数据
HashEntrynext=e.next;
intidx=e.hash&sizeMask;
//单个节点key-value
if(next==null)
newTable[idx]=e;
else{//链表节点key-value
HashEntrylastRun=e;
intlastIdx=idx;
for(HashEntrylast=next;
last!=null;
last=last.next){//这里重新计算了链表上最后一个节点的位置
intk=last.hash&sizeMask;
if(k!=lastIdx){
lastIdx=k;
lastRun=last;
}
}
newTable[lastIdx]=lastRun;//将原table上对应的链表上的最后一个元素放在新table对应链表的首位置
for(HashEntryp=e;p!=lastRun;p=p.next){//for循环依次拷贝链表上的数据,注意最后整个链表相对原来会倒序排列
intk=p.hash&sizeMask;
HashEntryn=newTable[k];
newTable[k]=newHashEntry(p.key,p.hash,
n,p.value);//新table数据赋值
}
}
}
}
table=newTable;
}
//删除一个元素
Vremove(Objectkey,inthash,Objectvalue){
lock();//删除要加锁
try{
intc=count-1;
HashEntry[]tab=table;
intindex=hash&(tab.length-1);
HashEntryfirst=tab[index];
HashEntrye=first;
while(e!=null&&(e.hash!=hash||!key.equals(e.key)))
e=e.next;
VoldValue=null;
if(e!=null){///找到元素
Vv=e.value;
if(value==null||value.equals(v)){//为null或者value相等时才删除
oldValue=v;
++modCount;
HashEntrynewFirst=e.next;
for(HashEntryp=first;p!=e;p=p.next)
newFirst=newHashEntry(p.key,p.hash,
newFirst,p.value);//注意它这里会倒换原来链表的位置
tab[index]=newFirst;
count=c;//记录数减去1
}
}
returnoldValue;
}finally{
unlock();
}
}
//清空整个map
voidclear(){
if(count!=0){
lock();
try{
HashEntry[]tab=table;
for(inti=0;i tab[i]=null;//直接赋值为null
++modCount;
count=0;//write-volatile
}finally{
unlock();
}
}
}
}
//构造函数
publicConcurrentHashMap(intinitialCapacity,
floatloadFactor,intconcurrencyLevel){
if(!(loadFactor>0)||initialCapacity<0||concurrencyLevel<=0)//loadFactor和initialCapacity都得大于0
thrownewIllegalArgumentException();
if(concurrencyLevel>MAX_SEGMENTS)
concurrencyLevel=MAX_SEGMENTS;
//Findpower-of-twosizesbestmatchingarguments
intsshift=0;
intssize=1;
while(ssize ++sshift;
ssize<<=1;
}
segmentShift=32-sshift;
segmentMask=ssize-1;
this.segments=Segment.newArray(ssize);
if(initialCapacity>MAXIMUM_CAPACITY)
initialCapacity=MAXIMUM_CAPACITY;
intc=initialCapacity/ssize;
if(cssize ++c;
intcap=1;
while(cap cap<<=1;
for(inti=0;i this.segments[i]=newSegment(cap,loadFactor);//为每个segments初始化其里面的数组
}
//构造函数
publicConcurrentHashMap(intinitialCapacity,floatloadFactor){
this(initialCapacity,loadFactor,DEFAULT_CONCURRENCY_LEVEL);
}
//构造函数
publicConcurrentHashMap(intinitialCapacity){
this(initialCapacity,DEFAULT_LOAD_FACTOR,DEFAULT_CONCURRENCY_LEVEL);
}
//默认构造函数
publicConcurrentHashMap(){
this(DEFAULT_INITIAL_CAPACITY,DEFAULT_LOAD_FACTOR,DEFAULT_CONCURRENCY_LEVEL);
}
//构造函数
publicConcurrentHashMap(Mapm){
this(Math.max((int)(m.size()/DEFAULT_LOAD_FACTOR)+1,
DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR,DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}
//判断是否为空
publicbooleanisEmpty(){
finalSegment[]segments=this.segments;//取得整个Segment数组
int[]mc=newint[segments.length];
intmcsum=0;
for(inti=0;i if(segments[i].count!=0)//有一个segments数组元素个数不为0,那么整个map肯定不为空
returnfalse;
else
mcsum+=mc[i]=segments[i].modCount;//累加总的修改次数
}
if(mcsum!=0){
for(inti=0;i if(segments[i].count!=0||
mc[i]!=segments[i].modCount)//这里又做了一次mc[i]!=segments[i].modCount判断,因为segments[i].count=0时才会跳到这里,不相等那么肯定是又有元素加入
returnfalse;
}
}
returntrue;
}
//整个mapr的大小,这里的大小指的是存放元素的个数
publicintsize(){
finalSegment[]segments=this.segments;
longsum=0;
longcheck=0;
int[]mc=newint[segments.length];
//这是里的for循环是尝试在不加锁的情况下来获取整个map的元素个数
for(intk=0;k check=0;
sum=0;
intmcsum=0;
for(inti=0;i sum+=segments[i].count;//累加每一个segments上的count
mcsum+=mc[i]=segments[i].modCount;//累加每一个segments上的modCount
}
if(mcsum!=0){//修改次数不为0,要再做一次判断前后两次的modCount,count的累加
for(inti=0;i check+=segments[i].count;
if(mc[i]!=segments[i].modCount){//前后两次数据发生了变化
check=-1;//前后两次取的个数不一到,注意sum还是之前的
break;
}
}
}
if(check==sum)//前后两次取的元素个数一样,直接跳出循环
break;
}
//这里会尝试在加锁的情况下来获取整个map的元素个数
if(check!=sum){//这里一般check会等于-1才发生
sum=0;//重新置0
for(inti=0;i segments[i].lock();//每一个segments上锁
for(inti=0;i sum+=segments[i].count;//重新累加
for(inti=0;i segments[i].unlock();//依次释放锁
}
if(sum>Integer.MAX_VALUE)
returnInteger.MAX_VALUE;//如果大于最大值,返回最大值
else
return(int)sum;
}
//取得一个元素,先是不加锁情况下去读
publicVget(Objectkey){
inthash=hash(key.hashCode());
returnsegmentFor(hash).get(key,hash);//具体看上面的代码注释
}
//是否包含一个元素,根据key来获取
publicbooleancontainsKey(Objectkey){
inthash=hash(key.hashCode());
returnsegmentFor(hash).containsKey(key,hash);
}
//是否包含一个元素,根据value来获取
publicbooleancontainsValue(Objectvalue){
if(value==null)
thrownewNullPointerException();
//取得整个Segment的内容
finalSegment[]segments=this.segments;
int[]mc=newint[segments.length];
//尝试在不加锁的情况下做判断
for(intk=0;k intsum=0;
intmcsum=0;
for(inti=0;i intc=segments[i].count;//累加个数
mcsum+=mc[i]=segments[i].modCount;//累加修改次数
if(segments[i].containsValue(value))//判断是否包含
returntrue;
}
booleancleanSweep=true;
if(mcsum!=0){//成立说明发生了改变
for(inti=0;i intc=segments[i].count;//累加第二次循环得到的count
if(mc[i]!=segments[i].modCount){//如果有一个segments前后两次不一样,那么它的元素肯定发生了变化
cleanSweep=false;//
break;//跳出
}
}
}
if(cleanSweep)//为ture表示经过上面的两次判断还是无法找到
returnfalse;
}
//cleanSweepo为false时,进行下面。注意,这里是在加锁情况下
for(inti=0;i segments[i].lock();//取得每一个segments的锁
booleanfound=false;
try{
for(inti=0;i if(segments[i].containsValue(value)){
found=true;
break;
}
}
}finally{
for(inti=0;i segments[i].unlock();
}
returnfound;
}
//是否包含
publicbooleancontains(Objectvalue){
returncontainsValue(value);
}
//放入一个元素
publicVput(Kkey,Vvalue){
if(value==null)
thrownewNullPointerException();
inthash=hash(key.hashCode());
returnsegmentFor(hash).put(key,hash,value,false);//放入时先根据key的hash值找到存放的segments,再调用其put方法
}
//放入一个元素,如果key或value为null,那么为招出一个异常
publicVputIfAbsent(Kkey,Vvalue){
if(value==null)
thrownewNullPointerException();
inthash=hash(key.www.baiyuewang.nethashCode());
returnsegmentFor(hash).put(key,hash,value,true);
}
//放入一个map
publicvoidputAll(Mapm){
for(Map.Entrye:m.entrySet())//遍历entrySet取出再放入
put(e.getKey(),e.getValue());
}
//删除一个元素,根据key
publicVremove(Objectkey){
inthash=hash(key.hashCode());
returnsegmentFor(hash).remove(key,hash,null);//根据key的hash值找到存放的segments,再调用其remove方法
}
//删除一个元素,根据key-value
publicbooleanremove(Objectkey,Objectvalue){
inthash=hash(key.hashCode());
if(value==null)
returnfalse;
returnsegmentFor(hash).remove(key,hash,value)!=null;//根据key的hash值找到存放的segments,再调用其remove方法
}
//替换元素
publicbooleanreplace(Kkey,VoldValue,VnewValue){
if(oldValue==null||newValue==null)
thrownewNullPointerException();
inthash=hash(key.hashCode());
returnsegmentFor(hash).replace(key,hash,oldValue,newValue);//根据key的hash值找到存放的segments,再调用其replace方法
}
//替换元素
publicVreplace(Kkey,Vvalue){
if(value==null)
thrownewNullPointerException();
inthash=hash(key.hashCode());
returnsegmentFor(hash).replace(key,hash,value);
}
//清空
publicvoidclear(){
for(inti=0;i segments[i].clear();
}
//序列化方法
privatevoidwriteObject(java.io.ObjectOutputStreams)throwsIOException{
s.defaultWriteObject();
for(intk=0;k Segmentseg=segments[k];
seg.lock();//注意加锁了
try{
HashEntry[]tab=seg.table;
for(inti=0;i for(HashEntrye=tab[i];e!=null;e=e.next){
s.writeObject(e.key);
s.writeObject(e.value);
}
}
}finally{
seg.unlock();
}
}
s.writeObject(null);
s.writeObject(null);
}
//反序列化方法
privatevoidreadObject(java.io.ObjectInputStreams)
throwsIOException,ClassNotFoundException{
s.defaultReadObject();
for(inti=0;i segments[i].setTable(newHashEntry[1]);
}
for(;;){
Kkey=(K)s.readObject();
Vvalue=(V)s.readObject();
if(key==null)
break;
put(key,value);
}
}
}
|
|