HadoopYarn资源调用框架、排序2018-03-19版本修改人修改记录修改时间V1.0王守奎编写2018/3/19目录Yarn资源调 用框架4部分排序4全排序5使用一个分区实现全排序5自定义分区实现全排序5使用采样器分区实现全排序11Yarn资源调用框架部分排序在 mr的app中指定reduce个数为两个(job.setNumReduceTasks(2);),reduce会生成两个文件,文件部 分排序,如下图所示:Mapreduce会根据hash算法对key值进行分区,在各自的分区中进行排序。调用默认的HashPartit ioner,不需要操作,每个reduce聚合的key都是有序的全排序使用一个分区实现全排序使用一个分区默认会对key值排序自定义分 区实现全排序自己根据年份数据,计算key的值,按照key所在区间进行分区需要对key有全面的认知,否则容易出现数据倾斜Apppac kagecom.bm.mapreduce.sort.all;importorg.apache.hadoop.fs.Path;i mportorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.m apreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInp utFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutput Format;/Copyright(C)2018编码界的小菜鸟作者:王守奎2018年10月26日下午6:50 :23/publicclassApp{publicstaticvoidmain(String[]args)th rowsException{if(args.length!=2){System.out.println("Usage: MaxTemperature");System.exit(-1);}//new jobJobjob=Job.getInstance();//findjarbyclassnamejob.setJa rByClass(MyMapper.class);job.setJobName("AllSortApp");//设置reduc etask的数量job.setNumReduceTasks(3);//设置输入输出文件路径FileInputFormat.add InputPath(job,newPath(args[0]));FileOutputFormat.setOutputPath( job,newPath(args[1]));//设置最大最小切片大小,都为128M,结果等同于默认值FileInputForm at.setMaxInputSplitSize(job,10241024128);FileInputFormat.s etMinInputSplitSize(job,10241024128);//设置预合并、mapper、reduce 函数类job.setCombinerClass(MyReducer.class);job.setMapperClass(MyMap per.class);job.setReducerClass(MyReducer.class);//设置输出的key-value class类型job.setOutputKeyClass(IntWritable.class);job.setOutputValu eClass(IntWritable.class);//设置分区类job.setPartitionerClass(AllSortP artitioner.class);//执行job知道完成System.exit(job.waitForCompletion(tr ue)?0:1);}}Mapperpackagecom.bm.mapreduce.sort.all;importjav a.io.IOException;importjava.net.InetAddress;importorg.apache.ha doop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;impo rtorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.M apper;/Copyright(C)2018编码界的小菜鸟作者:王守奎2018年10月26日下午6:43: 
33 */
public class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    // constant
    private static final int MISSING = 9999;

    // map function
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // per line
        String line = value.toString();
        // get year
        String year = line.substring(15, 19);
        // air temp
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        // valid air temp data
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            // map output
            context.write(new IntWritable(Integer.parseInt(year)), new IntWritable(airTemperature));
            String hostname = InetAddress.getLocalHost().getHostName();
            String thread = Thread.currentThread().getName() + "@" + Thread.currentThread().hashCode();
            context.getCounter("m-1", "mapper-map-" + hostname + "-" + thread).increment(1);
        }
        System.out.println(key.get());
    }
}

Reducer
package com.bm.mapreduce.sort.all;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.io.IntWritable;
import org.
apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

/* Copyright (C) 2018 编码界的小菜鸟 作者: 王守奎 2018年10月26日 下午6:45:24 */
public class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    // reduce
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // max
        int maxValue = Integer.MIN_VALUE;
        String hostname = InetAddress.getLocalHost().getHostName();
        String thread = Thread.currentThread().getName() + "@" + Thread.currentThread().hashCode();
        context.getCounter("r-1", "reducer-reduce-" + hostname + "-" + thread).increment(1);
        // for
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        // output
        context.write(key, new IntWritable(maxValue));
    }
}

AllSortPartitioner
package com.bm.mapreduce.sort.all;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/* Copyright (C) 2018 编码界的小菜鸟 作者: 王守奎 2018年10月26日 下午6:46:31 */
public class AllSortPartitioner extends Partitioner<IntWritable, IntWritable> {
    /*
     * Get the partition number for a given key (hence record) given the total
     * number of partitions i.e. number of reduce-tasks for the job.
     * @param key the key to be partioned.
     * @param value the entry value.
     * @param numPartitions the total number of partitions.
     * @return the partition number for the key.
     */
    @Override
    public int getPartition(IntWritable key, IntWritable value, int num) {
        int step = 5 / num;
        int[][] arr = new int[num][2];
        int count = step;
        int i = 0;
        while (true) {
            if (count == 0) {
                arr[i] = new int[]{Integer.MIN_VALUE, 1900 + count};
            } else {
                arr[i] = new int[]{1900 + count, 1900 + 3 * count};
            }
            i++;
            count = count + step;
            if (count >= num) {
                break;
            }
        }
        // 注:以下循环体在原文档提取时被截断("for(intj=0;jar=key.get();…"),此处按常规写法还原,需与原稿核对
        for (int j = 0; j < arr.length; j++) {
            int[] row = arr[j];
            int year = key.get();
            if (year >= row[0] && year < row[1]) {
                return j;
            }
        }
        return 0;
    }
}

