分享

12.5.3 负载平衡的并行基数排序

 盛夏流年闪耀 2013-07-30

12.5.3 负载平衡的并行基数排序

负载平衡的并行基数排序由Andrew sohn等人给出,原始论文中讲解的是在MPI中的实现,不过这个算法也可以很容易地改成在多核环境中实现。

其基本思想是先将数据平均分成若干段,每个线程各自进行一轮分配操作(和串行的分配操作一样),和串行基数排序不同的是,不是每个线程都进行自己的收集操作,而是要在各个线程间进行收集操作。

仍然以前面串行基数排序的数据作为样本数据,以两个线程为例来演示并行基数排序的过程:

先将数据分为两部分,第1个线程对第1部分数据进行分配操作,第2个线程对第2部分数据进行分配操作

(3 721,4 485,3 362,7 941,5 843,6 954)

(9 271,3 457,2 730,2 008,1 996,18 332)

 
(点击查看大图)图12.5.3 负载平衡的并行基数排序示意图1
当分配操作完成后,收集从操作过程如下图所示:
 
(点击查看大图)图12.5.4 负载平衡的并行基数排序示意图2

线程1的箱子0和线程2的箱子0连接起来形成一个新的全局箱子0,线程1的箱子1和线程2的箱子1连接起来形成一个新的全局箱子1,…,这些全局箱子和串行基数排序的分配过程得到的箱子是一样的。

在Andrew Sohn的论文中,给出的方法是通过各个箱子中数据的前缀和,来将数据重新进行平均划分到各个线程中去,以便于进行下一轮分配操作,不断重复这个过程就可以得到一个排好序的序列。

在多核系统中,基本思想和上面基本一样,假设输入数据为一个数组,共使用了p个线程,也是将N个数据分解成p段数据,每段含N/p个数据,每个线程先对自己的N/p个数据进行一趟分配操作。不过收集操作和上面不同,而是按照上面的收集顺序依次将数据指针写入原来的数组中,在这个收集过程中,需要计算各个全局箱子中数据放入到数组中的起始位置,即需要计算前缀和。然后再将数组重新划分成p段,每段包含N/p个数据,重复进行上述分配过程下去,直到所有数据都被排好序为止。

可能有人认为将所有数据依次写入原来数组中会花费大量时间,但是如果不这样做的话,在分配过程中需要对离散的数据进行遍历,比对连续的数组进行遍历需要更大的计算开销。并且写入数据到数组中是一个比较简单的过程,代码复杂度要低。

