分享

线程池技术个人理解以及c语言的简单实现

 写意人生 2014-04-27

转载:http://blog.csdn.net/mxgsgtc/article/details/11694901     

这几天闲来无事,网上无意中看到了关于线程池的东西,发现挺有意思的,找了挺多资料,研究一下,线程池技术,个人理解,线程池是个集合(概念上的,当然是线程的集合),假设这个集合中有3个线程A , B, C 这三个线程初始化的时候就是等待的状态,等待任务的到来,假设有任务1, 2, 3, 4, 5(任务处理的内容是一样的),线程池会怎么处理呢

①:A会处理1任务(任务其实就是函数),B会处理2任务,C处理3任务

②:当1任务结束之后,A会处理4任务;2任务结束后B会处理5任务,3任务结束后,C会等待(因为没有任务了)③:当所有任务处理完之后,A,B,C都会等待状态,等待新任务的到来上面陈述的是一般简单线程池的具体事例,那么如何实现,下面贴出代码(linux环境)

  1. #include <stdio.h>  
  2. #include <stdlib.h>  
  3. #include <pthread.h>  
  4. #include <unistd.h>  
  5. #include <sys/types.h>  
  6. #include <assert.h>  
  7. #include <string.h>  
  8. typedef void* (*TaskFun)(void *arg);  
  9.   
  10. typedef struct _TskNode{  
  11.     TaskFun TaskDmd;        /*task节点任务处理函数*/  
  12.     void *arg;              /*传入任务处理函数的参数*/  
  13.     struct _TskNode *pPre ; /*前一个任务节点*/  
  14.     struct _TskNode *pNext; /*后一个任务节点*/  
  15. } TskNode; /*队列节点*/  
  16.   
  17. typedef struct _tskQueueManage{  
  18.     int tskCurWaitNum;         /*当前任务队列的任务数量*/  
  19.     struct _TskNode *pTskHead; /*当前任务队列的首节点*/  
  20.     struct _TskNode *pTskTail; /*当前任务队列的尾节点*/  
  21. } TskQueueManage;              /*任务队列描述符*/  
  22.   
  23. typedef struct _threadManage{  
  24.     int thdMaxNum;              /*线程池容纳最大线程数量*/  
  25.     pthread_t *pth;             /*线程指针*/  
  26.     pthread_mutex_t mutex;      /*线程锁*/  
  27.     pthread_cond_t  cond;       /*线程条件变量*/  
  28. } ThreadManage;                 /*线程描述符*/  
  29.   
  30. typedef struct _thredPoolManage{  
  31.     int shutFlag;                    /*线程池摧毁标识*/  
  32.     ThreadManage   *pThdManage;      /*线程描述符指针*/  
  33.     TskQueueManage *pTskQueueManage; /*任务队列描述符指针*/  
  34. } ThdPoolManage;                     /*线程池描述符*/  
  35.   
  36. /*初始化,上述描述符*/  
  37. static int  mainDmdInit(int thdMaxNum);  
  38. /*线程的创建*/  
  39. static void thdPoolCreat();  
  40. /*线程池中的线程启动后处理*/  
  41. static void threadCreatdmd();  
  42. /*任务添加*/  
  43. static int  tskAddDmd(TaskFun TaskDmd, void* arg);  
  44. /*线程池的销毁*/  
  45. static void thdPoolDestroy();  
  46. /*线程池描述符指针*/  
  47. ThdPoolManage* pThdPoolManage = NULL;  
  48. static int  mainDmdInit(int thdMaxNum)  
  49. {  
  50.     int flag = 0;  
  51.     /*线程池描述符的创建*/  
  52.     pThdPoolManage = (ThdPoolManage *)malloc(sizeof(struct _thredPoolManage));  
  53.     if(pThdPoolManage != NULL){  
  54.         /*线程描述符的创建*/  
  55.         pThdPoolManage->pThdManage = (ThreadManage *)malloc(sizeof(struct _threadManage));  
  56.         if(pThdPoolManage->pThdManage != NULL){  
  57.             /*线程互斥锁于条件变量的初始化*/  
  58.             pthread_mutex_init(&(pThdPoolManage->pThdManage->mutex), NULL);  
  59.             pthread_cond_init(&(pThdPoolManage->pThdManage->cond), NULL);  
  60.             /*将线程池中允许的最大线程数赋值给线程池描述符成员*/  
  61.             pThdPoolManage->pThdManage->thdMaxNum = thdMaxNum;  
  62.             /*线程pthread_t的创建*/  
  63.             pThdPoolManage->pThdManage->pth = (pthread_t*)malloc(thdMaxNum*sizeof(pthread_t));  
  64.             if(pThdPoolManage->pThdManage->pth != NULL){  
  65.                 /*工作队列描述符的创建*/  
  66.                 pThdPoolManage->pTskQueueManage = (TskQueueManage *)malloc(sizeof(struct _tskQueueManage));  
  67.                 if(pThdPoolManage->pTskQueueManage != NULL){  
  68.                     /*初始队列工作描述符*/  
  69.                     pThdPoolManage->pTskQueueManage->tskCurWaitNum = 0;  
  70.                     pThdPoolManage->pTskQueueManage->pTskHead  = NULL;  
  71.                     pThdPoolManage->pTskQueueManage->pTskTail  = NULL;  
  72.                     /*线程池中所有线程的创建*/  
  73.                     thdPoolCreat();  
  74.                 } else {  
  75.                     /*注意: 如果malloc不成功,一定要free掉之前的malloc申请*/  
  76.                     free(pThdPoolManage->pThdManage->pth);  
  77.                     free(pThdPoolManage->pThdManage);  
  78.                     free(pThdPoolManage);  
  79.                     /*如果malloc失败,说明错误 flag 赋值为 = 1*/  
  80.                     flag = 1;  
  81.                     printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);  
  82.                 }  
  83.             } else {  
  84.                 free(pThdPoolManage->pThdManage);  
  85.                 free(pThdPoolManage);  
  86.                 flag = 1;  
  87.                 printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);  
  88.             }  
  89.         } else {  
  90.             free(pThdPoolManage);  
  91.             flag = 1;  
  92.             printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);  
  93.         }  
  94.     } else {  
  95.         flag = 1;  
  96.         printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);  
  97.     }  
  98.     return flag;  
  99. }  
  100.   
  101. static void thdPoolCreat()  
  102. {      
  103.     ThreadManage *pThreadManage = NULL;  
  104.     int thdNum;  
  105.     pThreadManage = pThdPoolManage->pThdManage;  
  106.     for(thdNum = 0; thdNum < pThreadManage->thdMaxNum; thdNum++){  
  107.         pthread_create(&(pThreadManage->pth[thdNum]), NULL, (void*)threadCreatdmd, NULL);  
  108.     }  
  109. }  
  110.   
  111. static void threadCreatdmd(void *arg)  
  112. {  
  113.     /*注意指针赋值之前要初始化为NULL,以免发生后续出现野指针的情况*/  
  114.     TskQueueManage* pTskQueueManage = NULL;  
  115.     ThreadManage *pThreadManage = NULL;  
  116.     TskNode* pCurTsk = NULL;  
  117.   
  118.     printf("threadCreatdmd_creat_success:[ThreadId]=[%x]\n", pthread_self());  
  119.     pTskQueueManage = pThdPoolManage->pTskQueueManage;  
  120.     pThreadManage = pThdPoolManage->pThdManage;  
  121.     while(1){  
  122.         /*注意,因为会创建很多个threadCreatdmd函数,由于每个函数都要访问临界代码:即对工作队列的操作,所以必须要枷锁 
  123.           以保证每一个处理函数(threadCreatdmd),在访问工作队列的时候,此工作队列不会被其他的处理函数修改*/  
  124.         pthread_mutex_lock(&(pThreadManage->mutex));   
  125.         /*最开始创建线程池中的线程需要等待的两种情况,即,while循环条件成立的情况, 1.线程池初始化时候(即没添加任务之前), 2. 
  126.           工作队列没有任务了(即任务都执行完了)*/  
  127.         while((pTskQueueManage->tskCurWaitNum == 0)&&(pThdPoolManage->shutFlag == 0)){  
  128.             printf("[ThreadId]=[%x]_waiting... ... ...\n", pthread_self());  
  129.             /*这时此线程会在这里阻塞*/  
  130.             pthread_cond_wait(&(pThreadManage->cond), &(pThreadManage->mutex));  
  131.         }  
  132.         if(pThdPoolManage->shutFlag == 1){  
  133.             pthread_mutex_unlock(&(pThreadManage->mutex));  
  134.             printf("[ThreadId]=[%x]_exit\n", pthread_self());  
  135.             pthread_exit(NULL);  
  136.         }  
  137.         printf("[ThreadId]=[%x]_starting_work!!\n", pthread_self());  
  138.         assert(pTskQueueManage->tskCurWaitNum != 0);  
  139.         assert(pTskQueueManage->pTskHead != NULL);  
  140.         (pTskQueueManage->tskCurWaitNum)--;  
  141.         /*取工作队列头部节点*/  
  142.         pCurTsk = pTskQueueManage->pTskHead;  
  143.         /*取头之后,将新头赋给下个元素*/  
  144.         pTskQueueManage->pTskHead = pTskQueueManage->pTskHead->pNext;  
  145.         /* 注意:如果最后一个元素 这时候 pTskQueueManage->pTskHead 是空,空的话是没有pPre的*/  
  146.         if(pTskQueueManage->pTskHead != NULL){  
  147.             pTskQueueManage->pTskHead->pPre = NULL;  
  148.         }  
  149.         pthread_mutex_unlock(&(pThreadManage->mutex));  
  150.         /*执行头部任务节点的任务函数(即上面取出的节点)*/  
  151.         (pCurTsk->TaskDmd)(pCurTsk->arg);  
  152.         free(pCurTsk);  
  153.         pCurTsk = NULL;  
  154.     }  
  155. }  
  156.   
  157. static int  tskAddDmd(TaskFun TaskDmd, void* arg)  
  158. {  
  159.     TskNode* pTskNode = NULL;  
  160.     TskQueueManage* pTskQueueManage = NULL;  
  161.     ThreadManage* pThdManage = NULL;  
  162.     int flag = 0;  
  163.     pTskQueueManage = pThdPoolManage->pTskQueueManage;  
  164.     pThdManage = pThdPoolManage->pThdManage;  
  165.     pthread_mutex_lock(&(pThdManage->mutex));  
  166.     /*任务添加,创建一个工作节点*/  
  167.     pTskNode = (TskNode*)malloc(sizeof(struct _TskNode));  
  168.     if(pTskNode != NULL){  
  169.         /*将任务(函数赋值给节点)*/  
  170.         pTskNode->TaskDmd = TaskDmd;  
  171.         pTskNode->pNext   = NULL;  
  172.         /*赋值参数*/  
  173.         pTskNode->arg     = arg;  
  174.         if(pTskQueueManage->tskCurWaitNum == 0){  
  175.             pTskQueueManage->pTskHead = pTskNode;  
  176.             pTskQueueManage->pTskTail = pTskNode;  
  177.         } else {  
  178.             pTskQueueManage->pTskTail->pNext = pTskNode;  
  179.             pTskNode->pPre = pTskQueueManage->pTskTail;  
  180.             pTskQueueManage->pTskTail = pTskNode;  
  181.         }  
  182.         (pTskQueueManage->tskCurWaitNum)++;  
  183.     } else {  
  184.         flag = 1;  
  185.         printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);  
  186.     }  
  187.     pthread_mutex_unlock(&(pThdManage->mutex));  
  188.     pthread_cond_signal(&(pThdManage->cond));  
  189.     return flag;  
  190. }  
  191.   
  192. static void  thdPoolDestroy()  
  193. {  
  194.     int thdNum;  
  195.     pThdPoolManage->shutFlag = 1;  
  196.     TskQueueManage *pTskQueueManage = NULL;  
  197.     ThreadManage   *pThdManage = NULL;  
  198.     TskNode*  pTskNode = NULL;  
  199.     pTskQueueManage = pThdPoolManage->pTskQueueManage;  
  200.     pThdManage   = pThdPoolManage->pThdManage;  
  201.     pthread_cond_broadcast(&(pThdPoolManage->pThdManage->cond));  
  202.     for(thdNum = 0; thdNum < pThdManage -> thdMaxNum; thdNum++){  
  203.         pthread_join(pThdManage->pth[thdNum], NULL);  
  204.     }  
  205.     while(pTskQueueManage->pTskHead != NULL){  
  206.         pTskNode = pTskQueueManage->pTskHead;  
  207.         pTskQueueManage->pTskHead = pTskQueueManage->pTskHead->pNext;  
  208.         free(pTskNode);  
  209.     }  
  210.     pthread_mutex_destroy(&(pThdManage->mutex));  
  211.     pthread_cond_destroy(&(pThdManage->cond));  
  212.     free(pThdPoolManage->pThdManage->pth);  
  213.     free(pThdPoolManage->pThdManage);  
  214.     free(pThdPoolManage->pTskQueueManage);  
  215.     free(pThdPoolManage);  
  216.     pThdPoolManage = NULL;  
  217. }  
  218.   
  219. void TaskDmd(void *arg)  
  220. {  
  221.     printf("[ThreadId]=[%x] working on task[%d]\n", pthread_self(), *((int *)arg));  
  222.     sleep(1);  
  223. }  
  224. /*测试代码*/  
  225. int main()  
  226. {     
  227.     int flag;  
  228.     int taskAdd;  
  229.     int *taskArg;  
  230.     int taskNum = 10;  
  231.     int thdMaxNum = 3;  
  232.     flag = mainDmdInit(thdMaxNum);  
  233.     /*保险起见两秒,因为可能会造成添加任务在线程等待之前执行*/  
  234.     sleep(2);  
  235.     taskArg = (int *)malloc(sizeof(int) * taskNum);  
  236.     memset(taskArg, 0x00, sizeof(int) * taskNum);  
  237.     if(flag != 1){  
  238.         for(taskAdd = 0; taskAdd < taskNum; taskAdd++){  
  239.             taskArg[taskAdd] = taskAdd;  
  240.             flag = tskAddDmd((void*)TaskDmd, &(taskArg[taskAdd]));  
  241.             if(flag == 1){  
  242.                 printf("jobAdd error Num=[%d]\n", taskAdd);  
  243.             } else {  
  244.                 printf("jobAdd success Num = [%d]\n", taskAdd);  
  245.             }  
  246.         }  
  247.     } else {  
  248.         printf("[%s]:[%d] Warning: failed.\n", __func__, __LINE__);  
  249.     }  
  250.     sleep(10);  
  251.     thdPoolDestroy();  
  252.     return 0;  
  253. }  


  

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多