运行结果

使用采样器分区实现全排序
适用于操作SequenceFile(Key-Value)。运行com.bm.dataStruction.SequenceFileStructure.write_2(),生成hdfs://namenode:8020/user/hadoop/data/mySequenceFile.seq文件。
练习的文档内容:
0 tom0
1 tom1
2 tom2
...
97 tom97
98 tom98
99 tom99

App
package com.bm.mapreduce.sort.sampler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import or
g.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce. lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib. input.SequenceFileInputFormat;importorg.apache.hadoop.mapreduce. lib.output.FileOutputFormat;importorg.apache.hadoop.mapreduce.li b.partition.InputSampler;importorg.apache.hadoop.mapreduce.lib.p artition.InputSampler.RandomSampler;importorg.apache.hadoop.mapr educe.lib.partition.TotalOrderPartitioner;/Copyright(C)201 8编码界的小菜鸟作者:王守奎2018年10月26日下午7:09:54/publicclassApp{p ublicstaticvoidmain(String[]args)throwsException{//newjo bJobjob=Job.getInstance();Configurationconf=job.getConfigur ation();//findjarbyclassnamejob.setJarByClass(MyMapper.class) ;job.setJobName("AllSortAppwithRandomSampler");//设置输入输出文件路径F ileInputFormat.addInputPath(job,newPath(args[0]));FileOutputFor mat.setOutputPath(job,newPath(args[1]));job.setMapperClass(MyMa pper.class);job.setReducerClass(MyReducer.class);job.setOutputKey Class(IntWritable.class);job.setOutputValueClass(Text.class);//小 于等于Sampler的切片数,一般等于job.setNumReduceTasks(4);//第一个参数表示key会被选中的概率[ 0.0-1.0]//第二个参数是所有选中的切片中获取的采样数量//表示采样点最大数目为,我这里设置10代表我的采样点最大为10, 如果超过10,那么每次有新的采样点生成时//会删除原有的一个采样点,此参数大数据的时候尽量设置多一些//第三个参数是指定最大的 切片数RandomSamplersampler=newInputSampler.Ran domSampler(0.1,10,4);//设置分区文件,即采样后放在的文件的文件名 ,不是完整路径TotalOrderPartitioner.setPartitionFile(conf,newPath("dat a/sampler/par1/"));//采样器分区必须为SequenceFilejob.setInputFormatClass( SequenceFileInputFormat.class);//将采样点写入到分区文件中,这个必须要InputSampler.w ritePartitionFile(job,sampler);//Hadoop内置的名为TotalOrderPartitio ner的全排序job.setPartitionerClass(TotalOrderPartitioner.class);job. waitForCompletion(true);}}Mapperpackagecom.bm.mapreduce.sort.sam pler;importjava.io.IOException;importorg.apache.hadoop.io.IntWr itable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop. 
mapreduce.Mapper;

/* Copyright (C) 2018 编码界的小菜鸟 作者: 王守奎 2018年10月26日 下午7:08:54 */
public class MyMapper extends Mapper<IntWritable, Text, IntWritable, Text> {
    // map function
    protected void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}

Reducer
package com.bm.mapreduce.sort.sampler;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/* Copyright (C) 2018 编码界的小菜鸟 作者: 王守奎 2018年10月26日 下午7:09:23 */
public class MyReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        context.write(key, values.iterator().next());
    }
}

输出结果分析
运行命令: hadoop jar bigData.jar com.bm.mapreduce.sort.sampler.App data/mySequenceFile.seq data/out9
1、分区及采样点:自动生成了分区文件,通过26,30,95三个数设置了4个分区文件。每次的采样点可能都不同。分区文件par为SequenceFile文件。
2、分区结果集
2.1、第一个分区 key 小于 20
2.2、第二个分区 key 大于等于20,小于30
2.3、第三个分区 key 大于等于30,小于95
2.4、第四个分区 key 大于等于95
魁魁语录:状态良好,再来一次
4A项目组 4A项目组 江湖一哥 版权所有 |
|