分享

线程池[高级]

 astrotycoon 2015-02-02
  1. #线程池配置文件filename:[thread_pool_config.conf] 其中‘#’为注释 不准以=开始   
  2. #线程池最小线程数   
  3. MIN_THREAD_NUM = 3  
  4.   
  5. #线程池最大线程数   
  6. MAX_THREAD_NUM =50  
  7.   
  8. #线程池默认线程数   
  9. DEF_THREAD_NUM = 25  
  10.   
  11. #管理线程动态调节时间间隔(s)   
  12. MANAGE_ADJUST_INTERVAL = 5  
  13.   
  14. #线程数与任务峰值比例   
  15. THREAD_WORKER_HIGH_RATIO = 3  
  16.   
  17. #任务数与线程数低谷比例   
  18. THREAD_WORKER_LOW_RATIO = 1  




说明:这里有3个参数比较关键:
    其中MANAGE_ADJUST_INTERVAL 表示管理线程的动态调节时间间隔,这里线程池的动态调优是通过后台线程统一管理,类似nginx中的master进程。
    THREAD_WORKER_HIGH_RATIO 指的是任务数目是线程数目的多少倍,主要指示任务峰值状态,也就是满负载状况下,需要增加线程(当然线程不能无限指增加,不然的话会增加CPU的负担,这里用MAX_THREAD_NUM 来限制最大线程数)
    THREAD_WORKER_LOW_RATIO 指的是任务数目是线程数目的多少倍,主要指示任务平静期,这时很多线程处于阻塞空闲状态,需要减少线程,以便均衡资源。

配置文件读取模块[readConf.c]不在赘述。

