Java并发编程深入学习—生产者-消费者模式多种实现方式
问题介绍
??生产者消费者模型是经典的同步问题。问题大致如下:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对临界资源(即队列)的占用权。因为生产者如果不释放对临界资源的占用权,那么消费者就无法消费队列中的商品,就不会让队列有空间,那么生产者就会一直无限等待下去。因此,一般情况下,当队列满时,会让生产者交出对临界资源的占用权,并进入挂起状态。然后等待消费者消费了商品,然后消费者通知生产者队列有空间了。同样地,当队列空时,消费者也必须等待,等待生产者通知它队列中有商品了。这种互相通信的过程就是线程间的协作。
下面总结了三种方式来实现一个消费者/生产者模型.
1.wait/notify模式
/
waitandnotify
生产者和消费者问题
@authorbridge
/
publicclasstest1{
publicstaticvoidmain(String[]args){
Resourceresource=newResource();
//生产者线程
ProducerThreadp=newProducerThread(resource);
//多个消费者
ConsumerThreadc1=newConsumerThread(resource);
ConsumerThreadc2=newConsumerThread(resource);
ConsumerThreadc3=newConsumerThread(resource);
p.start();
c1.start();
c2.start();
c3.start();
}
}
classProducerThreadextendsThread{
privateResourceresource;
publicProducerThread(Resourceresource){
this.resource=resource;
//setName("生产者");
}
publicvoidrun(){
while(true){
try{
Thread.sleep((long)(1000Math.random()));
}catch(InterruptedExceptione){
e.printStackTrace();
}
resource.add();
}
}
}
classConsumerThreadextendsThread{
privateResourceresource;
publicConsumerThread(Resourceresource){
this.resource=resource;
//setName("消费者");
}
publicvoidrun(){
while(true){
try{
Thread.sleep((long)(1000Math.random()));
}catch(InterruptedExceptione){
e.printStackTrace();
}
resource.remove();
}
}
}
/
公共资源类
/
classResource{
//当前资源数量
privateintnum=0;
//资源池中允许存放的资源数
privateintsize=10;
/
向资源池中添加资源
/
publicsynchronizedvoidadd(){
if(num num++;
System.out.println(Thread.currentThread().getName()+"生产一件资源,当前资源池有"+num+"个");
notifyAll();
}else{
try{
wait();
System.out.println(Thread.currentThread().getName()+"线程进入等待");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
/
从资源池中取走资源
/
publicsynchronizedvoidremove(){
if(num>0){
num--;
System.out.println("消费者"+Thread.currentThread().getName()+"消耗一件资源,"+"当前资源池有"+num+"个");
notifyAll();
}else{
try{
wait();
System.out.println(Thread.currentThread().getName()+"线程进入等待");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
}
2.Lockcondition模式
packageProducerAndConsumer.p2;
importjava.util.concurrent.locks.Condition;
importjava.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;
/
lock和condition解法
生产者和消费者问题
@authorbridge
/
publicclasstest2{
publicstaticvoidmain(String[]args){
Locklock=newReentrantLock();
ConditionpCondition=lock.newCondition();
ConditioncCondition=lock.newCondition();
Resourceresource=newResource(lock,pCondition,cCondition);
//生产者线程
ProducerThreadp=newProducerThread(resource);
//多个消费者
ConsumerThreadc1=newConsumerThread(resource);
ConsumerThreadc2=newConsumerThread(resource);
ConsumerThreadc3=newConsumerThread(resource);
p.start();
c1.start();
//c2.start();
//c3.start();
}
}
/
生产者线程
/
classProducerThreadextendsThread{
privateResourceresource;
publicProducerThread(Resourceresource){
this.resource=resource;
setName("生产者");
}
publicvoidrun(){
while(true){
try{
Thread.sleep((long)(1000Math.random()));
}catch(Interruptwww.shanxiwang.netedExceptione){
e.printStackTrace();
}
resource.add();
}
}
}
/
消费者线程
/
classConsumerThreadextendsThread{
privateResourceresource;
publicConsumerThread(Resourceresource){
this.resource=resource;
//setName("消费者");
}
publicvoidrun(){
while(true){
try{
Thread.sleep((long)(1000Math.random()));
}catch(InterruptedExceptione){
e.printStackTrace();
}
resource.remove();
}
}
}
/
公共资源类
/
classResource{
//当前资源数量
privateintnum=0;
//资源池中允许存放的资源数
privateintsize=10;
privateLocklock;
privateConditionpCondition;
privateConditioncCondition;
publicResource(Locklock,ConditionpCondition,ConditioncCondition){
this.lock=lock;
this.pCondition=pCondition;
this.cCondition=cCondition;
}
/
向资源池中添加资源
/
publicvoidadd(){
lock.lock();
try{
if(num num++;
System.out.println(Thread.currentThread().getName()+"生产一件资源,当前资源池有"+num+"个");
//唤醒等待的消费者
cCondition.signalAll();
}else{
//使生产者等待
pCondition.await();
System.out.println(Thread.currentThread().getName()+"线程进入等待");
}
}catch(InterruptedExceptione){
e.printStackTrace();
}finally{
lock.unlock();
}
}
/
从资源池中取走资源
/
publicvoidremove(){
lock.lock();
try{
if(num>0){
num--;
System.out.println("消费者"+Thread.currentThread().getName()+"消耗一件资源,"+"当前资源池有"+num+"个");
pCondition.signalAll();//唤醒等待的生产者
}else{
try{
cCondition.await();
//使消费者等待
System.out.println(Thread.currentThread().getName()+"线程进入等待");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}finally{
lock.unlock();
}
}
}
3.阻塞队列模式
packageProducerAndConsumer.p3;
importjava.util.concurrent.BlockingQueue;
importjava.util.concurrent.LinkedBlockingQueue;
/
阻塞队列解法
生产者和消费者问题
@authorbridge
/
publicclasstest3{
publicstaticvoidmain(String[]args){
Resourceresource=newResource();
//生产者线程
ProducerThreadp=newProducerThread(resource);
//多个消费者
ConsumerThreadc1=newConsumerThread(resource);
ConsumerThreadc2=newConsumerThread(resource);
ConsumerThreadc3=newConsumerThread(resource);
p.start();
c1.start();
//c2.start();
//c3.start();
}
}
classProducerThreadextendsThread{
privateResourceresource;
publicProducerThread(Resourceresource){
this.resource=resource;
//setName("生产者");
}
publicvoidrun(){
while(true){
try{
Thread.sleep((long)(1000Math.random()));
}catch(InterruptedExceptione){
e.printStackTrace();
}
resource.add();
}
}
}
classConsumerThreadextendsThread{
privateResourceresource;
publicConsumerThread(Resourceresource){
this.resource=resource;
//setName("消费者");
}
publicvoidrun(){
while(true){
try{
Thread.sleep((long)(1000Math.random()));
}catch(InterruptedExceptione){
e.printStackTrace();
}
resource.remove();
}
}
}
/
公共资源类
/
classResource{
privateBlockingQueueresourceQueue=newLinkedBlockingQueue(10);
/
向资源池中添加资源
/
publicvoidadd(){
try{
resourceQueue.put(1);
System.out.println("生产者"+Thread.currentThread().getName()+"生产一件资源,"+"当前资源池有"+resourceQueue.size()+"个资源");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
/
从资源池中取走资源
/
publicvoidremove(){
try{
resourceQueue.take();
System.out.println("消费者"+Thread.currentThread().getName()+"消耗一件资源,"+"当前资源池有"+resourceQueue.size()+"个资源");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
|
|