分享

多路平衡归并的实现--败者树

 moonboat 2014-05-07


记录败者,胜者参加下一轮比赛,当新的元素到达的时候,log2(K)调整就可以选出胜者,下面的log2(K)代进1式,就可以抵消掉k的影响,从而与k无关,那么我们就可以,通过增大k,减少IO次数,并且不会降低内部归并的效率












算法实现
1、一共k路,b[i]对应第i路,比如一共5路 b[0],b[1],b[2],b[3],b[4]对应5路的首元素
2、ls[i],是一棵败者树ls[0],ls[1],ls[2],ls[3],ls[4],分别存储上面的b[i]的下标i,这样就利用元素的下标,组织了成它们的树关系

初始化时,ls[i]=i;

然后从最后一个结点开始调整,t=(s+k)/2表示的是s的父结点, 然后逐渐向上向父结点比较t=t/2,最后到达ls[0],它是冠军,经过所有的结点调整,就成了一棵败者树,最后输出b[ls[0]],

k路合并,根据ls[0],可以知道是从哪路输出,然后再从哪路输入





一个java 的模仿实现,可运行

点击(此处)折叠或打开

  1. package my.sort;

  2. import java.io.BufferedInputStream;
  3. import java.io.BufferedOutputStream;
  4. import java.io.BufferedWriter;
  5. import java.io.DataInputStream;
  6. import java.io.DataOutputStream;
  7. import java.io.File;
  8. import java.io.FileInputStream;
  9. import java.io.FileNotFoundException;
  10. import java.io.FileOutputStream;
  11. import java.io.FileWriter;
  12. import java.io.IOException;
  13. import java.util.ArrayList;
  14. import java.util.Arrays;
  15. import java.util.Iterator;
  16. import java.util.Random;

  17. /**
  18.  * 基于大数据量的外排序算法,分为二路归并和多路归并
  19.  * @author java2king
  20.  * @link http://blog.csdn.net/Java2King
  21.  *
  22.  */
  23. public class ExternalSort {

  24.     public static int ITEM_COUNT = 10000000; //总数 

  25.     public static int BUFFER_SIZE = 1024*4*1000;// 一次缓冲读取
  26.     
  27.     public static int FILE_COUNT = 1024*1000*1*4;// 每个文件的记录数1
  28.     
  29.     public static File MAIN_FILE = new File("mainset");//要排序的文件

  30.     /**
  31.      * 二路归并
  32.      * @param file
  33.      * @return
  34.      * @throws IOException
  35.      */
  36.     public File sort(File file) throws IOException {
  37.         ArrayList<File> files = split(file);
  38.         return process(files);
  39.     }
  40.     /**
  41.      * 多路归并
  42.      * @param file
  43.      * @throws IOException
  44.      */
  45.     public void mSort(File file) throws IOException{
  46.         ArrayList<File> files = split(file);
  47.         multipleMerge(files);
  48.         
  49.     }

  50.     // recursive method to merge the lists until we are left with a
  51.     // single merged list
  52.     private File process(ArrayList<File> list) throws IOException {
  53.         if (list.size() == 1) {
  54.             return list.get(0);
  55.         }
  56.         ArrayList<File> inter = new ArrayList<File>();
  57.         for (Iterator<File> itr = list.iterator(); itr.hasNext();) {
  58.             File one = itr.next();
  59.             if (itr.hasNext()) {
  60.                 File two = itr.next();
  61.                 inter.add(merge(one, two));
  62.             } else {
  63.               // return one;
  64.                 inter.add(one);
  65.             }
  66.         }
  67.         return process(inter);
  68.     }
  69.     /**
  70.      * Splits the original file into a number of sub files. 
  71.      */
  72.     private ArrayList<File> split(File file) throws IOException {
  73.         ArrayList<File> files = new ArrayList<File>();
  74.         int[] buffer = new int[FILE_COUNT];//FILE_COUNT每个文件的记录数
  75.         FileInputStream fr = new FileInputStream(file);
  76.         BufferedInputStream bin = new BufferedInputStream(fr,BUFFER_SIZE);
  77.         DataInputStream din=new DataInputStream(bin); 
  78.         boolean fileComplete = false;
  79.         
  80.         while (!fileComplete) {
  81.             int index = buffer.length;
  82.             for (int i = 0; i < buffer.length && !fileComplete; i++) {
  83.                 try {
  84.                      buffer[i] = din.readInt();
  85.                 } catch (Exception e) {
  86.                     fileComplete = true;
  87.                     index = i;
  88.                 }
  89.             }
  90.             if (index != 0 && buffer[0] > -1) {
  91.                 Arrays.sort(buffer, 0, index);
  92.                 File f = new File("set" + new Random().nextInt());
  93.          // File temp = File.createTempFile("josp", ".tmp", f); 
  94.                 FileOutputStream writer = new FileOutputStream(f);
  95.                 BufferedOutputStream bOutputStream = new BufferedOutputStream(writer);
  96.              
  97.                 DataOutputStream dout=new DataOutputStream(bOutputStream); 
  98.                 for (int j = 0; j < index; j++) {
  99.                     dout.writeInt(buffer[j]);
  100.                 }
  101.                 dout.close();
  102.                 bOutputStream.close();
  103.                 writer.close();
  104.                 files.add(f);
  105.                
  106.             }

  107.         }
  108.         din.close();
  109.         bin.close();
  110.         fr.close();
  111.         return files;
  112.     }
  113.     /**
  114.      * 多路归并
  115.      * @param list
  116.      * @throws IOException
  117.      */
  118.     private void multipleMerge(ArrayList<File> list) throws IOException 
  119.     {
  120.         
  121.         int fileSize = list.size();
  122.         if(fileSize == 1)
  123.         {
  124.             return;
  125.         }
  126.         ArrayList<DataInputStream> dinlist = new ArrayList<DataInputStream>();
  127.         int[] ext = new int[fileSize];//比较数组
  128.     //    File output = new File("multipleMerged");
  129.         FileOutputStream os = new FileOutputStream(MAIN_FILE);
  130.         BufferedOutputStream bout = new BufferedOutputStream(os);
  131.         DataOutputStream dout = new DataOutputStream(bout);

  132.         for (int i = 0; i < fileSize; i++) 
  133.         {
  134.             try {
  135.                 dinlist.add(i, new DataInputStream(new BufferedInputStream(
  136.                         new FileInputStream(list.get(i)), BUFFER_SIZE)));
  137.             } catch (Exception e) {
  138.                 e.printStackTrace();
  139.             }
  140.         }
  141.         int index = 0;
  142.         for (int i = 0; i < fileSize; i++) {
  143.             try {
  144.                 ext[i] = dinlist.get(i).readInt();
  145.             } catch (Exception e) {
  146.                 System.err.println("file_" + i + "为空");
  147.                 ext[i] = -1;
  148.             }
  149.         }
  150.         int count = fileSize;
  151.         int[] sum = new int[fileSize];
  152.         
  153.         while (count > 1) 
  154.         {
  155.             index = getMinIndex(ext);
  156.             dout.writeInt(ext[index]);
  157.             sum[index]++;
  158.             try {
  159.                 ext[index] = dinlist.get(index).readInt();
  160.             } catch (Exception e) {
  161.                 ext[index] = -1;
  162.                 count--;
  163.                 dinlist.get(index).close();
  164.         //        System.err.println(index + "空,写进:" +sum[index]);
  165.                 
  166.             }
  167.         }
  168.         int sIndex = getSIndex(ext);
  169.         dout.writeInt(ext[sIndex]);
  170.         while (true) {
  171.             try {
  172.                 dout.writeInt(dinlist.get(sIndex).readInt());
  173.             } catch (Exception e) {
  174.                 dinlist.get(sIndex).close();
  175.                 break;
  176.             }
  177.         }
  178.         dout.close();
  179.     }
  180.     
  181.     //找到剩下的最后一个文件输入流
  182.     public int getSIndex(int[] ext){
  183.         int result = 0;
  184.         for (int i = 0; i < ext.length; i++) {
  185.             if(ext[i]!= -1){
  186.                 result = i;
  187.                 break;
  188.             }
  189.         }
  190.         return result;
  191.     }
  192.     //找到数据中最小的一个
  193.     public int getMinIndex(int[] ext){
  194.         int min = 2147483647;
  195.         int index = -1;
  196.         for (int i = 0; i < ext.length; i++) {
  197.             if(ext[i] != -&& ext[i] < min){
  198.                 min = ext[i];
  199.                 index = i;
  200.             }
  201.         }
  202.         return index;
  203.     }
  204.     /**
  205.      * 二路归并
  206.      * 
  207.      * @param one
  208.      * @param two
  209.      * @return
  210.      * @throws IOException
  211.      */
  212.     private File merge(File one, File two) throws IOException {
  213.         FileInputStream fis1 = new FileInputStream(one);
  214.         FileInputStream fis2 = new FileInputStream(two);
  215.         BufferedInputStream bin1 = new BufferedInputStream(fis1,BUFFER_SIZE);
  216.         BufferedInputStream bin2 = new BufferedInputStream(fis2,BUFFER_SIZE);
  217.         
  218.         DataInputStream din1=new DataInputStream(bin1); 
  219.         DataInputStream din2=new DataInputStream(bin2); 
  220.         
  221.         File output = new File("merged" + new Random().nextInt());
  222.         FileOutputStream os = new FileOutputStream(output);
  223.         BufferedOutputStream bout = new BufferedOutputStream(os);
  224.         DataOutputStream dout=new DataOutputStream(bout); 
  225.    
  226.         int a = -1;//= din1.readInt();
  227.         int b = -1;//= din2.readInt();
  228.         
  229.         boolean finished = false;
  230.         boolean emptyA = false;//
  231.         int flag = 0;
  232.         while (!finished) {

  233.             if (flag != 1) {
  234.                 try {
  235.                     a = din1.readInt();
  236.                 } catch (Exception e) {
  237.                     emptyA = true;
  238.                     break;
  239.                 }
  240.             }
  241.             if (flag != 2) {
  242.                 try {
  243.                     b = din2.readInt();
  244.                 } catch (Exception e) {
  245.                     emptyA = false;
  246.                     break;
  247.                 }
  248.             }
  249.             if(> b){
  250.                 dout.writeInt(b);
  251.                 flag = 1;
  252.             }else if( a < b){
  253.                 dout.writeInt(a);
  254.                 flag = 2;
  255.             }else if(== b){
  256.                 dout.write(a);
  257.                 dout.write(b);
  258.                 flag = 0;
  259.             }
  260.         }
  261.         finished = false;
  262.         if(emptyA){
  263.             dout.writeInt(b);
  264.             while(!finished){
  265.                 try {
  266.                     b = din2.readInt();
  267.                 } catch (Exception e) {
  268.                     break;
  269.                 }
  270.                 dout.writeInt(b);
  271.             }
  272.         }else{
  273.             dout.writeInt(a);
  274.             while(!finished){
  275.                 try {
  276.                     a = din1.readInt();
  277.                 } catch (Exception e) {
  278.                     break;
  279.                 }
  280.                 dout.writeInt(a);
  281.             }
  282.         }
  283.         dout.close();
  284.         os.close();
  285.         bin1.close();
  286.         bin2.close();
  287.         bout.close();
  288.         return output;
  289.     }

  290.  

  291.   

  292.     /**
  293.      * @param args
  294.      * @throws IOException
  295.      */
  296.     public static void main(String[] args) throws IOException {
  297.               
  298.         Random random = new Random(System.currentTimeMillis());
  299.         FileOutputStream fw = new FileOutputStream(MAIN_FILE);
  300.         BufferedOutputStream bout = new BufferedOutputStream(fw);
  301.         DataOutputStream dout=new DataOutputStream(bout); 
  302.         //ITEM_COUNT = 10000000; //总数 
  303.         for (int i = 0; i < ITEM_COUNT; i++) {
  304.             int ger = random.nextInt();
  305.             ger = ger < 0 ? -ger : ger;
  306.             dout.writeInt(ger);

  307.         }
  308.         dout.close();
  309.         bout.close();
  310.         fw.close();
  311.         ExternalSort sort = new ExternalSort();
  312.         System.out.println("Original:");

  313.         long start = System.currentTimeMillis();
  314.         sort.mSort(MAIN_FILE);
  315.        
  316.         
  317.         long end = System.currentTimeMillis();
  318.         System.out.println((end - start)/1000 + "s");
  319.         recordFile((end - start)/1000 ,true);
  320.     }

  321.     private static void recordFile(long time,boolean isBuffer)
  322.             throws FileNotFoundException, IOException {
  323.         BufferedWriter bw = new BufferedWriter(new FileWriter("log",true));
  324.         bw.write("FILE_COUNT = "+FILE_COUNT+";对"+ ITEM_COUNT + "条数据 "+ ITEM_COUNT*4/(1024*1204) +"MB排序耗时:" + time + "s ");
  325.         if(isBuffer){
  326.             bw.write(" 使用缓冲:"+BUFFER_SIZE*4/(1024*1204) +"MB");
  327.         }
  328.         bw.newLine();
  329.         bw.close();
  330.     }

  331. }

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多