A Simple MapReduce Program (Not WordCount)
2014-08-03
After several days of fiddling, I finally got Hadoop 2.2.0 configured (for how to deploy Hadoop on a Linux platform, see this blog's post "Deploying a Hadoop 2.2.0 Pseudo-Distributed Platform on Fedora", http://www.wypblog.com/archives/790). Today I will mainly talk about how to run a MapReduce program we have written on that Hadoop 2.2.0 pseudo-distributed setup. First, here are the Maven dependencies the program needs:


<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.1.1-beta</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.1.1-beta</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>2.1.1-beta</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.1.1-beta</version>
</dependency>


Now for the program itself. The code is as follows:
package com.wyp.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

import java.io.IOException;

/**
 * User: wyp
 * Date: 13-10-25
 * Time: 3:26 PM
 * Email: wyphao.2007@163.com
 */
public class MaxTemperatureMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {

        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }

        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            output.collect(new Text(year), new IntWritable(airTemperature));
        }
    }
}
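To make the fixed-width offsets above concrete, here is a minimal, Hadoop-free sketch of the same parsing logic run against a synthetic record (the sample line below is fabricated for illustration, not real NCDC data):

```java
public class ParseDemo {
    // Build a synthetic 93-character record with the fields at the same
    // zero-based offsets the mapper uses: year at 15..18, sign at 87,
    // temperature at 88..91, quality code at 92.
    public static String sampleLine() {
        StringBuilder sb = new StringBuilder("0".repeat(93));
        sb.replace(15, 19, "1950");
        sb.replace(87, 88, "+");
        sb.replace(88, 92, "0317");
        sb.replace(92, 93, "1");
        return sb.toString();
    }

    // Same parsing steps as MaxTemperatureMapper.map(), minus the Hadoop types.
    public static String parse(String line) {
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        return year + " " + airTemperature + " " + quality;
    }

    public static void main(String[] args) {
        System.out.println(parse(sampleLine())); // prints "1950 317 1"
    }
}
```

Note how the '+' branch skips the sign character before parsing: Integer.parseInt("0317") works, and the else branch parses the leading '-' as part of the number.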

package com.wyp.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Iterator;

/**
 * User: wyp
 * Date: 13-10-25
 * Time: 3:36 PM
 * Email: wyphao.2007@163.com
 */
public class MaxTemperatureReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
            maxValue = Math.max(maxValue, values.next().get());
        }

        output.collect(key, new IntWritable(maxValue));
    }
}
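The reducer's core is just a running maximum over an Iterator. Stripped of the Hadoop wrapper types, the same idea is a few lines of plain Java:

```java
import java.util.Arrays;
import java.util.Iterator;

public class MaxDemo {
    // Same running-maximum loop as MaxTemperatureReducer.reduce(),
    // but over plain Integers instead of IntWritables.
    public static int max(Iterator<Integer> values) {
        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
            maxValue = Math.max(maxValue, values.next());
        }
        return maxValue;
    }

    public static void main(String[] args) {
        System.out.println(max(Arrays.asList(244, 317, -11).iterator())); // prints 317
    }
}
```

Starting from Integer.MIN_VALUE means any real temperature replaces the initial value on the first iteration.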

package com.wyp.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import java.io.IOException;

/**
 * User: wyp
 * Date: 13-10-25
 * Time: 3:40 PM
 * Email: wyphao.2007@163.com
 */
public class MaxTemperature {

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Error!");
            System.exit(1);
        }

        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("MaxTemperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);

    }
}
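Before moving to the cluster, it can help to sanity-check the overall logic locally. The whole job — group records by year, keep the maximum — reduces to a few lines of plain Java (a Hadoop-free sketch of the data flow, not the actual MapReduce execution path):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalMaxByYear {
    // The map + reduce phases collapsed into one in-memory pass:
    // key = year, value = maximum temperature seen for that year.
    public static Map<String, Integer> maxByYear(List<String[]> records) {
        Map<String, Integer> result = new HashMap<>();
        for (String[] rec : records) {
            String year = rec[0];
            int temp = Integer.parseInt(rec[1]);
            result.merge(year, temp, Math::max);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
                new String[]{"1901", "317"},
                new String[]{"1901", "-11"},
                new String[]{"1902", "244"});
        System.out.println(maxByYear(records));
    }
}
```

Map.merge with Math::max plays the role of the reducer: the first value for a year is inserted as-is, and each later value replaces it only if it is larger.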
Compile the program above and package it into a jar file, and we can deploy it on Hadoop 2.2.0 (this article assumes you already have Hadoop 2.2.0 deployed). Here is how. First, start Hadoop 2.2.0 with the following commands:
[wyp@wyp hadoop]$ sbin/start-dfs.sh
[wyp@wyp hadoop]$ sbin/start-yarn.sh
If you want to check whether Hadoop 2.2.0 started successfully, run the following command:
[wyp@wyp hadoop]$ jps
9582 Main
9684 RemoteMavenServer
16082 Jps
7011 DataNode
7412 ResourceManager
7528 NodeManager
7222 SecondaryNameNode
6832 NameNode
jps is a command that ships with the JDK, located in the jdk/bin directory. If the processes above show up on your machine (the five processes NameNode, SecondaryNameNode, NodeManager, ResourceManager, and DataNode must all be present!), your Hadoop server started successfully. Now run the jar file packaged above (here Hadoop.jar, whose absolute path is /home/wyp/IdeaProjects/Hadoop/out/artifacts/Hadoop_jar/Hadoop.jar) with the following command:
[wyp@wyp Hadoop_jar]$ /home/wyp/Downloads/hadoop/bin/hadoop jar \
/home/wyp/IdeaProjects/Hadoop/out/artifacts/Hadoop_jar/Hadoop.jar \
com/wyp/hadoop/MaxTemperature \
/user/wyp/data.txt \
/user/wyp/result
(The above is a single command; it is split across lines here only because it is long — in practice, type it on one line!) Here, /home/wyp/Downloads/hadoop/bin/hadoop is the absolute path of the hadoop command; if you have added the hadoop command to your environment's PATH, you do not need to spell out the full path. com/wyp/hadoop/MaxTemperature is the entry point, i.e. the class containing the program's main method. /user/wyp/data.txt is the input: an absolute path in the Hadoop file system (HDFS), not a path in your local Linux filesystem, pointing at the file to analyze. /user/wyp/result is the absolute path (again in HDFS, not on your local Linux filesystem!) where the results are written, and it must not already exist, otherwise the job throws an exception. This is a protective mechanism in Hadoop: you would not want the output of a job that ran for days to be accidentally overwritten, so if /user/wyp/result exists the job fails with an exception. Quite sensible. Now, entering the command above should produce output similar to the following:
13/10/28 15:20:44 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
13/10/28 15:20:44 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
13/10/28 15:20:45 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
13/10/28 15:20:45 WARN mapreduce.JobSubmitter: No job jar file set. User classes may not be found. See Job or Job#setJar(String).
13/10/28 15:20:45 INFO mapred.FileInputFormat: Total input paths to process : 1
13/10/28 15:20:46 INFO mapreduce.JobSubmitter: number of splits:2
13/10/28 15:20:46 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
13/10/28 15:20:46 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
13/10/28 15:20:46 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
13/10/28 15:20:46 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
13/10/28 15:20:46 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
13/10/28 15:20:46 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
13/10/28 15:20:46 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
13/10/28 15:20:46 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
13/10/28 15:20:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1382942307976_0008
13/10/28 15:20:47 INFO mapred.YARNRunner: Job jar is not present. Not adding any jar to the list of resources.
13/10/28 15:20:49 INFO impl.YarnClientImpl: Submitted application application_1382942307976_0008 to ResourceManager at /0.0.0.0:8032
13/10/28 15:20:49 INFO mapreduce.Job: The url to track the job: http://wyp:8088/proxy/application_1382942307976_0008/
13/10/28 15:20:49 INFO mapreduce.Job: Running job: job_1382942307976_0008
13/10/28 15:20:59 INFO mapreduce.Job: Job job_1382942307976_0008 running in uber mode : false
13/10/28 15:20:59 INFO mapreduce.Job:  map 0% reduce 0%
13/10/28 15:21:35 INFO mapreduce.Job:  map 100% reduce 0%
13/10/28 15:21:38 INFO mapreduce.Job:  map 0% reduce 0%
13/10/28 15:21:38 INFO mapreduce.Job: Task Id : attempt_1382942307976_0008_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: Error in configuring object
	at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
	at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:425)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
	... 9 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.wyp.hadoop.MaxTemperatureMapper1 not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1752)
	at org.apache.hadoop.mapred.JobConf.getMapperClass(JobConf.java:1058)
	at org.apache.hadoop.mapred.MapRunner.configure(MapRunner.java:38)
	... 14 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.wyp.hadoop.MaxTemperatureMapper1 not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1744)
	... 16 more
Caused by: java.lang.ClassNotFoundException: Class com.wyp.hadoop.MaxTemperatureMapper1 not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1626)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1718)
	... 17 more

Container killed by the ApplicationMaster.
Container killed on request. Exit code is 143
The job threw an exception (ClassNotFoundException)! What happened? Honestly, I was not sure either.
After some Googling, I found others' take on the problem. In summary, it is usually caused by one of the following: (1) you wrote a Java library, packaged it as a jar, and then wrote a Hadoop program whose mapper and reducer are implemented by calling into that jar; or (2) your Hadoop program calls a third-party Java library. You then distributed your own jar or the third-party jar into the HADOOP_HOME directory of each TaskTracker, ran your Java program, and got the error above.
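Judging from the "No job jar file set" and "Job jar is not present" warnings in the log above, the job jar was never shipped to the task containers. A cleaner alternative to the classpath workaround below would therefore be to point the JobConf at the jar explicitly in the driver. This is an untested sketch; the jar path is simply the one used in this article:

```java
// In MaxTemperature.main(), right after constructing the JobConf:
JobConf conf = new JobConf(MaxTemperature.class);
// Explicitly name the jar containing the mapper/reducer classes
// so it is shipped to the task containers:
conf.setJar("/home/wyp/IdeaProjects/Hadoop/out/artifacts/Hadoop_jar/Hadoop.jar");
```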
So how do we fix it? One clumsy workaround is to run the following command before submitting the Hadoop job:
[wyp@wyp Hadoop_jar]$ export \
HADOOP_CLASSPATH=/home/wyp/IdeaProjects/Hadoop/out/artifacts/Hadoop_jar/
where /home/wyp/IdeaProjects/Hadoop/out/artifacts/Hadoop_jar/ is the directory containing the Hadoop.jar file from above. Now run the job command again:
[wyp@wyp Hadoop_jar]$ hadoop jar /home/wyp/IdeaProjects/Hadoop/out/artifacts/Hadoop_jar/Hadoop.jar com/wyp/hadoop/MaxTemperature /user/wyp/data.txt /user/wyp/result
13/10/28 15:34:16 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
13/10/28 15:34:16 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
13/10/28 15:34:17 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
13/10/28 15:34:17 INFO mapred.FileInputFormat: Total input paths to process : 1
13/10/28 15:34:17 INFO mapreduce.JobSubmitter: number of splits:2
13/10/28 15:34:17 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
13/10/28 15:34:17 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
13/10/28 15:34:17 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
13/10/28 15:34:17 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
13/10/28 15:34:17 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
13/10/28 15:34:17 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
13/10/28 15:34:17 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
13/10/28 15:34:17 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
13/10/28 15:34:17 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
13/10/28 15:34:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1382942307976_0009
13/10/28 15:34:18 INFO impl.YarnClientImpl: Submitted application application_1382942307976_0009 to ResourceManager at /0.0.0.0:8032
13/10/28 15:34:18 INFO mapreduce.Job: The url to track the job: http://wyp:8088/proxy/application_1382942307976_0009/
13/10/28 15:34:18 INFO mapreduce.Job: Running job: job_1382942307976_0009
13/10/28 15:34:26 INFO mapreduce.Job: Job job_1382942307976_0009 running in uber mode : false
13/10/28 15:34:26 INFO mapreduce.Job:  map 0% reduce 0%
13/10/28 15:34:41 INFO mapreduce.Job:  map 50% reduce 0%
13/10/28 15:34:53 INFO mapreduce.Job:  map 100% reduce 0%
13/10/28 15:35:17 INFO mapreduce.Job:  map 100% reduce 100%
13/10/28 15:35:18 INFO mapreduce.Job: Job job_1382942307976_0009 completed successfully
13/10/28 15:35:18 INFO mapreduce.Job: Counters: 43
	File System Counters
		FILE: Number of bytes read=144425
		FILE: Number of bytes written=524725
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1777598
		HDFS: Number of bytes written=18
		HDFS: Number of read operations=9
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=2
		Launched reduce tasks=1
		Data-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=38057
		Total time spent by all reduces in occupied slots (ms)=24800
	Map-Reduce Framework
		Map input records=13130
		Map output records=13129
		Map output bytes=118161
		Map output materialized bytes=144431
		Input split bytes=182
		Combine input records=0
		Combine output records=0
		Reduce input groups=2
		Reduce shuffle bytes=144431
		Reduce input records=13129
		Reduce output records=2
		Spilled Records=26258
		Shuffled Maps=2
		Failed Shuffles=0
		Merged Map outputs=2
		GC time elapsed (ms)=321
		CPU time spent (ms)=5110
		Physical memory (bytes) snapshot=552824832
		Virtual memory (bytes) snapshot=1228738560
		Total committed heap usage (bytes)=459800576
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=1777416
	File Output Format Counters
		Bytes Written=18
At this point, the job has run successfully! Great, right? So how do you view the result the job just produced? Simple — run the following commands:
[wyp@wyp Hadoop_jar]$ hadoop fs -ls /user/wyp
Found 2 items
-rw-r--r--   1 wyp supergroup    1777168 2013-10-25 17:44 /user/wyp/data.txt
drwxr-xr-x   - wyp supergroup          0 2013-10-28 15:35 /user/wyp/result
[wyp@wyp Hadoop_jar]$ hadoop fs -ls /user/wyp/result
Found 2 items
-rw-r--r--   1 wyp supergroup          0 2013-10-28 15:35 /user/wyp/result/_SUCCESS
-rw-r--r--   1 wyp supergroup         18 2013-10-28 15:35 /user/wyp/result/part-00000
[wyp@wyp Hadoop_jar]$ hadoop fs -cat /user/wyp/result/part-00000
1901	317
1902	244
And with that, a MapReduce program you wrote yourself has finally run successfully! The test data used in this article can be downloaded from: http://pan.baidu.com/s/1iSacM