分享

生产者和消费者

 astrotycoon 2015-02-12

“生产者消费者”问题描述如下。

有一个有限缓冲区和两个线程:生产者和消费者。他们分别把产品放入缓冲区和从缓冲区中拿走产品。生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。并且生产者和消费者之间的操作是互斥的。它们之间的关系如下图所示:

                           生产者和消费者--线程 - chenfang7977 - On my way

这里用有名管道来模拟有限缓冲区,用信号量来解决生产者消费者问题之间的同步和互斥问题。

解决方案:

(1)信号量的考虑

    这里使用3个信号量,其中两个信号量avail和full分别用于解决生产者和消费者线程之间的同步问题,mutex是用于这两个线程时间的互斥问题。其中avail初始化为N(有界缓冲区的空单元数),mutex初始化为1,full初始化为0.

(2)流程图如下。

生产者和消费者--线程(信号量) - chenfang7977 - On my way

对流程图的分析:

这里假设初始化avail为5,也就是空缓存单元的起始个数是5。主函数创建两个线程后,两个线程并行执行。

    刚开始的时候,在生产者线程函数里,由于要对缓冲区写入数据(放入产品),因此通过P操作使avail减1,也使mutex减1,此时mutex为0。在生产者运行的同时,消费者也在运行,不过由于full的初始值设为0,消费者对full的P操作被阻塞,此时消费者线程是被阻塞的。

    生产者将数据写入缓冲区后,对full进行V操作,full置为1,此时消费者对full的P操作可以执行了,但对mutex进行P操作时又被阻塞,因为生产者还对mutex进行V操作(mutex保证了当前只有一个线程对缓冲区进行操作)。在生产者完成对mutex的V操作后,消费者才能继续执行,P操作mutex,读取缓冲区中的数据(此时生产者就被mutex阻塞了),消费者读取数据后,V操作avail,使空缓冲区个数加1,V操作mutex,解除生产者的阻塞状态。如此循环。

实验:

 编写程序实现生产者-消费者问题。使用linux的Pthread线程库,创建生产者和消费者两个线程,生产者线程计算当前的时间并放入缓冲区,每次计算一个时间数据;消费者线程从缓冲区读出并打印生产者计算的时间,每次打印一个时间数据。缓冲区大小为5个,生产和消费的消息数为10个,即生产和消费分别为10次。

可以参考上面的流程图,设置avail为5(即在消费者被阻塞的情况下,生产者可以写入5个数据到缓冲区,写入5个后生产者也被阻塞,只能等待消费者读取缓冲区中的数据,并对avai进行V操作),生产和消费分别循环10次。这里采用无名管道做为缓冲区。

与线程有关的函数参考:http://baike.baidu.com/view/400319.htm

程序如下:

 

 

/*producer-customer.c*/

#include <time.h>

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <fcntl.h>

#include <pthread.h>

#include <errno.h>

#include <semaphore.h>

#include <sys/ipc.h>

#include <string.h>

 

#define BUFFER_SIZE   5 //avail inital value                       

#define TIME_SIZE 30

 

int pipe_fd[2];

int i=0,j=0;

char *t;

time_t timer;

sem_t mutex,full,avail;

 

void *producer(void *arg)

{    

       int real_write;     

      

       while(i < 10)

       {

              sem_wait(&avail);            

              sem_wait(&mutex);  

 

              /*get seconds since 1970*/

              timer = time(0);

              /*calculate current time(struct tm type)*/

              struct tm *currtime = localtime(&timer);

              /*change time type:struct tm to ASCII code*/

              t=asctime(currtime);

 

                    

              printf("\nProducer: ID = %d\n", i);              

                           

              if ((real_write = write(pipe_fd[1], t, TIME_SIZE)) == -1)

              {                  

                     if(errno == EAGAIN)

                     {                         

                            printf("The FIFO has not been read yet.Please try later\n");

                     }

              }

              else

              {

                     printf("Write time to the FIFO\n");

              }

              sem_post(&full);

              sem_post(&mutex);

              i++;

              sleep(3);

       }

       pthread_exit(NULL);

}

void *customer(void *arg)

