来自:mjsws > 馆藏分类
配色: 字号:
解读Java并发队列BlockingQueue
2019-01-15 | 阅:  转:  |  分享 
  
解读Java并发队列BlockingQueue最近得空,想写篇文章好好说说java线程池问题,我相信很多人都一知半解的,包括我自己在仔仔
细细看源码之前,也有许多的不解,甚至有些地方我一直都没有理解到位。说到线程池实现,那么就不得不涉及到各种BlockingQueu
e的实现,那么我想就BlockingQueue的问题和大家分享分享我了解的一些知识。本文没有像之前分析AQS那样一行一行
源码分析了,不过还是把其中最重要和最难理解的代码说了一遍,所以不免篇幅略长。本文涉及到比较多的DougLea对Blocki
ngQueue的设计思想,希望有心的读者真的可以有一些收获,我觉得自己还是写了一些干货的。本文直接参考DougLea写的
javadoc和注释,这也是我们在学习java并发包时最好的材料了。希望大家能有所思、有所悟,学习DougLea的代
码风格,并将其优雅、严谨的作风应用到我们写的每一行代码中。BlockingQueue开篇先介绍下BlockingQueue这个
接口的规则,后面再看其实现。首先,最基本的来说,BlockingQueue是一个先进先出的队列(Queue),为什么说是阻塞(
Blocking)的呢?是因为BlockingQueue支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持
添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。BlockingQueue是一个接口,继承自Queue,所以其实
现类也可以作为Queue的实现来使用,而Queue又继承自Collection接口。BlockingQueue对插入
操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:1、抛出异常;2、返回特殊值(null或true/fal
se,取决于具体的操作);3、阻塞等待此操作,直到这个操作成功;4、阻塞等待此操作,直到成功或者超时指定时间。总结如下:Block
ingQueue的各个实现都遵循了这些规则,当然我们也不用死记这个表格,知道有这么回事,然后写代码的时候根据自己的需要去看方法的
注释来选取合适的方法即可。www.gw638.cn对于BlockingQueue,我们的关注点应该在put(e)和take
()这两个方法,因为这两个方法是带阻塞的。BlockingQueue不接受null值的插入,相应的方法在碰到null的
插入时会抛出NullPointerException异常。null值在这里通常用于作为特殊值返回(表格中的第三列),代表p
oll失败。所以,如果允许插入null值的话,那获取的时候,就不能很好地用null来判断到底是代表失败,还是获取的值就是
null值。一个BlockingQueue可能是有界的,如果在插入的时候,发现队列满了,那么put操作将会阻塞。通常,
在这里我们说的×××队列也不是说真正的×××,而是它的容量是Integer.MAX_VALUE(21亿多)。BlockingQu
eue是设计用来实现生产者-消费者队列的,当然,你也可以将它当做普通的Collection来用,前面说了,它实现了java
.util.Collection接口。例如,我们可以用remove(x)来删除任意一个元素,但是,这类操作通常并不高效,所以
尽量只在少数的场合使用,比如一条消息已经入队,但是需要做取消操作的时候。BlockingQueue的实现都是线程安全的,但是批量
的集合操作如addAll,containsAll,retainAll和removeAll不一定是原子操作。如addA
ll(c)有可能在添加了一些元素后中途抛出异常,此时BlockingQueue中已经添加了部分元素,这个是允许的,取决于具体
的实现。BlockingQueue不支持close或shutdown等关闭操作,因为开发者可能希望不会有新的元素添加进去
,此特性取决于具体的实现,不做强制约束。最后,BlockingQueue在生产者-消费者的场景中,是支持多消费者和多生产者的,说
的其实就是线程安全问题。相信上面说的每一句都很清楚了,BlockingQueue是一个比较简单的线程安全容器,下面我会分析其具体
的在JDK中的实现,这里又到了DougLea表演时间了。BlockingQueue实现之ArrayBlockingQ
ueueArrayBlockingQueue是BlockingQueue接口的有界队列实现类,底层采用数组来实现。其并发控制
采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。ArrayBlockingQueue共有以下几个属性
://用于存放元素的数组finalObject[]items;//下一次读取操作的位置inttakeIndex;//下
一次写入操作的位置intputIndex;//队列中的元素数量intcount;//以下几个就是控制并发用的同步器fin
alReentrantLocklock;privatefinalConditionnotEmpty;privatefi
nalConditionnotFull;我们用个示意图来描述其同步机制:ArrayBlockingQueue实现并发同步的
原理就是,读操作和写操作都需要获取到AQS独占锁才能进行操作。如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线
程写入新的元素,然后唤醒读线程队列的第一个等待线程。如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移
除腾出空间,然后唤醒写线程队列的第一个等待线程。对于ArrayBlockingQueue,我们可以在构造的时候指定以下三个参数:
队列容量,其限制了队列中最多允许的元素个数;指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久
的线程获取到锁;可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。www.f-1.ccBlockingQ
ueue实现之LinkedBlockingQueue底层基于单向链表实现的阻塞队列,可以当做×××队列也可以当做有界队列来使用
。看构造方法://传说中的×××队列publicLinkedBlockingQueue(){this(Integer.MA
X_VALUE);}//传说中的有界队列publicLinkedBlockingQueue(intcapacity){i
f(capacity<=0)thrownewIllegalArgumentException();this.capa
city=capacity;last=head=newNode(null);}我们看看这个类有哪些属性://
队列容量privatefinalintcapacity;//队列中的元素数量privatefinalAtomicI
ntegercount=newAtomicInteger(0);//队头privatetransientNode<
E>head;//队尾privatetransientNodelast;//take,poll,peek
等读操作的方法需要获取到这个锁privatefinalReentrantLocktakeLock=newReentr
antLock();//如果读操作的时候队列是空的,那么等待notEmpty条件privatefinalConditi
onnotEmpty=takeLock.newCondition();//put,offer等写操作的方法需要获取到
这个锁privatefinalReentrantLockputLock=newReentrantLock();//
如果写操作的时候队列是满的,那么等待notFull条件privatefinalConditionnotFull=pu
tLock.newCondition();这里用了两个锁,两个Condition,简单介绍如下:takeLock和notEm
pty怎么搭配:如果要获取(take)一个元素,需要获取takeLock锁,但是获取了锁还不够,如果队列此时为空,还需要队列
不为空(notEmpty)这个条件(Condition)。putLock需要和notFull搭配:如果要插入(put)一个元
素,需要获取putLock锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condit
ion)。首先,这里用一个示意图来看看LinkedBlockingQueue的并发读写控制,然后再开始分析源码:看懂这个示意图
,源码也就简单了,读操作是排好队的,写操作也是排好队的,唯一的并发问题在于一个写操作和一个读操作同时进行,只要控制好这个就可以了。
先上构造方法:publicLinkedBlockingQueue(intcapacity){if(capacity<=
0)thrownewIllegalArgumentException();this.capacity=capacit
y;last=head=newNode(null);}注意,这里会初始化一个空的头结点,那么第一个元素入队的时候
,队列中就会有两个元素。读取元素时,也总是获取头节点后面的一个节点。count的计数值不包括这个头节点。我们来看下put方法
是怎么将元素插入到队尾的:publicvoidput(Ee)throwsInterruptedException{i
f(e==null)thrownewNullPointerException();//如果你纠结这里为什么是-1
,可以看看offer方法。这就是个标识成功、失败的标志而已。intc=-1;Nodenode=newN
ode(e);finalReentrantLockputLock=this.putLock;finalAtomicI
ntegercount=this.count;//必须要获取到putLock才可以进行插入操作putLock.lo
ckInterruptibly();try{//如果队列满,等待notFull的条件满足。while(count.
get()==capacity){notFull.await();}//入队enqueue(node);//c
ount原子加1,c还是加1前的值c=count.getAndIncrement();//如果这个元素入队后,
还有至少一个槽可以使用,调用notFull.signal()唤醒等待线程。//哪些线程会等待在notFull这个Co
ndition上呢?if(c+1/入队后,释放掉putLockputLock.unlock();}//如果c==0,那么代表队列在这个元素入队前
是空的(不包括head空节点),//那么所有的读线程都在等待notEmpty这个条件,等待唤醒,这里做一次唤醒操作if
(c==0)signalNotEmpty();}//入队的代码非常简单,就是将last属性指向这个新元素,并且让原队
尾的next指向这个元素//这里入队没有并发问题,因为只有获取到putLock独占锁以后,才可以进行此操作private
voidenqueue(Nodenode){//assertputLock.isHeldByCurrentTh
read();//assertlast.next==null;last=last.next=node;}//
元素入队后,如果需要,调用这个方法唤醒读线程来读privatevoidsignalNotEmpty(){finalRe
entrantLocktakeLock=this.takeLock;takeLock.lock();try{notE
mpty.signal();}finally{takeLock.unlock();}}我们再看看take方法:pub
licEtake()throwsInterruptedException{Ex;intc=-1;final
AtomicIntegercount=this.count;finalReentrantLocktakeLock=
this.takeLock;//首先,需要获取到takeLock才能进行出队操作takeLock.lockInterr
uptibly();try{//如果队列为空,等待notEmpty这个条件满足再继续执行while(count.g
et()==0){notEmpty.await();}//出队x=dequeue();//count进行
原子减1c=count.getAndDecrement();//如果这次出队后,队列中至少还有一个元素,那么调用no
tEmpty.signal()唤醒其他的读线程if(c>1)notEmpty.signal();}finally
{//出队后释放掉takeLocktakeLock.unlock();}//如果c==capacity,那么说
明在这个take方法发生的时候,队列是满的//既然出队了一个,那么意味着队列不满了,唤醒写线程去写if(c==ca
pacity)signalNotFull();returnx;}//取队头,出队privateEdequeue(){
//asserttakeLock.isHeldByCurrentThread();//asserthead.item
==null;//之前说了,头结点是空的Nodeh=head;Nodefirst=h.next;
h.next=h;//helpGC//设置这个为新的头结点head=first;Ex=first.i
tem;first.item=null;returnx;}//元素出队后,如果需要,调用这个方法唤醒写线程来写priv
atevoidsignalNotFull(){finalReentrantLockputLock=this.put
Lock;putLock.lock();try{notFull.signal();}finally{putLock
.unlock();}}源码分析就到这里结束了吧,毕竟还是比较简单的源码,基本上只要读者认真点都看得懂。www.44226.ne
tBlockingQueue实现之SynchronousQueue它是一个特殊的队列,它的名字其实就蕴含了它的特征–-同
步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要
等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。这里的Synchronou
s指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。我们比较少使用到SynchronousQueue这个类,不过它在
线程池的实现类ScheduledThreadPoolExecutor中得到了应用,感兴趣的读者可以在看完这个后去看看相应的使用
。虽然上面我说了队列,但是SynchronousQueue的队列其实是虚的,其不提供任何空间(一个都没有)来存储元素。数据必须
从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。你不能在SynchronousQueue中使用peek方法(在
这里这个方法直接返回null),peek方法的语义是只读取不移除,显然,这个方法的语义是不符合SynchronousQueu
e的特征的。SynchronousQueue也不能被迭代,因为根本就没有元素可以拿来迭代的。虽然SynchronousQue
ue间接地实现了Collection接口,但是如果你将其当做Collection来用的话,那么集合是空的。当然,这个类也
是不允许传递null值的(并发包中的容器类好像都不支持插入null值,因为null值往往用作其他用途,比如用于方法的返
回值代表操作失败)。接下来,我们来看看具体的源码实现吧,它的源码不是很简单的那种,我们需要先搞清楚它的设计思想。源码加注释大概有
1200行,我们先看大框架://构造时,我们可以指定公平模式还是非公平模式,区别之后再说publicSynchronousQ
ueue(booleanfair){transferer=fair?newTransferQueue():ne
wTransferStack();}abstractstaticclassTransferer{//从方法名上大概就
知道,这个方法用于转移元素,从生产者手上转到消费者手上//也可以被动地,消费者调用这个方法来从生产者手上取元素//第一个参
数e如果不是null,代表场景为:将元素从生产者转移给消费者//如果是null,代表消费者等待生产者提供元素,然后返回
值就是相应的生产者提供的元素//第二个参数代表是否设置超时,如果设置超时,超时时间是第三个参数的值//返回值如果是nul
l,代表超时,或者中断。具体是哪个,可以通过检测中断状态得到。abstractObjecttransfer(Objecte
,booleantimed,longnanos);}Transferer有两个内部实现类,是因为构造Synchrono
usQueue的时候,我们可以指定公平策略。公平模式意味着,所有的读写线程都遵守先来后到,FIFO嘛,对应TransferQ
ueue。而非公平模式则对应TransferStack。我们先采用公平模式分析源码,然后再说说公平模式和非公平模式的区别。接下来
,我们看看put方法和take方法://写入值publicvoidput(Eo)throwsInterrupt
edException{if(o==null)thrownewNullPointerException();if
(transferer.transfer(o,false,0)==null){//1Thread.interru
pted();thrownewInterruptedException();}}//读取值并移除publicEtak
e()throwsInterruptedException{Objecte=transferer.transfer(
null,false,0);//2if(e!=null)return(E)e;Thread.interrup
ted();thrownewInterruptedException();}我们看到,写操作put(Eo)和读操作t
ake()都是调用Transferer.transfer(…)方法,区别在于第一个参数是否为null值。我们来看看tr
ansfer的设计思路,其基本算法如下:当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是
put操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列即可。如果队列中有等待节点,而且与当前操作可以匹
配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据。其实这里有个隐含
的条件被满足了,队列如果不为空,肯定都是同种类型的节点,要么都是读操作,要么都是写操作。这个就要看到底是读线程积压了,还是写线程积
压了。我们可以假设出一个男女配对的场景:一个男的过来,如果一个人都没有,那么他需要等待;如果发现有一堆男的在等待,那么他需要排到队
列后面;如果发现是一堆女的在排队,那么他直接牵走队头的那个女的。既然这里说到了等待队列,我们先看看其实现,也就是QNode:st
aticfinalclassQNode{volatileQNodenext;//可以看出来,等待队列是单向链表
volatileObjectitem;//CAS''edtoorfromnullvolatileThreadw
aiter;//将线程对象保存在这里,用于挂起和唤醒finalbooleanisData;//用于判断是写线程节点(
isData==true),还是读线程节点QNode(Objectitem,booleanisData){this
.item=item;this.isData=isData;}......相信说了这么多以后,我们再来看trans
fer方法的代码就轻松多了。/?Putsortakesanitem./Objecttransfer(Object
e,booleantimed,longnanos){QNodes=null;//constructed/r
eusedasneededbooleanisData=(e!=null);for(;;){QNodet
=tail;QNodeh=head;if(t==null||h==null)//sawuninit
ializedvaluecontinue;//spin//队列空,或队列中节点类型和当前节点一致,//即我们说的第
一种情况,将节点入队即可。读者要想着这块if里面方法其实就是入队if(h==t||t.isData==isDa
ta){//emptyorsame-modeQNodetn=t.next;//t!=tail说明刚刚有
节点入队,continue即可if(t!=tail)//inconsistentreadcontinue;//
有其他节点入队,但是tail还是指向原来的,此时设置tail即可if(tn!=null){//laggin
gtail//这个方法就是:如果tail此时为t的话,设置为tnadvanceTail(t,tn);cont
inue;}//if(timed&&nanos<=0)//can''twaitreturnnull;if
(s==null)s=newQNode(e,isData);//将当前节点,插入到tail的后面if(
!t.casNext(null,s))//failedtolinkincontinue;//将当前节点设置为新的
tailadvanceTail(t,s);//swingtailandwait//看到这里,请读者先往下滑到这
个方法,看完了以后再回来这里,思路也就不会断了Objectx=awaitFulfill(s,e,timed,nano
s);//到这里,说明之前入队的线程被唤醒了,准备往下执行if(x==s){//waitwascancell
edclean(t,s);returnnull;}if(!s.isOffList()){//notalrea
dyunlinkedadvanceHead(t,s);//unlinkifheadif(x!=null)/
/andforgetfieldss.item=s;s.waiter=null;}return(x!=n
ull)?x:e;//这里的else分支就是上面说的第二种情况,有相应的读或写相匹配的情况}else{//
complementary-modeQNodem=h.next;//nodetofulfillif(t!=
tail||m==null||h!=head)continue;//inconsistentreadO
bjectx=m.item;if(isData==(x!=null)||//malreadyfulfi
lledx==m||//mcancelled!m.casItem(x,e)){//lostCASadv
anceHead(h,m);//dequeueandretrycontinue;}advanceHead(h,m
);//successfullyfulfilledLockSupport.unpark(m.waiter);return
(x!=null)?x:e;}}}voidadvanceTail(QNodet,QNodent){
if(tail==t)UNSAFE.compareAndSwapObject(this,tailOffset,t,n
t);}//自旋或阻塞,直到满足条件,这个方法返回ObjectawaitFulfill(QNodes,Objecte,
booleantimed,longnanos){longlastTime=timed?System.nanoT
ime():0;Threadw=Thread.currentThread();//判断需要自旋的次数,ints
pins=((head.next==s)?(timed?maxTimedSpins:maxUntimedSpi
ns):0);for(;;){//如果被中断了,那么取消这个节点if(w.isInterrupted())//
就是将当前节点s中的item属性设置为thiss.tryCancel(e);Objectx=s.item;
//这里是这个方法的唯一的出口if(x!=e)returnx;//如果需要,检测是否超时if(timed)
{longnow=System.nanoTime();nanos-=now-lastTime;lastTime
=now;if(nanos<=0){s.tryCancel(e);continue;}}if(spins
>0)--spins;//如果自旋达到了最大的次数,那么检测elseif(s.waiter==null)s.
waiter=w;//如果自旋到了最大的次数,那么线程挂起,等待唤醒elseif(!timed)LockSuppo
rt.park(this);//spinForTimeoutThreshold这个之前讲AQS的时候其实也说过,剩余时间
小于这个阈值的时候,就//不要进行挂起了,自旋的性能会比较好elseif(nanos>spinForTimeoutT
hreshold)LockSupport.parkNanos(this,nanos);}}DougLea的巧妙之处在于,
将各个代码凑在了一起,使得代码非常简洁,当然也同时增加了我们的阅读负担,看代码的时候,还是得仔细想想各种可能的情况。下面,再说说前
面说的公平模式和非公平模式的区别。相信大家心里面已经有了公平模式的工作流程的概念了,我就简单说说TransferStack的算
法,就不分析源码了。当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是put操作,而栈中
的元素也都是写线程)。这种情况下,将当前线程加入到等待栈中,等待配对。然后返回相应的元素,或者如果被取消了的话,返回null。如
果栈中有等待节点,而且与当前操作可以匹配(如栈里面都是读操作线程,当前线程是写操作线程,反之亦然)。将当前节点压入栈顶,和栈中的节
点进行匹配,然后将这两个节点出栈。配对和出栈的动作其实也不是必须的,因为下面的一条会执行同样的事情。如果栈顶是进行匹配而入栈的节点
,帮助其进行匹配并出栈,然后再继续操作。应该说,TransferStack的源码要比TransferQueue的复杂一些,如
果读者感兴趣,请自行进行源码阅读。BlockingQueue实现之PriorityBlockingQueue带排序的Bloc
kingQueue实现,其并发控制采用的是ReentrantLock,队列为×××队列(ArrayBlockingQueue
是有界队列,LinkedBlockingQueue也可以通过在构造函数中传入capacity指定队列最大的容量,但是Pri
orityBlockingQueue只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。简单地说,它就是
PriorityQueue的线程安全版本。不可以插入null值,同时,插入队列的对象必须是可比较大小的(comparable
),否则报ClassCastException异常。它的插入操作put方法不会block,因为它是×××队列(take
方法在队列为空的时候会阻塞)。它的源码相对比较简单,本节将介绍其核心源码部分。我们来看看它有哪些属性://构造方法中,如果不指定
大小的话,默认大小为11privatestaticfinalintDEFAULT_INITIAL_CAPACITY=
11;//数组的最大容量privatestaticfinalintMAX_ARRAY_SIZE=Integer.MA
X_VALUE-8;//这个就是存放数据的数组privatetransientObject[]queue;//队
列当前大小privatetransientintsize;//大小比较器,如果按照自然序排序,那么此属性可设置为nul
lprivatetransientComparatorcomparator;//并发控制所用的锁,
所有的public且涉及到线程安全的方法,都必须先获取到这个锁privatefinalReentrantLocklock
;//这个很好理解,其实例由上面的lock属性创建privatefinalConditionnotEmpty;//
这个也是用于锁,用于数组扩容的时候,需要先获取到这个锁,才能进行扩容操作//其使用CAS操作privatetransie
ntvolatileintallocationSpinLock;//用于序列化和反序列化的时候用,对于Priority
BlockingQueue我们应该比较少使用到序列化privatePriorityQueueq;此类实现了Collecti
on和Iterator接口中的所有接口方法,对其对象进行迭代并遍历时,不能保证有序性。如果你想要实现有序遍历,建议采用Ar
rays.sort(queue.toArray())进行处理。PriorityBlockingQueue提供了drainTo
方法用于将部分或全部元素有序地填充(准确说是转移,会删除原队列中的元素)到另一个集合中。还有一个需要说明的是,如果两个对象的优先
级相同(compare方法返回0),此队列并不保证它们之间的顺序。PriorityBlockingQueue使用了基于数组的
二叉堆来存放元素,所有的public方法采用同一个lock进行并发控制。二叉堆:一颗完全二叉树,它非常适合用数组进行存储,
对于数组中的元素a[i],其左子节点为a[2i+1],其右子节点为a[2i+2],其父节点为a[(i-1)/2],其堆
序性质为,每个节点的值都小于其左右子节点的值。二叉堆中最小的值就是根节点,但是删除根节点是比较麻烦的,因为需要调整树。简单用个图解
释一下二叉堆,我就不说太多专业的严谨的术语了,这种数据结构的优点是一目了然的,最小的元素一定是根元素,它是一棵满的树,除了最后一层
,最后一层的节点从左到右紧密排列。下面开始PriorityBlockingQueue的源码分析,首先我们来看看构造方法://
默认构造方法,采用默认值(11)来进行初始化publicPriorityBlockingQueue(){this(DEFAU
LT_INITIAL_CAPACITY,null);}//指定数组的初始大小publicPriorityBlockingQu
eue(intinitialCapacity){this(initialCapacity,null);}//指定比较器p
ublicPriorityBlockingQueue(intinitialCapacity,ComparatorerE>comparator){if(initialCapacity<1)thrownewIllegalArg
umentException();this.lock=newReentrantLock();this.notEmpty
=lock.newCondition();this.comparator=comparator;this.queue=
newObject[initialCapacity];}//在构造方法中就先填充指定的集合中的元素publicPriori
tyBlockingQueue(Collectionc){this.lock=newReen
trantLock();this.notEmpty=lock.newCondition();//booleanheap
ify=true;//trueifnotknowntobeinheaporderbooleanscre
en=true;//trueifmustscreenfornullsif(cinstanceofSort
edSet){SortedSetss=(SortedSet)
c;this.comparator=(Comparator)ss.comparator();hea
pify=false;}elseif(cinstanceofPriorityBlockingQueue){
PriorityBlockingQueuepq=(PriorityBlockingQueueextendsE>)c;this.comparator=(Comparator)pq.comp
arator();screen=false;if(pq.getClass()==PriorityBlockingQu
eue.class)//exactmatchheapify=false;}Object[]a=c.toArr
ay();intn=a.length;//Ifc.toArrayincorrectlydoesn''tretur
nObject[],copyit.if(a.getClass()!=Object[].class)a=Arra
ys.copyOf(a,n,Object[].class);if(screen&&(n==1||this.co
mparator!=null)){for(inti=0;i)thrownewNullPointerException();}this.queue=a;this.size=
n;if(heapify)heapify();}接下来,我们来看看其内部的自动扩容实现:privatevoidtryG
row(Object[]array,intoldCap){//这边做了释放锁的操作lock.unlock();//
mustreleaseandthenre-acquiremainlockObject[]newArray=n
ull;//用CAS操作将allocationSpinLock由0变为1,也算是获取锁if(allocati
onSpinLock==0&&UNSAFE.compareAndSwapInt(this,allocationSpinL
ockOffset,0,1)){try{//如果节点个数小于64,那么增加的oldCap+2的容量//
如果节点数大于等于64,那么增加oldCap的一半//所以节点数较小时,增长得快一些intnewCap=oldC
ap+((oldCap<64)?(oldCap+2):(oldCap>>1));//这里有可能溢出i
f(newCap-MAX_ARRAY_SIZE>0){//possibleoverflowintminCap
=oldCap+1;if(minCap<0||minCap>MAX_ARRAY_SIZE)thrown
ewOutOfMemoryError();newCap=MAX_ARRAY_SIZE;}//如果queue!=
array,那么说明有其他线程给queue分配了其他的空间if(newCap>oldCap&&queue==a
rray)//分配一个新的大数组newArray=newObject[newCap];}finally{//
重置,也就是释放锁allocationSpinLock=0;}}//如果有其他的线程也在做扩容的操作if(new
Array==null)//backoffifanotherthreadisallocatingThread
.yield();//重新获取锁lock.lock();//将原来数组中的元素复制到新分配的大数组中if(newAr
ray!=null&&queue==array){queue=newArray;System.arrayco
py(array,0,newArray,0,oldCap);}}扩容方法对并发的控制也非常的巧妙,释放了原来的独占锁l
ock,这样的话,扩容操作和读操作可以同时进行,提高吞吐量。下面,我们来分析下写操作put方法和读操作take方法。pub
licvoidput(Ee){//直接调用offer方法,因为前面我们也说了,在这里,put方法不会阻塞off
er(e);}publicbooleanoffer(Ee){if(e==null)thrownewNull
PointerException();finalReentrantLocklock=this.lock;//首先获取
到独占锁lock.lock();intn,cap;Object[]array;//如果当前队列中的元素个数>=
数组的大小,那么需要扩容了while((n=size)>=(cap=(array=queue).length)
)tryGrow(array,cap);try{Comparatorcmp=comparat
or;//节点添加到二叉堆中if(cmp==null)siftUpComparable(n,e,array);
elsesiftUpUsingComparator(n,e,array,cmp);//更新sizesize=n
+1;//唤醒等待的读线程notEmpty.signal();}finally{lock.unlock();}
returntrue;}对于二叉堆而言,插入一个节点是简单的,插入的节点如果比父节点小,交换它们,然后继续和父节点比较。//
这个方法就是将数据x插入到数组array的位置k处,然后再调整树privatestaticvoidsif
tUpComparable(intk,Tx,Object[]array){Comparable
key=(Comparable)x;while(k>0){//二叉堆中a[k]节点
的父节点位置intparent=(k-1)>>>1;Objecte=array[parent];if(
key.compareTo((T)e)>=0)break;array[k]=e;k=parent;}arr
ay[k]=key;}我们用图来示意一下,我们接下来要将11插入到队列中,看看siftUp是怎么操作的。我们再看看t
ake方法:publicEtake()throwsInterruptedException{finalReentr
antLocklock=this.lock;//独占锁lock.lockInterruptibly();Eresu
lt;try{//dequeue出队while((result=dequeue())==null)not
Empty.await();}finally{lock.unlock();}returnresult;}privat
eEdequeue(){intn=size-1;if(n<0)returnnull;else{
Object[]array=queue;//队头,用于返回Eresult=(E)array[0];//队尾
元素先取出Ex=(E)array[n];//队尾置空array[n]=null;ComparatoruperE>cmp=comparator;if(cmp==null)siftDownComparable(0,
x,array,n);elsesiftDownUsingComparator(0,x,array,n,cmp);
size=n;returnresult;}}dequeue方法返回队头,并调整二叉堆的树,调用这个方法必须先获取独占锁。废话不多说,出队是非常简单的,因为队头就是最小的元素,对应的是数组的第一个元素。难点是队头出队后,需要调整树。privatestaticvoidsiftDownComparable(intk,Tx,Object[]array,intn){if(n>0){Comparablekey=(Comparable)x;//这里得到的half肯定是非叶节点//a[n]是最后一个元素,其父节点是a[(n-1)/2]。所以n>>>1代表的节点肯定不是叶子节点//下面,我们结合图来一行行分析,这样比较直观简单//此时k为0,x为17,n为9inthalf=n>>>1;//得到half=4while(k)c).compareTo((T)array[right])>0)c=array[child=right];//key=17,c=12,所以条件不满足if(key.compareTo((T)c)<=0)break;//把12填充到根节点array[k]=c;//k赋值后为1k=child;//一轮过后,我们发现,12左边的子树和刚刚的差不多,都是缺少根节点,接下来处理就简单了}array[k]=key;}}记住二叉堆是一棵完全二叉树,那么根节点10拿掉后,最后面的元素17必须找到合适的地方放置。首先,17和10不能直接交换,那么先将根节点10的左右子节点中较小的节点往上滑,即12往上滑,然后原来12留下了一个空节点,然后再把这个空节点的较小的子节点往上滑,即13往上滑,最后,留出了位子,17补上即可。我稍微调整下这个树,以便读者能更明白:好了,PriorityBlockingQueue我们也说完了。总结我知道本文过长,相信一字不漏看完的读者肯定是少数。ArrayBlockingQueue底层是数组,有界队列,如果我们要使用生产者-消费者模式,这是非常好的选择。LinkedBlockingQueue底层是链表,可以当做×××和有界队列来使用,所以大家不要以为它就是×××队列。SynchronousQueue本身不带有空间来存储任何元素,使用上可以选择公平模式和非公平模式。PriorityBlockingQueue是×××队列,基于数组,数据结构为二叉堆,数组第一个也是树的根节点总是最小值。
献花(0)
+1
(本文系mjsws首藏)