下面给出详细的负载平衡的基数排序算法流程图:

 
图12.5.5 负载平衡的并行基数排序流程图
下面给出详细的负载平衡的并行基数排序的C 代码:
  1. /** 负载平衡的并行基数排序  
  2.     对数组进行并行基数排序  
  3.  
  4.     @param  T *ppData - 待排序数据     
  5.     @param  UINT uDataLen - 数据长度      
  6.     @param  UINT uRadix - 基数      
  7.     @param  UINT uMaxKeyLen - 最大关键词长度     
  8.     @param  GETKEY GetKeyFunc - 关键词取位回调函数     
  9.     @return void - 无      
  10. */  
  11. template <class T, class GETKEY> 
  12. void Parallel_RadixSort_LBR(T *pData, UINT uDataLen,  
  13.       UINT uRadix, UINT uMaxKeyLen, GETKEY GetKeyFunc)  
  14. {  
  15.     int i;  
  16.     int nProcessors = omp_get_num_procs();  
  17.     UINT uCount = uDataLen / nProcessors;  
  18.  
  19.     if ( uDataLen - uCount * nProcessors > 2 )  
  20.     {  
  21.         uCount ;  
  22.     }  
  23.  
  24.     T *pOutData = new T[uDataLen];  
  25. DATA_ARRAY_TEMP<T>  **ppDataArray   
  26. = new DATA_ARRAY_TEMP<T> *[nProcessors];  
  27.     UINT **ppuBoxDataCount = new UINT *[nProcessors];  
  28.  
  29.     for ( i = 0; i < nProcessors; i  )  
  30.     {  
  31.         ppDataArray[i] = new DATA_ARRAY_TEMP<T>[uRadix];  
  32.         ppuBoxDataCount[i] = new UINT[uRadix];  
  33.     }  
  34.  
  35.     UINT uKeyIndex;  
  36.  
  37.     for ( uKeyIndex = 0; uKeyIndex < uMaxKeyLen; uKeyIndex  )  
  38.     {  
  39.         //下面代码完成全局的一轮将数据分配到箱子里的操作  
  40. #pragma omp parallel for num_threads(nProcessors) schedule(static, 1)  
  41.         for ( i = 0; i < nProcessors; i  )  
  42.         {  
  43.             //每个线程计算自己的数据起始位置,其中end是开区间位置  
  44.             UINT begin = i * uCount;  
  45.             UINT end = (i 1) * uCount;  
  46.             if ( end > uDataLen )  
  47.             {  
  48.                 end = uDataLen;  
  49.             }  
  50.             T * pOut = pOutData   begin;  
  51.             T *p = pData   begin;  
  52.              
  53. Serial_Partitioned<T>(p, end-begin, uRadix, uKeyIndex, GetKeyFunc,   
  54.                 ppuBoxDataCount[i], pOut, ppDataArray[i]);  
  55.         }//for ( i = 0; i < nProcessors; i  )  
  56.  
  57.         UINT *puCount = ppuBoxDataCount[0];  
  58. #pragma omp parallel for num_threads(dtn(uRadix, 
    4096))  //最小循环次数设为多少需要测试  
  59.         for ( i = 0; i < uRadix; i  )  
  60.         {  
  61.             int k;  
  62.             for ( k = 1; k < nProcessors; k  )  
  63.             {  
  64.                 UINT * pk = ppuBoxDataCount[k];  
  65.                 puCount[i]  = pk[i];  
  66.             }  
  67.         }  
  68.  
  69.         //计算前缀和  
  70.         for ( i = 1; i < uRadix; i  )  
  71.         {  
  72.             puCount[i]  = puCount[i-1];  
  73.         }  
  74.          
  75.         //得到各段数据的起始位置  
  76.         for ( i = uRadix - 1; i > 0; i-- )  
  77.         {  
  78.             puCount[i] = puCount[i-1];  
  79.         }  
  80.         puCount[0] = 0;  
  81.  
  82.         //收集操作,将pOutData里的数据重新放回pData里  
  83.         int m;  
  84.  
  85. #pragma omp parallel for private(i) schedule(dynamic)  
  86.         for ( m = 0; m < uRadix; m  )  
  87.         {  
  88.             int nIndex = puCount[m];  
  89.             for ( i = 0; i < nProcessors; i  )  
  90.             {  
  91.                 DATA_ARRAY_TEMP<T> *pDataArray = ppDataArray[i];  
  92.                 T *pDataTemp = pDataArray[m].pData;  
  93.                 int k;  
  94.                 for ( k = 0; k < pDataArray[m].uCount; k  )  
  95.                 {  
  96.                     pData[nIndex] = pDataTemp[k];  
  97.                      nIndex;  
  98.                 }  
  99.             }  
  100.         }  
  101.         //进行下一轮的分配与收集操作  
  102.     }   //for ( uKeyIndex = 0; uKeyIndex < uMaxKeyLen; uKeyIndex  )  
  103.  
  104.     //释放表  
  105.     for ( i = 0; i < nProcessors; i  )  
  106.     {  
  107.         delete [] ppDataArray[i];  
  108.         delete [] ppuBoxDataCount[i];  
  109.     }  
  110.  
  111.     delete [] ppDataArray;  
  112.     delete [] ppuBoxDataCount;  
  113.     delete [] pOutData;  
  114.     return;  

从上面的负载平衡的并行基数排序代码可以看出,相比于串行基数排序,并行基数排序中需要收集操作,增加了部分计算开销,增加的计算开销所占的比例大约占40%左右,因此它的效率只有70%左右。不过由于它的线程粒度较大,因此它的加速比随CPU核数增加呈线性增加趋势。


【责任编辑:云霞 TEL:(010)68476606】

回书目   上一节   下一节

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多