转载:http://blog.csdn.net/mxgsgtc/article/details/11694901 这几天闲来无事,网上无意中看到了关于线程池的东西,发现挺有意思的,找了挺多资料,研究一下,线程池技术,个人理解,线程池是个集合(概念上的,当然是线程的集合),假设这个集合中有3个线程A , B, C 这三个线程初始化的时候就是等待的状态,等待任务的到来,假设有任务1, 2, 3, 4, 5(任务处理的内容是一样的),线程池会怎么处理呢
①:A会处理1任务(任务其实就是函数),B会处理2任务,C处理3任务
②:当1任务结束之后,A会处理4任务;2任务结束后B会处理5任务,3任务结束后,C会等待(因为没有任务了)③:当所有任务处理完之后,A,B,C都会等待状态,等待新任务的到来上面陈述的是一般简单线程池的具体事例,那么如何实现,下面贴出代码(linux环境)
- #include <stdio.h>
- #include <stdlib.h>
- #include <pthread.h>
- #include <unistd.h>
- #include <sys/types.h>
- #include <assert.h>
- #include <string.h>
- typedef void* (*TaskFun)(void *arg);
-
- typedef struct _TskNode{
- TaskFun TaskDmd; /*task节点任务处理函数*/
- void *arg; /*传入任务处理函数的参数*/
- struct _TskNode *pPre ; /*前一个任务节点*/
- struct _TskNode *pNext; /*后一个任务节点*/
- } TskNode; /*队列节点*/
-
- typedef struct _tskQueueManage{
- int tskCurWaitNum; /*当前任务队列的任务数量*/
- struct _TskNode *pTskHead; /*当前任务队列的首节点*/
- struct _TskNode *pTskTail; /*当前任务队列的尾节点*/
- } TskQueueManage; /*任务队列描述符*/
-
- typedef struct _threadManage{
- int thdMaxNum; /*线程池容纳最大线程数量*/
- pthread_t *pth; /*线程指针*/
- pthread_mutex_t mutex; /*线程锁*/
- pthread_cond_t cond; /*线程条件变量*/
- } ThreadManage; /*线程描述符*/
-
- typedef struct _thredPoolManage{
- int shutFlag; /*线程池摧毁标识*/
- ThreadManage *pThdManage; /*线程描述符指针*/
- TskQueueManage *pTskQueueManage; /*任务队列描述符指针*/
- } ThdPoolManage; /*线程池描述符*/
-
- /*初始化,上述描述符*/
- static int mainDmdInit(int thdMaxNum);
- /*线程的创建*/
- static void thdPoolCreat();
- /*线程池中的线程启动后处理*/
- static void threadCreatdmd();
- /*任务添加*/
- static int tskAddDmd(TaskFun TaskDmd, void* arg);
- /*线程池的销毁*/
- static void thdPoolDestroy();
- /*线程池描述符指针*/
- ThdPoolManage* pThdPoolManage = NULL;
- static int mainDmdInit(int thdMaxNum)
- {
- int flag = 0;
- /*线程池描述符的创建*/
- pThdPoolManage = (ThdPoolManage *)malloc(sizeof(struct _thredPoolManage));
- if(pThdPoolManage != NULL){
- /*线程描述符的创建*/
- pThdPoolManage->pThdManage = (ThreadManage *)malloc(sizeof(struct _threadManage));
- if(pThdPoolManage->pThdManage != NULL){
- /*线程互斥锁于条件变量的初始化*/
- pthread_mutex_init(&(pThdPoolManage->pThdManage->mutex), NULL);
- pthread_cond_init(&(pThdPoolManage->pThdManage->cond), NULL);
- /*将线程池中允许的最大线程数赋值给线程池描述符成员*/
- pThdPoolManage->pThdManage->thdMaxNum = thdMaxNum;
- /*线程pthread_t的创建*/
- pThdPoolManage->pThdManage->pth = (pthread_t*)malloc(thdMaxNum*sizeof(pthread_t));
- if(pThdPoolManage->pThdManage->pth != NULL){
- /*工作队列描述符的创建*/
- pThdPoolManage->pTskQueueManage = (TskQueueManage *)malloc(sizeof(struct _tskQueueManage));
- if(pThdPoolManage->pTskQueueManage != NULL){
- /*初始队列工作描述符*/
- pThdPoolManage->pTskQueueManage->tskCurWaitNum = 0;
- pThdPoolManage->pTskQueueManage->pTskHead = NULL;
- pThdPoolManage->pTskQueueManage->pTskTail = NULL;
- /*线程池中所有线程的创建*/
- thdPoolCreat();
- } else {
- /*注意: 如果malloc不成功,一定要free掉之前的malloc申请*/
- free(pThdPoolManage->pThdManage->pth);
- free(pThdPoolManage->pThdManage);
- free(pThdPoolManage);
- /*如果malloc失败,说明错误 flag 赋值为 = 1*/
- flag = 1;
- printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
- }
- } else {
- free(pThdPoolManage->pThdManage);
- free(pThdPoolManage);
- flag = 1;
- printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
- }
- } else {
- free(pThdPoolManage);
- flag = 1;
- printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
- }
- } else {
- flag = 1;
- printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
- }
- return flag;
- }
-
- static void thdPoolCreat()
- {
- ThreadManage *pThreadManage = NULL;
- int thdNum;
- pThreadManage = pThdPoolManage->pThdManage;
- for(thdNum = 0; thdNum < pThreadManage->thdMaxNum; thdNum++){
- pthread_create(&(pThreadManage->pth[thdNum]), NULL, (void*)threadCreatdmd, NULL);
- }
- }
-
- static void threadCreatdmd(void *arg)
- {
- /*注意指针赋值之前要初始化为NULL,以免发生后续出现野指针的情况*/
- TskQueueManage* pTskQueueManage = NULL;
- ThreadManage *pThreadManage = NULL;
- TskNode* pCurTsk = NULL;
-
- printf("threadCreatdmd_creat_success:[ThreadId]=[%x]\n", pthread_self());
- pTskQueueManage = pThdPoolManage->pTskQueueManage;
- pThreadManage = pThdPoolManage->pThdManage;
- while(1){
- /*注意,因为会创建很多个threadCreatdmd函数,由于每个函数都要访问临界代码:即对工作队列的操作,所以必须要枷锁
- 以保证每一个处理函数(threadCreatdmd),在访问工作队列的时候,此工作队列不会被其他的处理函数修改*/
- pthread_mutex_lock(&(pThreadManage->mutex));
- /*最开始创建线程池中的线程需要等待的两种情况,即,while循环条件成立的情况, 1.线程池初始化时候(即没添加任务之前), 2.
- 工作队列没有任务了(即任务都执行完了)*/
- while((pTskQueueManage->tskCurWaitNum == 0)&&(pThdPoolManage->shutFlag == 0)){
- printf("[ThreadId]=[%x]_waiting... ... ...\n", pthread_self());
- /*这时此线程会在这里阻塞*/
- pthread_cond_wait(&(pThreadManage->cond), &(pThreadManage->mutex));
- }
- if(pThdPoolManage->shutFlag == 1){
- pthread_mutex_unlock(&(pThreadManage->mutex));
- printf("[ThreadId]=[%x]_exit\n", pthread_self());
- pthread_exit(NULL);
- }
- printf("[ThreadId]=[%x]_starting_work!!\n", pthread_self());
- assert(pTskQueueManage->tskCurWaitNum != 0);
- assert(pTskQueueManage->pTskHead != NULL);
- (pTskQueueManage->tskCurWaitNum)--;
- /*取工作队列头部节点*/
- pCurTsk = pTskQueueManage->pTskHead;
- /*取头之后,将新头赋给下个元素*/
- pTskQueueManage->pTskHead = pTskQueueManage->pTskHead->pNext;
- /* 注意:如果最后一个元素 这时候 pTskQueueManage->pTskHead 是空,空的话是没有pPre的*/
- if(pTskQueueManage->pTskHead != NULL){
- pTskQueueManage->pTskHead->pPre = NULL;
- }
- pthread_mutex_unlock(&(pThreadManage->mutex));
- /*执行头部任务节点的任务函数(即上面取出的节点)*/
- (pCurTsk->TaskDmd)(pCurTsk->arg);
- free(pCurTsk);
- pCurTsk = NULL;
- }
- }
-
- static int tskAddDmd(TaskFun TaskDmd, void* arg)
- {
- TskNode* pTskNode = NULL;
- TskQueueManage* pTskQueueManage = NULL;
- ThreadManage* pThdManage = NULL;
- int flag = 0;
- pTskQueueManage = pThdPoolManage->pTskQueueManage;
- pThdManage = pThdPoolManage->pThdManage;
- pthread_mutex_lock(&(pThdManage->mutex));
- /*任务添加,创建一个工作节点*/
- pTskNode = (TskNode*)malloc(sizeof(struct _TskNode));
- if(pTskNode != NULL){
- /*将任务(函数赋值给节点)*/
- pTskNode->TaskDmd = TaskDmd;
- pTskNode->pNext = NULL;
- /*赋值参数*/
- pTskNode->arg = arg;
- if(pTskQueueManage->tskCurWaitNum == 0){
- pTskQueueManage->pTskHead = pTskNode;
- pTskQueueManage->pTskTail = pTskNode;
- } else {
- pTskQueueManage->pTskTail->pNext = pTskNode;
- pTskNode->pPre = pTskQueueManage->pTskTail;
- pTskQueueManage->pTskTail = pTskNode;
- }
- (pTskQueueManage->tskCurWaitNum)++;
- } else {
- flag = 1;
- printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
- }
- pthread_mutex_unlock(&(pThdManage->mutex));
- pthread_cond_signal(&(pThdManage->cond));
- return flag;
- }
-
- static void thdPoolDestroy()
- {
- int thdNum;
- pThdPoolManage->shutFlag = 1;
- TskQueueManage *pTskQueueManage = NULL;
- ThreadManage *pThdManage = NULL;
- TskNode* pTskNode = NULL;
- pTskQueueManage = pThdPoolManage->pTskQueueManage;
- pThdManage = pThdPoolManage->pThdManage;
- pthread_cond_broadcast(&(pThdPoolManage->pThdManage->cond));
- for(thdNum = 0; thdNum < pThdManage -> thdMaxNum; thdNum++){
- pthread_join(pThdManage->pth[thdNum], NULL);
- }
- while(pTskQueueManage->pTskHead != NULL){
- pTskNode = pTskQueueManage->pTskHead;
- pTskQueueManage->pTskHead = pTskQueueManage->pTskHead->pNext;
- free(pTskNode);
- }
- pthread_mutex_destroy(&(pThdManage->mutex));
- pthread_cond_destroy(&(pThdManage->cond));
- free(pThdPoolManage->pThdManage->pth);
- free(pThdPoolManage->pThdManage);
- free(pThdPoolManage->pTskQueueManage);
- free(pThdPoolManage);
- pThdPoolManage = NULL;
- }
-
- void TaskDmd(void *arg)
- {
- printf("[ThreadId]=[%x] working on task[%d]\n", pthread_self(), *((int *)arg));
- sleep(1);
- }
- /*测试代码*/
- int main()
- {
- int flag;
- int taskAdd;
- int *taskArg;
- int taskNum = 10;
- int thdMaxNum = 3;
- flag = mainDmdInit(thdMaxNum);
- /*保险起见两秒,因为可能会造成添加任务在线程等待之前执行*/
- sleep(2);
- taskArg = (int *)malloc(sizeof(int) * taskNum);
- memset(taskArg, 0x00, sizeof(int) * taskNum);
- if(flag != 1){
- for(taskAdd = 0; taskAdd < taskNum; taskAdd++){
- taskArg[taskAdd] = taskAdd;
- flag = tskAddDmd((void*)TaskDmd, &(taskArg[taskAdd]));
- if(flag == 1){
- printf("jobAdd error Num=[%d]\n", taskAdd);
- } else {
- printf("jobAdd success Num = [%d]\n", taskAdd);
- }
- }
- } else {
- printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);
- }
- sleep(10);
- thdPoolDestroy();
- return 0;
- }
|