11 - Hadoop YARN Resource Scheduling Framework and Sorting
Hadoop YARN Resource Scheduling Framework and Sorting (2018-03-19)

Version: V1.0    Modified by: 王守奎    Change record: written    Modified on: 2018/3/19

Contents
    YARN resource scheduling framework
    Partial sort
    Total sort
        Total sort with a single partition
        Total sort with a custom partitioner
        Total sort with a sampler-based partitioner

YARN resource scheduling framework

Partial sort

If the MR application sets the number of reducers to two (job.setNumReduceTasks(2);), the reduce phase produces two output files, and each file is sorted only within itself. MapReduce hashes each key to decide which partition it belongs to, and the records are then sorted within each partition. The default HashPartitioner needs no extra configuration, and the keys that reach each reducer are already in sorted order.
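Under the hood, the default partitioner simply hashes the key modulo the number of reduce tasks. The sketch below mirrors what Hadoop's built-in HashPartitioner does; the class name HashLikePartitioner is only illustrative:

import org.apache.hadoop.mapreduce.Partitioner;

// Records with equal keys always map to the same reduce partition, so each
// reducer's output file is sorted within itself but not across files.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // clear the sign bit, then spread keys over the configured reducers
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}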
Total sort

Total sort with a single partition

With a single partition (one reduce task), the keys are sorted globally by default.
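A minimal sketch of the only driver change this needs (the rest of the job setup is the same as in the App driver shown below):

import org.apache.hadoop.mapreduce.Job;

// Single-partition total sort: with one reduce task there is only one partition,
// so the framework's shuffle sort already produces one globally ordered output file.
// The trade-off is that the entire reduce phase runs as a single task.
Job job = Job.getInstance();
job.setNumReduceTasks(1);
// ...input/output paths, mapper, reducer and key/value classes are configured as usual.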
Total sort with a custom partitioner

Compute the partition yourself from the key: here the year is the key, and each record is routed to a partition according to the range its year falls into. This requires a thorough understanding of the keys in the data; otherwise the partitions are easily skewed.

App

package com.bm.mapreduce.sort.all;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Copyright (C) 2018 编码界的小菜鸟
 * Author: 王守奎
 * 2018-10-26 18:50:23
 */
public class App {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        // new job
        Job job = Job.getInstance();
        // find jar by class name
        job.setJarByClass(MyMapper.class);
        job.setJobName("AllSortApp");
        // set the number of reduce tasks
        job.setNumReduceTasks(3);
        // set the input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // set the maximum and minimum split sizes to 128 MB, which matches the default
        FileInputFormat.setMaxInputSplitSize(job, 1024 * 1024 * 128);
        FileInputFormat.setMinInputSplitSize(job, 1024 * 1024 * 128);
        // set the combiner, mapper and reducer classes
        // (reusing the max-value reducer as a combiner is safe because max is associative and commutative)
        job.setCombinerClass(MyReducer.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // set the output key/value classes
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // set the partitioner class
        job.setPartitionerClass(AllSortPartitioner.class);
        // run the job until it completes
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Mapper

package com.bm.mapreduce.sort.all;

import java.io.IOException;
import java.net.InetAddress;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Copyright (C) 2018 编码界的小菜鸟
 * Author: 王守奎
 * 2018-10-26 18:43:33
 */
public class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    // constant marking a missing temperature reading
    private static final int MISSING = 9999;

    // map function
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // one input line per call
        String line = value.toString();
        // get the year
        String year = line.substring(15, 19);
        // air temperature
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        // keep only valid air temperature readings
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            // map output: (year, temperature)
            context.write(new IntWritable(Integer.parseInt(year)), new IntWritable(airTemperature));
            String hostname = InetAddress.getLocalHost().getHostName();
            String thread = Thread.currentThread().getName() + "@" + Thread.currentThread().hashCode();
            context.getCounter("m-1", "mapper-map-" + hostname + "-" + thread).increment(1);
        }
        System.out.println(key.get());
    }
}
Reducer

package com.bm.mapreduce.sort.all;

import java.io.IOException;
import java.net.InetAddress;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Copyright (C) 2018 编码界的小菜鸟
 * Author: 王守奎
 * 2018-10-26 18:45:24
 */
public class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    // reduce: keep the maximum temperature seen for each year
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // running maximum
        int maxValue = Integer.MIN_VALUE;
        String hostname = InetAddress.getLocalHost().getHostName();
        String thread = Thread.currentThread().getName() + "@" + Thread.currentThread().hashCode();
        context.getCounter("r-1", "reducer-reduce-" + hostname + "-" + thread).increment(1);
        // iterate over all values for this key
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        // output (year, max temperature)
        context.write(key, new IntWritable(maxValue));
    }
}
AllSortPartitioner

package com.bm.mapreduce.sort.all;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Copyright (C) 2018 编码界的小菜鸟
 * Author: 王守奎
 * 2018-10-26 18:46:31
 */
public class AllSortPartitioner extends Partitioner<IntWritable, IntWritable> {

    /**
     * Get the partition number for a given key (hence record) given the total
     * number of partitions, i.e. the number of reduce tasks for the job.
     *
     * @param key   the key to be partitioned.
     * @param value the entry value.
     * @param num   the total number of partitions.
     * @return the partition number for the key.
     */
    @Override
    public int getPartition(IntWritable key, IntWritable value, int num) {
        // build one year range per partition, then return the index of the range the key falls into
        int step = 5 / num;
        int[][] arr = new int[num][2];
        int count = step;
        int i = 0;
        while (i < num) {
            if (count == 0) {
                arr[i] = new int[]{Integer.MIN_VALUE, 1900 + count};
            } else {
                arr[i] = new int[]{1900 + count, 1900 + 3 * count};
            }
            i++;
            count = count + step;
            if (count >= num) {
                break;
            }
        }
        for (int j = 0; j < arr.length; j++) {
            int[] row = arr[j];
            int year = key.get();
            if (year >= row[0] && year < row[1]) {
                return j;
            }
        }
        return 0;
    }
}
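The same range idea can also be written with explicit hand-picked boundaries. Below is a minimal sketch assuming three reduce tasks and cut years 1930 and 1960; the class name and the boundary years are illustrative, not from the original:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Minimal fixed-boundary variant of the same idea: years are routed to partitions
// by hand-picked cut points, so partition 0 < partition 1 < partition 2 and the
// concatenated reduce outputs form a totally ordered result.
public class YearRangePartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numReduceTasks) {
        int year = key.get();
        if (numReduceTasks < 3) {
            return 0;                // degenerate case: everything in one partition
        }
        if (year < 1930) {
            return 0;
        } else if (year < 1960) {
            return 1;
        }
        return 2;
    }
}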
Run results

(The result screenshot from the original post is not reproduced here.)

Total sort with a sampler-based partitioner

This approach works on SequenceFile (key-value) input. Run com.bm.dataStruction.SequenceFileStructure.write_2() to generate the file hdfs://namenode:8020/user/hadoop/data/mySequenceFile.seq. The practice file contains:

0  tom0
1  tom1
2  tom2
...
97 tom97
98 tom98
99 tom99
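The SequenceFileStructure.write_2() helper itself is not listed in this document. Below is a minimal sketch of what such a generator might look like, based only on the sample data above; the class name SequenceFileWriterSketch and the output path are assumptions:

package com.bm.dataStruction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Hypothetical sketch: writes 100 IntWritable/Text pairs (0 -> tom0 ... 99 -> tom99)
// into a SequenceFile so it can be fed to the sampler-based total sort job.
public class SequenceFileWriterSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("data/mySequenceFile.seq");   // adjust to your HDFS layout
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(IntWritable.class),
                SequenceFile.Writer.valueClass(Text.class));
        try {
            for (int i = 0; i < 100; i++) {
                writer.append(new IntWritable(i), new Text("tom" + i));
            }
        } finally {
            writer.close();
        }
    }
}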
App

package com.bm.mapreduce.sort.sampler;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

/**
 * Copyright (C) 2018 编码界的小菜鸟
 * Author: 王守奎
 * 2018-10-26 19:09:54
 */
public class App {
    public static void main(String[] args) throws Exception {
        // new job
        Job job = Job.getInstance();
        Configuration conf = job.getConfiguration();
        // find jar by class name
        job.setJarByClass(MyMapper.class);
        job.setJobName("AllSortApp with RandomSampler");
        // set the input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        // no more than the number of splits the sampler looks at; usually equal to it
        job.setNumReduceTasks(4);
        // 1st argument: the probability [0.0, 1.0] with which a key is selected
        // 2nd argument: the total number of samples taken from the selected splits;
        //   10 here means at most 10 sample points are kept, and once that limit is reached
        //   each new sample evicts an existing one; use a larger value for big data sets
        // 3rd argument: the maximum number of splits to sample
        RandomSampler<IntWritable, Text> sampler =
                new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10, 4);
        // set the partition file, i.e. the name of the file the sample points go into
        // after sampling, not a full path
        TotalOrderPartitioner.setPartitionFile(conf, new Path("data/sampler/par1/"));
        // the sampler-based partitioner requires SequenceFile input
        job.setInputFormatClass(SequenceFileInputFormat.class);
        // write the sample points into the partition file; this step is mandatory
        InputSampler.writePartitionFile(job, sampler);
        // total ordering with Hadoop's built-in TotalOrderPartitioner
        job.setPartitionerClass(TotalOrderPartitioner.class);
        job.waitForCompletion(true);
    }
}
Mapper

package com.bm.mapreduce.sort.sampler;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Copyright (C) 2018 编码界的小菜鸟
 * Author: 王守奎
 * 2018-10-26 19:08:54
 */
public class MyMapper extends Mapper<IntWritable, Text, IntWritable, Text> {
    // map function: pass the key-value pairs through unchanged
    protected void map(IntWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}

Reducer

package com.bm.mapreduce.sort.sampler;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Copyright (C) 2018 编码界的小菜鸟
 * Author: 王守奎
 * 2018-10-26 19:09:23
 */
public class MyReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // emit the first value for each key
        context.write(key, values.iterator().next());
    }
}

Output analysis

Run command: hadoop jar bigData.jar com.bm.mapreduce.sort.sampler.App data/mySequenceFile.seq data/out9

1. Partitions and sample points
The partition file is generated automatically; in this run the three sample points 26, 30 and 95 define 4 partitions. The sample points may be different on every run. The partition file is itself a SequenceFile.

2. Partition result sets
2.1 First partition: keys less than 20
2.2 Second partition: keys greater than or equal to 20 and less than 30
2.3 Third partition: keys greater than or equal to 30 and less than 95
2.4 Fourth partition: keys greater than or equal to 95
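Conceptually, n-1 sampled cut points split the ordered key space into n partitions, and TotalOrderPartitioner routes each key by comparing it against the cut points read from the generated partition file. The sketch below only illustrates that mapping for the numbers above; it is not TotalOrderPartitioner's actual implementation:

// Illustration of how 3 cut points produce 4 ordered partitions:
// (-inf, 26) -> 0, [26, 30) -> 1, [30, 95) -> 2, [95, +inf) -> 3.
public class CutPointDemo {
    static int partitionFor(int key, int[] cutPoints) {
        for (int i = 0; i < cutPoints.length; i++) {
            if (key < cutPoints[i]) {
                return i;              // key falls before the i-th cut point
            }
        }
        return cutPoints.length;       // key is at or beyond the largest cut point
    }

    public static void main(String[] args) {
        int[] cutPoints = {26, 30, 95};
        System.out.println(partitionFor(10, cutPoints));   // prints 0
        System.out.println(partitionFor(27, cutPoints));   // prints 1
        System.out.println(partitionFor(95, cutPoints));   // prints 3
    }
}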