{    

       char c[TIME_SIZE];

       int real_read;

 

       while(j < 10)

       {

            sem_wait(&full);

          sem_wait(&mutex);

          memset(c, 0, sizeof(c));

            printf("\nCustomer: ID = %d\n", j);

         

              if ((real_read = read(pipe_fd[0], c, sizeof(c))) == -1)

              {                  

                     if (errno == EAGAIN)

                     {

                            printf("No data yet\n");

                     }

              }

              printf("Read current time %s from FIFO\n",c);

              sem_post(&avail);

              sem_post(&mutex);

              j++;

       }

       pthread_exit(NULL);

}

int main()

{

       pthread_t thrd_prd_id,thrd_cst_id;

       pthread_t mon_th_id;

       int ret;

       if (pipe(pipe_fd) < 0)

       {

              printf("pipe create error\n");

              exit(1);

       }

       ret = sem_init(&mutex, 0, 1);

       ret += sem_init(&avail, 0, BUFFER_SIZE);

       ret += sem_init(&full, 0, 0);

       if (ret != 0)

       {

              printf("Any semaphore initialization failed\n");

              return ret;

       }

       ret = pthread_create(&thrd_prd_id, NULL, producer, NULL);

       if (ret != 0)

       {

              printf("Create producer thread error\n");

              return ret;

       }

       ret = pthread_create(&thrd_cst_id, NULL, customer, NULL);

       if(ret != 0)

       {

              printf("Create customer thread error\n");

              return ret;

       }

 

       pthread_join(thrd_prd_id, NULL);      

       pthread_join(thrd_cst_id, NULL);

 

       close(pipe_fd[0]);

       close(pipe_fd[1]);

       return 0;

}

在终端编译:

root@chenfang-desktop:/home/chenfang# gcc -lpthread EX3-3.c -o EX3-3

(注意:在对线程程序编译时,需要使用链接库“-lpthread”。)

运行:

root@chenfang-desktop:/home/chenfang# ./EX3-3

Producer: ID = 0
Write time to the FIFO

Customer: ID = 0
Read current time Tue Dec 29 15:48:46 2009
 from FIFO

Producer: ID = 1
Write time to the FIFO

Customer: ID = 1
Read current time Tue Dec 29 15:48:49 2009
 from FIFO

Producer: ID = 2
Write time to the FIFO

Customer: ID = 2
Read current time Tue Dec 29 15:48:52 2009
 from FIFO

Producer: ID = 3
Write time to the FIFO

Customer: ID = 3
Read current time Tue Dec 29 15:48:55 2009
 from FIFO

Producer: ID = 4
Write time to the FIFO

Customer: ID = 4
Read current time Tue Dec 29 15:48:58 2009
 from FIFO

Producer: ID = 5
Write time to the FIFO

Customer: ID = 5
Read current time Tue Dec 29 15:49:01 2009
 from FIFO

Producer: ID = 6
Write time to the FIFO

Customer: ID = 6
Read current time Tue Dec 29 15:49:04 2009
 from FIFO

Producer: ID = 7
Write time to the FIFO

Customer: ID = 7
Read current time Tue Dec 29 15:49:07 2009
 from FIFO

Producer: ID = 8
Write time to the FIFO

Customer: ID = 8
Read current time Tue Dec 29 15:49:10 2009
 from FIFO

Producer: ID = 9
Write time to the FIFO

Customer: ID = 9
Read current time Tue Dec 29 15:49:13 2009
 from FIFO


注意:程序中的sleep(3)使得生产者V操作mutex后,延时3s,因此此时的CPU资源被消费者占去,执行消费者函数,读出数据,这时缓冲区变空了,消费者只能等sleep时间到,由生产者写入数据...如此循环,所以得到的结果是生产者写入数据和消费者读出数据是交替进行的。如果没有sleep或者sleep处于sem_post(mutex)之前,这样当sem_post(mutex)
后,生产者和消费者争夺CPU资源,由得到的那方执行自己的程序,因此出现的结果并不一定是交替进行而是随机的,但由于avail我们设置为5,因此最多连续出现5条生产者写入数据的提示,此时生产者就会被阻塞,等待消费者读出数据后,生产者才能继续写入数据。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多