基于初级的线程池优化,这里最主要的问题是解决线程id的管理问题,以及线程取消的管理
这里采用链表来管理线程id,链表的特性便于新增和删除,这里就不在多说了,不懂的可以去看一下维斯的数据结构与算法分析了。
引进thread_revoke结构体来标记全局的取消线程信息,先分析一下线程什么时候需要取消:当任务很少,少到tasknum <threadnum* THREAD_WORKER_LOW_RATIO 时,也就是,任务的增加速度远远赶不上线程的处理速度时,会导致一部分线程处于饥饿状态(阻塞)。那么我们需要取消的就是处于饥饿状态的线程。对于poisx多线程编程中线程的取消,一般采用pthread_cancel()而且此函数是无阻塞返回的(即不等线程是否真的取消)而且,这里有一个很大很大的疑惑,在线程取消的时候,线程必须等到在取消执行点处取消,这里我们的取消执行点是pthread_cond_wait()这线程处理阻塞状态,互斥锁在线程插入阻塞等待队列时已经释放掉。但是,线程本身处于阻塞状态下(放弃了时间片)会执行取消动作吗?我试验了一下,没有……
这里维护一个取消队列,在线程取消时,置全局取消标志位为1,pthread_broadcast()唤醒所有线程,让在线程唤醒时会判断是否进入取消状态,如果是直接主动退出。当然这里有一个取消计数。
先等一等,这样会不会影响到正在工作的线程?答案是不会,因为在工作中的线程已经跳过了线程取消验证~


  1. /**filename:thread_pool.h 
  2.   */  
  3. #ifndef _THREADPOOL_HEAD_   
  4. #define _THREADPOOL_HEAD_   
  5.   
  6. #include <stdio.h>   
  7. #include <stdlib.h>   
  8. #include <unistd.h>   
  9. #include <sys/types.h>   
  10. #include <pthread.h>   
  11. #include <assert.h>   
  12. #include "readConfig.h"   
  13.   
  14. /**********************宏定义区********************/  
  15.   
  16. /* 配置文件名  */  
  17. #define CONFIGFILENAME "thread_pool_config.conf"   
  18.   
  19. /* 线程池最小线程数 */  
  20. #define MIN_THREAD_NUM "MIN_THREAD_NUM"   
  21.   
  22. /* 线程池最大线程数 */  
  23. #define MAX_THREAD_NUM "MAX_THREAD_NUM"   
  24.   
  25. /* 线程池默认线程数 */  
  26. #define DEF_THREAD_NUM "DEF_THREAD_NUM"   
  27.   
  28. /* 管理线程动态调节时间间隔(s) */  
  29. #define MANAGE_ADJUST_INTERVAL "MANAGE_ADJUST_INTERVAL"   
  30.   
  31. /* 线程数与工作峰值比例 */  
  32. #define THREAD_WORKER_HIGH_RATIO "THREAD_WORKER_HIGH_RATIO"   
  33.   
  34. /* 任务与线程数低谷比例 */  
  35. #define THREAD_WORKER_LOW_RATIO "THREAD_WORKER_LOW_RATIO"   
  36.   
  37.   
  38.   
  39.   
  40.   
  41. /************************结构体声明区*************************/  
  42.   
  43. /* 
  44.  *线程池里所有运行和等待的任务都是 一个thread_worker 
  45.  *由于所有任务都在链表中,所以是一个链表结构 
  46.  */  
  47. typedef struct _worker{  
  48.     void *(*process)(void *arg);    /* 工作的处理函数指针 */  
  49.     void *arg;                      /* 处理函数的参数  */  
  50.     struct _worker *next;           /* 下一个工作  */  
  51. }thread_worker;  
  52.   
  53. /*  线程队列节点结构  */  
  54. typedef struct _thread_queue_node{  
  55.     pthread_t   thread_id;  
  56.     struct _thread_queue_node *next;  
  57. }thread_queue_node,*p_thread_queue_node;  
  58.   
  59. /* 线程池结构  */  
  60. typedef struct {  
  61.     int         shutdown;           /* 是否销毁线程池  */  
  62.     pthread_mutex_t     queue_lock;         /* 线程锁  */  
  63.     pthread_mutex_t     remove_queue_lock;      /* 线程锁  */  
  64.     pthread_cond_t      queue_ready;            /* 通知等待队列有新任务条件变量  */  
  65.     thread_queue_node   *thread_queue;          /* 线程池的线程队列 */  
  66.     thread_queue_node   *thread_idle_queue;  
  67.     int             idle_queue_num;   
  68.     int             max_thread_num;         /* 线程池中允许开启的最大线程数  */  
  69.     int             cur_queue_size;         /* 当前等待队列的任务数目  */  
  70.     thread_worker       *queue_head;            /* 线程池所有等待任务  */  
  71. }thread_pool;  
  72.   
  73. /* 线程取消  */  
  74. typedef struct {  
  75.     int             is_revoke;      /*是否需要撤销线程*/  
  76.     int             revoke_count;       /* 已经撤销的线程数  */  
  77.     int             revoke_num;     /* 需要撤销的总数  */  
  78.     pthread_mutex_t     revoke_mutex;       /* 撤销线程加锁  */  
  79.     thread_queue_node   *thread_revoke_queue;   /* 线程撤销队列 */  
  80. }thread_revoke;  
  81.   
  82. /**************************功能函数声明区************************/  
  83.   
  84. /** 向线程池中添加任务  **/  
  85. int pool_add_worker(void *(*process)(void *arg), void *arg);  
  86.   
  87. /** 线程池中的线程  **/  
  88. void *thread_routine(void *arg);  
  89.   
  90. /** 初始化线程池  **/  
  91. void pool_init(int max_thread_num);  
  92.   
  93. /**  销毁线程池  **/  
  94. int pool_destroy();  
  95.   
  96. /** 向线程池中追加线程 **/  
  97. void pool_add_thread(int thread_num);  
  98.   
  99. /** 向线程队列中追加线程 **/  
  100. int thread_queue_add_node(p_thread_queue_node *thread_queue, pthread_t thread_id,int * count);  
  101.   
  102. /**  撤销线程  **/  
  103. void pool_revoke_thread(int thread_num);  
  104.   
  105. /**  从线程队列删除线程  **/  
  106. int thread_queue_remove_node(p_thread_queue_node *thread_queue, pthread_t thread_id,int *count);  
  107.   
  108. /** 获取配置文件中某一项的值  **/  
  109. int get_config_value(char * item);  
  110.   
  111. /*****************全局变量声明区******************/  
  112. /* 线程池  */  
  113. extern thread_pool      *g_pool;  
  114.   
  115. /* 线程取消  */  
  116. extern thread_revoke    *g_thread_revoke;  
  117.   
  118. /* 线程池最大线程数  */  
  119. extern int              g_max_thread_num;  
  120.   
  121. /* 线程池最小线程数  */  
  122. extern int              g_min_thread_num;  
  123.   
  124. /* 默认线程池线程数  */  
  125. extern int              g_def_thread_num;     
  126.   
  127. /* 管理线程调整时间间隔  */  
  128. extern int              g_manage_adjust_interval;  
  129.   
  130. /* 线程任务峰值比率:衡量负载 */  
  131. extern int              g_thread_worker_high_ratio;   
  132.   
  133. /* 线程任务低谷比率:衡量负载 */  
  134. extern int              g_thread_worker_low_ratio;    
  135.   
  136. #endif  


  1. /** filename:thread_pool.c 
  2.  */ 
  3. #include "threadpool.h"   
  4.   
  5. /**************** 全局变量定义区 ***************/  
  6.    
  7. thread_pool     *g_pool = NULL;  
  8. thread_revoke   *g_thread_revoke = NULL;  
  9. int             g_def_thread_num = 0;  
  10. int             g_manage_adjust_interval = 0;  
  11. int             g_max_thread_num = 0;  
  12. int             g_min_thread_num = 0;  
  13. int             g_thread_worker_high_ratio = 0;  
  14. int             g_thread_worker_low_ratio = 0;  
  15.   
  16.   
  17. /** 函数名:    get_config_value  int 的项值 
  18.    *    功能描述:   获取配置文件中某一项的值 
  19.    *    参数列表:  item:为配置文件中的项名 
  20.    *    返回值:    出错返回-1 成功返回项的值    
  21.    */  
  22.  int get_config_value(char *item)  
  23. {  
  24.     char value[50];  
  25.     if(GetParamValue(CONFIGFILENAME,item,value) == NULL)  
  26.     {  
  27.         return -1;  
  28.     }  
  29.   
  30.     return atoi(value);  
  31. }  
  32.   
  33.   
  34.  /**    函数名:    get_config_value  int 的项值 
  35.    *    功能描述:   初始化配置文件项变量的值 
  36.    *    参数列表:  无 
  37.    *    返回值:    无     
  38.    */  
  39. void conf_init()  
  40. {  
  41.     g_max_thread_num = get_config_value(MAX_THREAD_NUM);  
  42.     g_min_thread_num = get_config_value(MIN_THREAD_NUM);  
  43.     g_def_thread_num = get_config_value(DEF_THREAD_NUM);  
  44.     g_manage_adjust_interval = get_config_value(MANAGE_ADJUST_INTERVAL);  
  45.     g_thread_worker_high_ratio = get_config_value(THREAD_WORKER_HIGH_RATIO);  
  46.     g_thread_worker_low_ratio = get_config_value(THREAD_WORKER_LOW_RATIO);  
  47.   
  48. }  
  49. /** 函数名:    pool_init  
  50.   * 功能描述:   初始化线程池 
  51.   * 参数列表:   max_thread_num :输入要建的线程池的线程最大数目 
  52.   * 返回值:    无 
  53.   */  
  54. void pool_init(int max_thread_num)  
  55. {  
  56.     int i;  
  57.     conf_init();  
  58.     if(max_thread_num < g_min_thread_num)  
  59.     {  
  60.         max_thread_num = g_min_thread_num;  
  61.     }  
  62.     else if(max_thread_num > g_max_thread_num)  
  63.     {  
  64.         max_thread_num = g_max_thread_num;  
  65.     }  
  66.     pthread_attr_t attr;  
  67.     int err;  
  68.     err= pthread_attr_init(&attr);  
  69.     if(err != 0)  
  70.     {  
  71.         perror("pthread_attr_init");  
  72.         exit(1);  
  73.     }  
  74.     err = pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);  
  75.   
  76.     if(err != 0)  
  77.     {  
  78.         perror("pthread_attr_setdetachstate");  
  79.         exit(1);  
  80.     }  
  81.   
  82.     g_pool = (thread_pool *)malloc(sizeof(thread_pool));  
  83.       
  84.     pthread_mutex_init(&(g_pool->queue_lock),NULL);  
  85.     pthread_mutex_init(&(g_pool->remove_queue_lock),NULL);  
  86.     pthread_cond_init(&(g_pool->queue_ready),NULL);  
  87.   
  88.     g_pool->queue_head = NULL;  
  89.     g_pool->max_thread_num = max_thread_num;  
  90.     g_pool->thread_queue =NULL;  
  91.     g_pool->thread_idle_queue = NULL;  
  92.     g_pool->idle_queue_num = 0;  
  93.     g_pool->cur_queue_size = 0;  
  94.     g_pool->shutdown = 0;  
  95.   
  96.     int temp;  
  97.     for(i = 0; i < max_thread_num; i++)  
  98.     {  
  99.         pthread_t thread_id;  
  100.         pthread_create(&thread_id,&attr,thread_routine,NULL);  
  101.         thread_queue_add_node(&(g_pool->thread_queue),thread_id,&temp);  
  102.         printf("temp&&&&&&&&&&&&&%d\n",temp);  
  103.   
  104.     }  
  105.       
  106.     pthread_attr_destroy(&attr);  
  107.   
  108. }  
  109.   
  110. void thread_revoke_init()  
  111. {  
  112.     g_thread_revoke = (thread_revoke *)malloc(sizeof(thread_revoke));  
  113.       
  114.     pthread_mutex_init(&(g_thread_revoke->revoke_mutex),NULL);  
  115.     g_thread_revoke->thread_revoke_queue = NULL;  
  116.     g_thread_revoke->revoke_count = 0;  
  117.     g_thread_revoke->is_revoke = 0;  
  118.     g_thread_revoke->revoke_num = 0;  
  119. }  
  120. /** 函数名:    thread_queue_add_node 
  121.  
  122.   * 功能描述:   向线程池中新增线程 
  123.   * 参数列表:   thread_queue:要增加的线程的线程队列 thread_id:线程的id 
  124.   * 返回值:    成功返回0 失败返回1 
  125.   */  
  126. int thread_queue_add_node(p_thread_queue_node *thread_queue,pthread_t thread_id,int *count)  
  127. {  
  128.     pthread_mutex_lock(&(g_pool->remove_queue_lock));  
  129.     printf("++++count:%d++++++add thread id :%u++++\n",*count,thread_id);  
  130.     thread_queue_node *p = *thread_queue;  
  131.     thread_queue_node *new_node = (thread_queue_node *)malloc(sizeof(thread_queue_node));  
  132.     if(NULL == new_node)  
  133.     {  
  134.         printf("malloc for new thread queue node failed!\n");  
  135.         pthread_mutex_unlock(&(g_pool->remove_queue_lock));  
  136.         return 1;  
  137.     }  
  138.       
  139.     new_node->thread_id = thread_id;  
  140.     new_node->next = NULL;  
  141.       
  142.     /*如果队列为空*/  
  143.     if(NULL == *(thread_queue))  
  144.     {  
  145.         *(thread_queue) = new_node;  
  146.         (*count)++;       
  147.         pthread_mutex_unlock(&(g_pool->remove_queue_lock));  
  148.         return 0;  
  149.     }  
  150.       
  151.     /*每次都将新节点插入到队列头部*/  
  152.     new_node->next = p;  
  153.     *(thread_queue) = new_node;  
  154.     (*count)++;       
  155.     pthread_mutex_unlock(&(g_pool->remove_queue_lock));  
  156.     return 0;  
  157. }  
  158. int thread_queue_remove_node(p_thread_queue_node *thread_queue,pthread_t thread_id,int *count)  
  159. {  
  160.   
  161.     pthread_mutex_lock(&(g_pool->remove_queue_lock));  
  162.     printf("---count:%d------remove threadid : %u----\n",*count,thread_id);  
  163.     p_thread_queue_node current_node,pre_node;  
  164.     if(NULL == *(thread_queue))  
  165.     {  
  166.         printf("revoke a thread node from queue failed!\n");  
  167.         pthread_mutex_unlock(&(g_pool->remove_queue_lock));  
  168.         return 1;  
  169.     }  
  170.   
  171.     current_node = *(thread_queue);  
  172.     pre_node = *(thread_queue);  
  173.     int i = 1;  
  174.     while(i < g_pool->max_thread_num && current_node != NULL)  
  175.     {  
  176.         printf("i = %d, max_thread_num = %d \n",i,g_pool->max_thread_num);  
  177.         i++;  
  178.         if(thread_id == current_node->thread_id)  
  179.         {  
  180.             break;  
  181.         }  
  182.         pre_node = current_node;  
  183.         current_node = current_node->next;  
  184.       
  185.     }  
  186.       
  187.     if(NULL == current_node)  
  188.     {  
  189.         printf("revoke a thread node from queue failed!\n");  
  190.         pthread_mutex_unlock(&(g_pool->remove_queue_lock));  
  191.         return 1;  
  192.     }  
  193.   
  194.     /*找到该线程的位置,删除对应的线程节点 如果要删除的节点就是头节点 */  
  195.     if(current_node->thread_id == (*(thread_queue))->thread_id)  
  196.     {  
  197.         *(thread_queue) = (*(thread_queue))->next;  
  198.         free(current_node);  
  199.         (*count)--;  
  200.         pthread_mutex_unlock(&(g_pool->remove_queue_lock));  
  201.         return 0;  
  202.     }  
  203.   
  204.     /*找到该线程的位置,删除对应的线程节点 如果要删除的节点就是尾节点 */  
  205.     if(current_node->next == NULL)  
  206.     {  
  207.         pre_node->next =NULL;  
  208.         free(current_node);  
  209.         (*count)--;  
  210.         pthread_mutex_unlock(&(g_pool->remove_queue_lock));  
  211.         return 0;  
  212.     }  
  213.     pre_node = current_node->next;  
  214.     free(current_node);  
  215.     (*count)--;  
  216.     printf("0 max_thread_num = %d\n",g_pool->max_thread_num);  
  217.     pthread_mutex_unlock(&(g_pool->remove_queue_lock));  
  218.     return 0;  
  219. }  
  220.   
  221.   
  222. /** 函数名:    pool_add_worker  
  223.   * 功能描述:   向线程池中加任务 
  224.   * 参数列表:   process :函数指针,指向处理函数用作真正的工作处理 
  225.   *             arg:工作队列中的参数 
  226.   * 返回值:    成功返回0,失败返回-1 
  227.   */  
  228. int pool_add_worker(void*(*process)(void *arg),void *arg)  
  229. {  
  230.     thread_worker *new_work = (thread_worker *)malloc(sizeof(thread_worker));  
  231.       
  232.     if(new_work == NULL)  
  233.     {  
  234.         return -1;  
  235.     }  
  236.       
  237.     new_work->process = process;  
  238.     new_work->arg = arg;  
  239.     new_work->next = NULL;  
  240.   
  241.     pthread_mutex_lock(&(g_pool->queue_lock));  
  242.   
  243.     /*将任务加入等待队列中*/  
  244.     thread_worker *member = g_pool->queue_head;  
  245.     if(member != NULL)  
  246.     {  
  247.         while(member->next != NULL)  
  248.         {  
  249.             member = member->next;  
  250.         }  
  251.   
  252.         member->next = new_work;  
  253.     }  
  254.     else  
  255.     {  
  256.         g_pool->queue_head = new_work;  
  257.     }  
  258.   
  259.     assert(g_pool->queue_head != NULL);  
  260.   
  261.     g_pool->cur_queue_size++;  
  262.     pthread_mutex_unlock(&(g_pool->queue_lock));  
  263.     /*等待队列中有新任务了,唤醒一个等待线程处理任务;注意,如果所有的线程都在忙碌,这句话没有任何作用*/  
  264.     pthread_cond_signal(&(g_pool->queue_ready));  
  265.   
  266.     return 0;  
  267. }  
  268.   
  269. /** 函数名:    pool_add_thread  
  270.  
  271.   * 功能描述:   向线程池中新增线程 
  272.   * 参数列表:   thread_num:要增加的线程数目 
  273.   * 返回值:    无 
  274.   */  
  275. void pool_add_thread(int thread_num)  
  276. {  
  277.     int i;  
  278.     pthread_attr_t attr;  
  279.     int err = pthread_attr_init(&attr);  
  280.     if(err != 0)  
  281.     {  
  282.         perror("pthread_attr_init");  
  283.         exit(1);  
  284.     }  
  285.     err = pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);  
  286.   
  287.     if(err != 0)  
  288.     {  
  289.         perror("pthread_attr_setdetachstate");  
  290.         exit(1);  
  291.     }  
  292.       
  293.     for(i = 0; i < thread_num; i++)  
  294.     {  
  295.         pthread_t thread_id;  
  296.         pthread_create(&thread_id,&attr,thread_routine,NULL);  
  297.         thread_queue_add_node(&(g_pool->thread_queue),thread_id,&(g_pool->max_thread_num));  
  298.     }  
  299.   
  300.     pthread_attr_destroy(&attr);  
  301. }  
  302.   
  303. /** 函数名:    pool_revoke_thread 
  304.  
  305.   * 功能描述:   从线程池线程中撤销线程 
  306.   * 参数列表:   thread_num:要撤销的线程数目 
  307.   * 返回值:    无 
  308.   */  
  309. void pool_revoke_thread(int thread_num)  
  310. {  
  311.     if(thread_num == 0)  
  312.     {  
  313.         return;  
  314.     }  
  315.   
  316.     g_thread_revoke->revoke_num =thread_num;  
  317.     g_thread_revoke->is_revoke = 1;  
  318.     printf("----max_thread_num %d---------revoke %d thread-----\n",g_pool->max_thread_num,thread_num);  
  319.     thread_queue_node * p = g_thread_revoke->thread_revoke_queue;  
  320.   
  321.     pthread_cond_broadcast(&(g_pool->queue_ready));  
  322.   
  323. }  
  324. void * thread_manage(void *arg)  
  325. {  
  326.     int optvalue;  
  327.     int thread_num;  
  328.     while(1)  
  329.      {  
  330.        
  331.          if(g_pool->cur_queue_size > g_thread_worker_high_ratio * g_pool->max_thread_num)  
  332.         {  
  333.              optvalue = 1;  
  334.              thread_num =(g_pool->cur_queue_size -g_thread_worker_high_ratio * g_pool->max_thread_num) / g_thread_worker_high_ratio;  
  335.         }  
  336.         else if (g_pool->cur_queue_size * g_thread_worker_low_ratio < g_pool->max_thread_num)  
  337.         {  
  338.             optvalue = 2;  
  339.             thread_num =(g_pool->max_thread_num -g_thread_worker_low_ratio * g_pool->cur_queue_size) / g_thread_worker_low_ratio;  
  340.         }  
  341.        
  342.         if(1 == optvalue)  
  343.         {  
  344.               
  345.             if(g_pool->max_thread_num + thread_num > g_max_thread_num)  
  346.             {  
  347.                 thread_num = g_max_thread_num - g_pool->max_thread_num;  
  348.             }  
  349.             pool_add_thread(thread_num);  
  350.               
  351.         }  
  352.         else if( 2 == optvalue)  
  353.         {  
  354.             if(g_pool->max_thread_num - thread_num < g_min_thread_num)  
  355.             {  
  356.                 thread_num = g_pool->max_thread_num - g_min_thread_num;  
  357.             }  
  358.         //  pthread_t revoke_tid;   
  359.         //  pthread_create(&revoke_tid,NULL,(void *)pool_revoke_thread,(void *)thread_num);   
  360.             pool_revoke_thread(thread_num);  
  361.         }  
  362.   
  363.         printf("==========ManageThread=============\n");  
  364.   
  365.         printf("cur_queue_size = %d  |  max_thread_num = %d\n",g_pool->cur_queue_size,g_pool->max_thread_num);  
  366.         conf_init();  
  367.         sleep(g_manage_adjust_interval);  
  368.  }  
  369.   
  370.   
  371.   
  372.       
  373.       
  374. }  
  375.   
  376.   
  377. /** 函数名:    pool_destroy  
  378.   * 功能描述:   销毁线程池 
  379.   * 参数列表:   无 
  380.   * 返回值:    成功返回0,失败返回-1 
  381.   */  
  382. int pool_destroy()  
  383. {  
  384.     if(g_pool->shutdown)  
  385.     {  
  386.         return -1;  
  387.     }  
  388.       
  389.     g_pool->shutdown = 1;  
  390.   
  391.     /* 唤醒所有等待线程,线程池要销毁  */  
  392.     pthread_cond_broadcast(&(g_pool->queue_ready));  
  393.   
  394.     /* 阻塞等待线程退出,防止成为僵尸  */  
  395.     thread_queue_node * q = g_pool->thread_queue;  
  396.     thread_queue_node * p = q;  
  397.       
  398.     g_pool->thread_queue = NULL;  
  399.   
  400.     /* 销毁等待队列  */  
  401.     thread_worker *head = NULL;  
  402.   
  403.     while(g_pool->queue_head != NULL)  
  404.     {  
  405.         head = g_pool->queue_head;  
  406.         g_pool->queue_head = g_pool->queue_head->next;  
  407.         free(head);  
  408.     }  
  409.       
  410.     g_pool->queue_head = NULL;  
  411.   
  412.     /* 条件变量和互斥量销毁  */  
  413.     pthread_mutex_destroy(&(g_pool->queue_lock));  
  414.     pthread_mutex_destroy(&(g_pool->remove_queue_lock));  
  415.     pthread_cond_destroy(&(g_pool->queue_ready));  
  416.       
  417.     /* 销毁整个线程池  */  
  418.     free(g_pool);  
  419.     g_pool = NULL;  
  420.   
  421.     return 0;  
  422.       
  423. }  
  424.   
  425. void cleanup(void *arg)  
  426. {  
  427.         thread_queue_remove_node(&(g_pool->thread_queue),pthread_self(),&(g_pool->max_thread_num));  
  428.         pthread_mutex_unlock(&(g_pool->queue_lock));  
  429.         printf("thread ID %d will exit\n",pthread_self());  
  430.       
  431. }  
  432.   
  433. /** 函数名:    thread_routine  
  434.   * 功能描述:   线程池中的线程 
  435.   * 参数列表:   arg 线程附带参数 一般为NULL; 
  436.   * 返回值:    
  437.   */  
  438. void * thread_routine(void *arg)  
  439. {  
  440.     printf("starting thread ID:%u\n",pthread_self());  
  441.     while(1)  
  442.     {  
  443.         pthread_mutex_lock(&(g_pool->queue_lock));  
  444.         /* 如果等待队列为0 并且不销毁线程池,则处于阻塞状态 
  445.          *pthread_cond_wait 是原子操作,等待前解锁,唤醒后加锁 
  446.          */  
  447.   
  448.         while(g_pool->cur_queue_size == 0 && !g_pool->shutdown )  
  449.         {  
  450.             printf("thread ID %u is waiting \n",pthread_self());  
  451.             pthread_cond_wait(&(g_pool->queue_ready),&(g_pool->queue_lock));  
  452.         }  
  453.   
  454.         /* 如果线程池要销毁 */  
  455.         if(g_pool->shutdown)  
  456.         {  
  457.             thread_queue_remove_node(&(g_pool->thread_queue),pthread_self(),&(g_pool->max_thread_num));  
  458.             pthread_mutex_unlock(&(g_pool->queue_lock));  
  459.             printf("thread ID %d will exit\n",pthread_self());  
  460.             pthread_exit(NULL);  
  461.         }  
  462.   
  463.         if(g_thread_revoke->is_revoke != 0 && g_thread_revoke->revoke_count < g_thread_revoke->revoke_num)  
  464.         {  
  465.         /*  if(g_thread_revoke->revoke_count >= g_thread_revoke->revoke_num ) 
  466.             { 
  467.                  
  468.             printf("-revoke-@@jie锁@@+++\n");     
  469.                 pthread_mutex_unlock(&(g_pool->queue_lock)); 
  470.                 continue; 
  471.             }*/  
  472.             thread_queue_remove_node(&(g_pool->thread_queue),pthread_self(),&(g_pool->max_thread_num));  
  473.           
  474.             thread_queue_add_node(&(g_thread_revoke->thread_revoke_queue),pthread_self(),&(g_thread_revoke->revoke_count));  
  475.             g_thread_revoke->revoke_count++;  
  476.             pthread_mutex_unlock(&(g_pool->queue_lock));  
  477.             printf("revoke success thread ID %d will exit\n",pthread_self());  
  478.             pthread_exit(NULL);  
  479.   
  480.         }  
  481.   
  482.         printf("thread ID %u is starting to work\n",pthread_self());  
  483.         assert(g_pool->cur_queue_size != 0);  
  484.         assert(g_pool->queue_head != NULL);  
  485.   
  486.         /* 等待队列长度减1,并且取出链表的头元素  */  
  487.         g_pool->cur_queue_size--;  
  488.         thread_worker * worker = g_pool->queue_head;  
  489.         g_pool->queue_head = worker->next;  
  490.         pthread_mutex_unlock(&(g_pool->queue_lock));  
  491.         printf("************执行任务\n");  
  492.         /* 调用回调函数,执行任务  */  
  493.         (*(worker->process))(worker->arg);  
  494.         free(worker);  
  495.         worker = NULL;  
  496.               
  497.     }  
  498.     pthread_exit(NULL);  
  499. }  
  500.   
  501. void *myprocess(void *arg)  
  502. {  
  503.     printf("thread ID is %u, working on task%d\n",pthread_self(),*(int *)arg);  
  504.     sleep(1);  
  505.     return NULL;  
  506. }  
  507.   
  508. int main(int argc, char * argv[])  
  509. {  
  510.     pthread_t manage_tid;  
  511.     thread_revoke_init();     
  512.     sleep(1);  
  513.     pool_init(g_def_thread_num);  
  514.     sleep(3);  
  515.     pthread_create(&manage_tid,NULL,thread_manage,NULL);  
  516.   
  517.     int i;  
  518.     for(i = 0;  ; i++)  
  519.     {  
  520.           
  521.         pool_add_worker(myprocess,&i);  
  522.         i++;  
  523.         pool_add_worker(myprocess,&i);  
  524.         i++;  
  525.         pool_add_worker(myprocess,&i);  
  526.         i++;  
  527.         pool_add_worker(myprocess,&i);  
  528.         sleep(1);  
  529.         if(i%8== 0)  
  530.         {  
  531.             sleep(10);  
  532.         }  
  533.   
  534.         if(i%10 == 0)  
  535.         {  
  536.             sleep(20);  
  537.         }  
  538.     }  
  539.   
  540.     sleep(5);  
  541.     printf("=======xiaohui========\n");  
  542.     pool_destroy();  
  543.       
  544.   
  545.     return 0;  
  546. }  

转载请注明出处http://blog.csdn.net/lingfengtengfei/article/details/9039135











    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约