“生产者消费者”问题描述如下。 有一个有限缓冲区和两个线程:生产者和消费者。他们分别把产品放入缓冲区和从缓冲区中拿走产品。生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。并且生产者和消费者之间的操作是互斥的。它们之间的关系如下图所示: 这里用有名管道来模拟有限缓冲区,用信号量来解决生产者消费者问题之间的同步和互斥问题。 解决方案: (1)信号量的考虑 这里使用3个信号量,其中两个信号量avail和full分别用于解决生产者和消费者线程之间的同步问题,mutex是用于这两个线程时间的互斥问题。其中avail初始化为N(有界缓冲区的空单元数),mutex初始化为1,full初始化为0. (2)流程图如下。 对流程图的分析: 这里假设初始化avail为5,也就是空缓存单元的起始个数是5。主函数创建两个线程后,两个线程并行执行。 刚开始的时候,在生产者线程函数里,由于要对缓冲区写入数据(放入产品),因此通过P操作使avail减1,也使mutex减1,此时mutex为0。在生产者运行的同时,消费者也在运行,不过由于full的初始值设为0,消费者对full的P操作被阻塞,此时消费者线程是被阻塞的。 生产者将数据写入缓冲区后,对full进行V操作,full置为1,此时消费者对full的P操作可以执行了,但对mutex进行P操作时又被阻塞,因为生产者还对mutex进行V操作(mutex保证了当前只有一个线程对缓冲区进行操作)。在生产者完成对mutex的V操作后,消费者才能继续执行,P操作mutex,读取缓冲区中的数据(此时生产者就被mutex阻塞了),消费者读取数据后,V操作avail,使空缓冲区个数加1,V操作mutex,解除生产者的阻塞状态。如此循环。 实验: 编写程序实现生产者-消费者问题。使用linux的Pthread线程库,创建生产者和消费者两个线程,生产者线程计算当前的时间并放入缓冲区,每次计算一个时间数据;消费者线程从缓冲区读出并打印生产者计算的时间,每次打印一个时间数据。缓冲区大小为5个,生产和消费的消息数为10个,即生产和消费分别为10次。 可以参考上面的流程图,设置avail为5(即在消费者被阻塞的情况下,生产者可以写入5个数据到缓冲区,写入5个后生产者也被阻塞,只能等待消费者读取缓冲区中的数据,并对avai进行V操作),生产和消费分别循环10次。这里采用无名管道做为缓冲区。 与线程有关的函数参考:http://baike.baidu.com/view/400319.htm 程序如下:
/*producer-customer.c*/ #include <time.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <pthread.h> #include <errno.h> #include <semaphore.h> #include <sys/ipc.h> #include <string.h>
#define BUFFER_SIZE 5 //avail inital value #define TIME_SIZE 30
int pipe_fd[2]; int i=0,j=0; char *t; time_t timer; sem_t mutex,full,avail;
void *producer(void *arg) { int real_write;
while(i < 10) { sem_wait(&avail); sem_wait(&mutex);
/*get seconds since 1970*/ timer = time(0); /*calculate current time(struct tm type)*/ struct tm *currtime = localtime(&timer); /*change time type:struct tm to ASCII co t=asctime(currtime);
printf("\nProducer: ID = %d\n", i);
if ((real_write = write(pipe_fd[1], t, TIME_SIZE)) == -1) { if(errno == EAGAIN) { printf("The FIFO has not been read yet.Please try later\n"); } } else { printf("Write time to the FIFO\n"); } sem_post(&full); sem_post(&mutex); i++; sleep(3); } pthread_exit(NULL); } void *customer(void *arg) { char c[TIME_SIZE]; int real_read;
while(j < 10) { sem_wait(&full); sem_wait(&mutex); memset(c, 0, sizeof(c)); printf("\nCustomer: ID = %d\n", j);
if ((real_read = read(pipe_fd[0], c, sizeof(c))) == -1) { if (errno == EAGAIN) { printf("No da } } printf("Read current time %s from FIFO\n",c); sem_post(&avail); sem_post(&mutex); j++; } pthread_exit(NULL); } int main() { pthread_t thrd_prd_id,thrd_cst_id; pthread_t mon_th_id; int ret; if (pipe(pipe_fd) < 0) { printf("pipe create error\n"); exit(1); } ret = sem_init(&mutex, 0, 1); ret += sem_init(&avail, 0, BUFFER_SIZE); ret += sem_init(&full, 0, 0); if (ret != 0) { printf("Any semaphore initialization failed\n"); return ret; } ret = pthread_create(&thrd_prd_id, NULL, producer, NULL); if (ret != 0) { printf("Create producer thread error\n"); return ret; } ret = pthread_create(&thrd_cst_id, NULL, customer, NULL); if(ret != 0) { printf("Create customer thread error\n"); return ret; }
pthread_join(thrd_prd_id, NULL); pthread_join(thrd_cst_id, NULL);
close(pipe_fd[0]); close(pipe_fd[1]); return 0; } 在终端编译: root@chenfang-desktop:/home/chenfang# gcc -lpthread EX3-3.c -o EX3-3 (注意:在对线程程序编译时,需要使用链接库“-lpthread”。) 运行: root@chenfang-desktop:/home/chenfang# ./EX3-3 Producer: ID = 0 Customer: ID = 0 Producer: ID = 1 Customer: ID = 1 Producer: ID = 2 Customer: ID = 2 Producer: ID = 3 Customer: ID = 3 Producer: ID = 4 Customer: ID = 4 Producer: ID = 5 Customer: ID = 5 Producer: ID = 6 Customer: ID = 6 Producer: ID = 7 Customer: ID = 7 Producer: ID = 8 Customer: ID = 8 Producer: ID = 9 Customer: ID = 9
|
|
来自: astrotycoon > 《thread》