12.5.3 负载平衡的并行基数排序
负载平衡的并行基数排序由Andrew sohn等人给出,原始论文中讲解的是在MPI中的实现,不过这个算法也可以很容易地改成在多核环境中实现。
其基本思想是先将数据平均分成若干段,每个线程各自进行一轮分配操作(和串行的分配操作一样),和串行基数排序不同的是,不是每个线程都进行自己的收集操作,而是要在各个线程间进行收集操作。
仍然以前面串行基数排序的数据作为样本数据,以两个线程为例来演示并行基数排序的过程:
先将数据分为两部分,第1个线程对第1部分数据进行分配操作,第2个线程对第2部分数据进行分配操作
(3 721,4 485,3 362,7 941,5 843,6 954)
(9 271,3 457,2 730,2 008,1 996,18 332)
|
(点击查看大图)图12.5.3 负载平衡的并行基数排序示意图1 | 当分配操作完成后,收集从操作过程如下图所示:
|
(点击查看大图)图12.5.4 负载平衡的并行基数排序示意图2 |
线程1的箱子0和线程2的箱子0连接起来形成一个新的全局箱子0,线程1的箱子1和线程2的箱子1连接起来形成一个新的全局箱子1,…,这些全局箱子和串行基数排序的分配过程得到的箱子是一样的。
在Andrew Sohn的论文中,给出的方法是通过各个箱子中数据的前缀和,来将数据重新进行平均划分到各个线程中去,以便于进行下一轮分配操作,不断重复这个过程就可以得到一个排好序的序列。
在多核系统中,基本思想和上面基本一样,假设输入数据为一个数组,共使用了p个线程,也是将N个数据分解成p段数据,每段含N/p个数据,每个线程先对自己的N/p个数据进行一趟分配操作。不过收集操作和上面不同,而是按照上面的收集顺序依次将数据指针写入原来的数组中,在这个收集过程中,需要计算各个全局箱子中数据放入到数组中的起始位置,即需要计算前缀和。然后再将数组重新划分成p段,每段包含N/p个数据,重复进行上述分配过程下去,直到所有数据都被排好序为止。
可能有人认为将所有数据依次写入原来数组中会花费大量时间,但是如果不这样做的话,在分配过程中需要对离散的数据进行遍历,比对连续的数组进行遍历需要更大的计算开销。并且写入数据到数组中是一个比较简单的过程,代码复杂度要低。
下面给出详细的负载平衡的基数排序算法流程图:
|
图12.5.5 负载平衡的并行基数排序流程图 | 下面给出详细的负载平衡的并行基数排序的C 代码:- /** 负载平衡的并行基数排序
- 对数组进行并行基数排序
-
- @param T *ppData - 待排序数据
- @param UINT uDataLen - 数据长度
- @param UINT uRadix - 基数
- @param UINT uMaxKeyLen - 最大关键词长度
- @param GETKEY GetKeyFunc - 关键词取位回调函数
- @return void - 无
- */
- template <class T, class GETKEY>
- void Parallel_RadixSort_LBR(T *pData, UINT uDataLen,
- UINT uRadix, UINT uMaxKeyLen, GETKEY GetKeyFunc)
- {
- int i;
- int nProcessors = omp_get_num_procs();
- UINT uCount = uDataLen / nProcessors;
-
- if ( uDataLen - uCount * nProcessors > 2 )
- {
- uCount ;
- }
-
- T *pOutData = new T[uDataLen];
- DATA_ARRAY_TEMP<T> **ppDataArray
- = new DATA_ARRAY_TEMP<T> *[nProcessors];
- UINT **ppuBoxDataCount = new UINT *[nProcessors];
-
- for ( i = 0; i < nProcessors; i )
- {
- ppDataArray[i] = new DATA_ARRAY_TEMP<T>[uRadix];
- ppuBoxDataCount[i] = new UINT[uRadix];
- }
-
- UINT uKeyIndex;
-
- for ( uKeyIndex = 0; uKeyIndex < uMaxKeyLen; uKeyIndex )
- {
- //下面代码完成全局的一轮将数据分配到箱子里的操作
- #pragma omp parallel for num_threads(nProcessors) schedule(static, 1)
- for ( i = 0; i < nProcessors; i )
- {
- //每个线程计算自己的数据起始位置,其中end是开区间位置
- UINT begin = i * uCount;
- UINT end = (i 1) * uCount;
- if ( end > uDataLen )
- {
- end = uDataLen;
- }
- T * pOut = pOutData begin;
- T *p = pData begin;
-
- Serial_Partitioned<T>(p, end-begin, uRadix, uKeyIndex, GetKeyFunc,
- ppuBoxDataCount[i], pOut, ppDataArray[i]);
- }//for ( i = 0; i < nProcessors; i )
-
- UINT *puCount = ppuBoxDataCount[0];
- #pragma omp parallel for num_threads(dtn(uRadix,
4096)) //最小循环次数设为多少需要测试 - for ( i = 0; i < uRadix; i )
- {
- int k;
- for ( k = 1; k < nProcessors; k )
- {
- UINT * pk = ppuBoxDataCount[k];
- puCount[i] = pk[i];
- }
- }
-
- //计算前缀和
- for ( i = 1; i < uRadix; i )
- {
- puCount[i] = puCount[i-1];
- }
-
- //得到各段数据的起始位置
- for ( i = uRadix - 1; i > 0; i-- )
- {
- puCount[i] = puCount[i-1];
- }
- puCount[0] = 0;
-
- //收集操作,将pOutData里的数据重新放回pData里
- int m;
-
- #pragma omp parallel for private(i) schedule(dynamic)
- for ( m = 0; m < uRadix; m )
- {
- int nIndex = puCount[m];
- for ( i = 0; i < nProcessors; i )
- {
- DATA_ARRAY_TEMP<T> *pDataArray = ppDataArray[i];
- T *pDataTemp = pDataArray[m].pData;
- int k;
- for ( k = 0; k < pDataArray[m].uCount; k )
- {
- pData[nIndex] = pDataTemp[k];
- nIndex;
- }
- }
- }
- //进行下一轮的分配与收集操作
- } //for ( uKeyIndex = 0; uKeyIndex < uMaxKeyLen; uKeyIndex )
-
- //释放表
- for ( i = 0; i < nProcessors; i )
- {
- delete [] ppDataArray[i];
- delete [] ppuBoxDataCount[i];
- }
-
- delete [] ppDataArray;
- delete [] ppuBoxDataCount;
- delete [] pOutData;
- return;
- }
从上面的负载平衡的并行基数排序代码可以看出,相比于串行基数排序,并行基数排序中需要收集操作,增加了部分计算开销,增加的计算开销所占的比例大约占40%左右,因此它的效率只有70%左右。不过由于它的线程粒度较大,因此它的加速比随CPU核数增加呈线性增加趋势。
【责任编辑: 云霞 TEL:(010)68476606】
|