配色: 字号:
Java并发编程深入学习—生产者-消费者模式多种实现方式
2016-09-18 | 阅:  转:  |  分享 
  
Java并发编程深入学习—生产者-消费者模式多种实现方式

问题介绍



??生产者消费者模型是经典的同步问题。问题大致如下:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对临界资源(即队列)的占用权。因为生产者如果不释放对临界资源的占用权,那么消费者就无法消费队列中的商品,就不会让队列有空间,那么生产者就会一直无限等待下去。因此,一般情况下,当队列满时,会让生产者交出对临界资源的占用权,并进入挂起状态。然后等待消费者消费了商品,然后消费者通知生产者队列有空间了。同样地,当队列空时,消费者也必须等待,等待生产者通知它队列中有商品了。这种互相通信的过程就是线程间的协作。



下面总结了三种方式来实现一个消费者/生产者模型.



1.wait/notify模式



/

waitandnotify

生产者和消费者问题



@authorbridge

/

publicclasstest1{



publicstaticvoidmain(String[]args){

Resourceresource=newResource();

//生产者线程

ProducerThreadp=newProducerThread(resource);

//多个消费者

ConsumerThreadc1=newConsumerThread(resource);

ConsumerThreadc2=newConsumerThread(resource);

ConsumerThreadc3=newConsumerThread(resource);



p.start();

c1.start();

c2.start();

c3.start();

}



}





classProducerThreadextendsThread{



privateResourceresource;



publicProducerThread(Resourceresource){

this.resource=resource;

//setName("生产者");

}



publicvoidrun(){

while(true){

try{

Thread.sleep((long)(1000Math.random()));

}catch(InterruptedExceptione){

e.printStackTrace();

}

resource.add();

}

}

}



classConsumerThreadextendsThread{

privateResourceresource;



publicConsumerThread(Resourceresource){

this.resource=resource;

//setName("消费者");

}



publicvoidrun(){

while(true){

try{

Thread.sleep((long)(1000Math.random()));

}catch(InterruptedExceptione){

e.printStackTrace();

}

resource.remove();

}

}

}





/

公共资源类

/

classResource{



//当前资源数量

privateintnum=0;

//资源池中允许存放的资源数

privateintsize=10;



/

向资源池中添加资源

/

publicsynchronizedvoidadd(){



if(num
num++;

System.out.println(Thread.currentThread().getName()+"生产一件资源,当前资源池有"+num+"个");

notifyAll();

}else{

try{

wait();

System.out.println(Thread.currentThread().getName()+"线程进入等待");

}catch(InterruptedExceptione){

e.printStackTrace();

}

}

}



/

从资源池中取走资源

/

publicsynchronizedvoidremove(){



if(num>0){

num--;

System.out.println("消费者"+Thread.currentThread().getName()+"消耗一件资源,"+"当前资源池有"+num+"个");

notifyAll();

}else{

try{

wait();

System.out.println(Thread.currentThread().getName()+"线程进入等待");

}catch(InterruptedExceptione){

e.printStackTrace();

}

}



}



}



2.Lockcondition模式



packageProducerAndConsumer.p2;



importjava.util.concurrent.locks.Condition;

importjava.util.concurrent.locks.Lock;

importjava.util.concurrent.locks.ReentrantLock;



/

lock和condition解法

生产者和消费者问题



@authorbridge

/

publicclasstest2{



publicstaticvoidmain(String[]args){

Locklock=newReentrantLock();

ConditionpCondition=lock.newCondition();

ConditioncCondition=lock.newCondition();



Resourceresource=newResource(lock,pCondition,cCondition);



//生产者线程

ProducerThreadp=newProducerThread(resource);

//多个消费者

ConsumerThreadc1=newConsumerThread(resource);

ConsumerThreadc2=newConsumerThread(resource);

ConsumerThreadc3=newConsumerThread(resource);



p.start();

c1.start();

//c2.start();

//c3.start();

}

}



/

生产者线程

/

classProducerThreadextendsThread{



privateResourceresource;



publicProducerThread(Resourceresource){

this.resource=resource;

setName("生产者");

}



publicvoidrun(){

while(true){

try{

Thread.sleep((long)(1000Math.random()));

}catch(Interruptwww.shanxiwang.netedExceptione){

e.printStackTrace();

}

resource.add();

}

}

}



/

消费者线程

/

classConsumerThreadextendsThread{



privateResourceresource;



publicConsumerThread(Resourceresource){

this.resource=resource;

//setName("消费者");

}



publicvoidrun(){

while(true){

try{

Thread.sleep((long)(1000Math.random()));

}catch(InterruptedExceptione){

e.printStackTrace();

}

resource.remove();

}

}

}





/

公共资源类

/

classResource{



//当前资源数量

privateintnum=0;

//资源池中允许存放的资源数

privateintsize=10;



privateLocklock;

privateConditionpCondition;

privateConditioncCondition;



publicResource(Locklock,ConditionpCondition,ConditioncCondition){

this.lock=lock;

this.pCondition=pCondition;

this.cCondition=cCondition;



}



/

向资源池中添加资源

/

publicvoidadd(){



lock.lock();

try{

if(num
num++;

System.out.println(Thread.currentThread().getName()+"生产一件资源,当前资源池有"+num+"个");

//唤醒等待的消费者

cCondition.signalAll();

}else{

//使生产者等待

pCondition.await();

System.out.println(Thread.currentThread().getName()+"线程进入等待");

}

}catch(InterruptedExceptione){

e.printStackTrace();

}finally{

lock.unlock();

}





}



/

从资源池中取走资源

/

publicvoidremove(){



lock.lock();

try{

if(num>0){

num--;

System.out.println("消费者"+Thread.currentThread().getName()+"消耗一件资源,"+"当前资源池有"+num+"个");

pCondition.signalAll();//唤醒等待的生产者

}else{

try{

cCondition.await();

//使消费者等待

System.out.println(Thread.currentThread().getName()+"线程进入等待");

}catch(InterruptedExceptione){

e.printStackTrace();

}

}

}finally{

lock.unlock();

}





}



}



3.阻塞队列模式



packageProducerAndConsumer.p3;



importjava.util.concurrent.BlockingQueue;

importjava.util.concurrent.LinkedBlockingQueue;



/

阻塞队列解法

生产者和消费者问题



@authorbridge

/

publicclasstest3{



publicstaticvoidmain(String[]args){

Resourceresource=newResource();

//生产者线程

ProducerThreadp=newProducerThread(resource);

//多个消费者

ConsumerThreadc1=newConsumerThread(resource);

ConsumerThreadc2=newConsumerThread(resource);

ConsumerThreadc3=newConsumerThread(resource);



p.start();

c1.start();

//c2.start();

//c3.start();

}



}





classProducerThreadextendsThread{



privateResourceresource;



publicProducerThread(Resourceresource){

this.resource=resource;

//setName("生产者");

}



publicvoidrun(){

while(true){

try{

Thread.sleep((long)(1000Math.random()));

}catch(InterruptedExceptione){

e.printStackTrace();

}

resource.add();

}

}

}



classConsumerThreadextendsThread{

privateResourceresource;



publicConsumerThread(Resourceresource){

this.resource=resource;

//setName("消费者");

}



publicvoidrun(){

while(true){

try{

Thread.sleep((long)(1000Math.random()));

}catch(InterruptedExceptione){

e.printStackTrace();

}

resource.remove();

}

}

}





/

公共资源类

/

classResource{





privateBlockingQueueresourceQueue=newLinkedBlockingQueue(10);



/

向资源池中添加资源

/

publicvoidadd(){



try{

resourceQueue.put(1);

System.out.println("生产者"+Thread.currentThread().getName()+"生产一件资源,"+"当前资源池有"+resourceQueue.size()+"个资源");

}catch(InterruptedExceptione){

e.printStackTrace();

}

}



/

从资源池中取走资源

/

publicvoidremove(){

try{

resourceQueue.take();

System.out.println("消费者"+Thread.currentThread().getName()+"消耗一件资源,"+"当前资源池有"+resourceQueue.size()+"个资源");

}catch(InterruptedExceptione){

e.printStackTrace();

}

}



}

献花(0)
+1
(本文系网络学习天...首藏)