A Deep Dive into Hadoop's Local Run Mode
2016-12-09
  



Hadoop has three run modes: local (standalone) mode, pseudo-distributed mode, and cluster mode. The corresponding concepts are as follows:

1. Standalone mode, also called local run mode (standalone or local mode)

No daemons need to be started; all programs run inside a single JVM. Because this makes testing and debugging MapReduce programs on one machine convenient, this mode is best suited to the development stage.

2. Pseudo-distributed mode

Pseudo-distributed: if all of Hadoop's Java processes run on a single physical machine, this is called pseudo-distributed mode, as shown below:



[root@hadoop20 dir2]# jps
8993 Jps
7409 SecondaryNameNode
7142 NameNode
7260 DataNode
8685 NodeManager
8590 ResourceManager



3. Cluster mode

If Hadoop's Java processes run on multiple physical machines, this is called cluster mode (a cluster has masters and slaves), as shown below:



[root@hadoop11 local]# jps
18046 NameNode
30927 Jps
18225 SecondaryNameNode

[root@hadoop22 ~]# jps
9741 ResourceManager
16569 Jps

[root@hadoop33 ~]# jps
12775 DataNode
20189 Jps
12653 NodeManager

[root@hadoop44 ~]# jps
10111 DataNode
17519 Jps
9988 NodeManager

[root@hadoop55 ~]# jps
11563 NodeManager
11686 DataNode
19078 Jps

[root@hadoop66 ~]# jps
10682 DataNode
10560 NodeManager
18085 Jps



Note: pseudo-distributed mode simply simulates a cluster on one server. Only the number of machines differs; the communication mechanism and the execution process are the same as in a real cluster, so Hadoop's pseudo-distributed mode can be regarded as a special case of cluster mode.

To make the rest of the article easier to follow, here is a brief look at Hadoop's architecture:

[Figure: Hadoop architecture]

As the architecture shows, HDFS and MapReduce are Hadoop's default file system and default computing framework, respectively. However, we are free to have Hadoop work with a different file system (such as NTFS on Windows or ext4 on Linux) or a different computing framework (such as Spark or Storm), which is a good illustration of Hadoop's loose coupling. In Hadoop's configuration, the file system to use is specified in core-site.xml:





<property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop11:9000</value>
</property>
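To see which file system a program actually picks up, here is a small check (my own sketch, not from the original post): whether it prints file:/// or hdfs://hadoop11:9000 depends on whether this core-site.xml is on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ShowDefaultFs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();                  // loads core-site.xml from the classpath, if present
        System.out.println(conf.get("fs.defaultFS", "file:///"));  // the configured (or default) file system URI
        FileSystem fs = FileSystem.get(conf);                      // the FileSystem implementation backing that URI
        System.out.println(fs.getClass().getName());               // e.g. LocalFileSystem vs DistributedFileSystem
    }
}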





The rest of this article explains Hadoop's local run mode in detail for both Linux and Windows development environments. The core points are:

Hadoop's local execution mode:

1. Running the main method directly from Eclipse on Windows submits the job to the local executor, LocalJobRunner.

   - The input and output data can live on a local path (e.g. c:/wc/srcdata/)
   - The input and output data can also live in HDFS (e.g. hdfs://hadoop20:9000/dir)

2. Running the main method directly from Eclipse on Linux, without adding any YARN-related configuration, also submits the job to LocalJobRunner (see the sketch after this list).

   - The input and output data can live on a local path (e.g. /usr/local/)
   - The input and output data can also live in HDFS (e.g. hdfs://hadoop20:9000/dir)
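Why the job stays local in both cases: unless mapreduce.framework.name is set to yarn, Hadoop 2.x defaults it to local, and a local job is executed by LocalJobRunner inside the submitting JVM. A minimal check (my sketch, not from the original post):

import org.apache.hadoop.conf.Configuration;

public class ShowFramework {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // With no YARN-related configuration on the classpath this prints "local",
        // which is what routes the job to LocalJobRunner instead of a cluster.
        System.out.println(conf.get("mapreduce.framework.name", "local"));
    }
}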

Let's start with the Linux development environment.

Taking the WordCount program as an example, with the input and output files both on local paths, the code is as follows:



package MapReduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class WordCount
{
    public static String path1 = "file:///usr/local/word.txt"; // file:/// denotes a path on the local file system
    public static String path2 = "file:///usr/local/dir1";

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(conf);

        // Delete the output directory if it already exists.
        if (fileSystem.exists(new Path(path2)))
        {
            fileSystem.delete(new Path(path2), true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);

        FileInputFormat.setInputPaths(job, new Path(path1));
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setNumReduceTasks(1);
        job.setPartitionerClass(HashPartitioner.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(path2));
        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>
    {
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException
        {
            // Split each line on tabs and emit (word, 1) for every token.
            String[] splited = v1.toString().split("\t");
            for (String string : splited)
            {
                context.write(new Text(string), new LongWritable(1L));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>
    {
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException
        {
            // Sum the counts for each word.
            long sum = 0L;
            for (LongWritable v2 : v2s)
            {
                sum += v2.get();
            }
            context.write(k2, new LongWritable(sum));
        }
    }
}



While the program is running, the corresponding Java processes look like this:



[root@hadoop20 local]# jps
7621            // the running Eclipse instance
9833 Jps
9790 WordCount  // the WordCount program



Now let's look at the result on the local file system:



[root@hadoop20 dir1]# pwd
/usr/local/dir1
[root@hadoop20 dir1]# more part-r-00000
hello   2
me      1
you     1



Next, let's read the input from a path in HDFS while keeping the output on the local Linux file system. First, start HDFS on the Linux machine.



[root@hadoop20 dir]# start-dfs.sh
Starting namenodes on [hadoop20]
hadoop20: starting namenode, logging to /usr/local/hadoop/logs/hadoop-root-namenode-hadoop20.out
hadoop20: starting datanode, logging to /usr/local/hadoop/logs/hadoop-root-datanode-hadoop20.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-root-secondarynamenode-hadoop20.out
[root@hadoop20 dir]# jps
10260 SecondaryNameNode
7621
10360 Jps
9995 NameNode
10110 DataNode
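The post does not show how word.txt got into HDFS. Assuming it was copied from /usr/local/word.txt on the same machine, either hadoop fs -put /usr/local/word.txt / or a short program along these lines would do it (a sketch under that assumption):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadInput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Explicitly ask for the HDFS file system, regardless of what fs.defaultFS says.
        FileSystem hdfs = FileSystem.get(URI.create("hdfs://hadoop20:9000/"), conf);
        // Copy the local test file into the HDFS root as /word.txt.
        hdfs.copyFromLocalFile(new Path("file:///usr/local/word.txt"), new Path("/word.txt"));
    }
}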



Again taking WordCount as the example, the code is as follows:



The program is identical to the previous listing except for the two path constants (and the comments on them):

    public static String path1 = "hdfs://hadoop20:9000/word.txt"; // read the test data from HDFS
    public static String path2 = "file:///usr/local/dir2";        // write the output to the local file system

Note that FileSystem.get(conf) still returns the local (here, Linux) file system's FileSystem instance by default. That is fine in this case, because the instance is only used to check and delete the local output directory.



The result is as follows:



[root@hadoop20 dir2]# more part-r-00000
hello   2
me      1
you     1
[root@hadoop20 dir2]# pwd
/usr/local/dir2



Next, let's switch both the input and output paths to HDFS. The code is as follows:



Again only the path constants change; the rest of the program is unchanged:

    public static String path1 = "hdfs://hadoop20:9000/word.txt"; // read the test data from HDFS
    public static String path2 = "hdfs://hadoop20:9000/dir3";     // write the output to HDFS



The program throws an exception:

[Screenshot: exception stack trace]

Fix:



Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop20:9000/"); // with this line, FileSystem.get(conf) returns the HDFS FileSystem instance instead of the default local (Linux) one
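An alternative to overriding fs.defaultFS globally (my suggestion, not from the original post) is to derive the file system from the path itself, so hdfs:// paths and file:/// paths each get the right client. This is a drop-in replacement for the exists/delete block in main():

// Ask the Path for the FileSystem that matches its URI scheme:
// hdfs://... yields the HDFS client, file:/// yields the local file system.
FileSystem fs = new Path(path2).getFileSystem(conf);
if (fs.exists(new Path(path2)))
{
    fs.delete(new Path(path2), true);
}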



Check the result:



[root@hadoop20 hadoop]# hadoop fs -cat /dir3/part-r-00000
hello   2
me      1
you     1



From the three examples above you can see that Hadoop's local run mode under Linux is very simple: nothing needs to be configured. Under Windows, however, quite a bit of setup is required.

One note first: because my machine is 64-bit, the JDK 1.7, Eclipse, and Hadoop 2.4.1 installed on Windows are all 64-bit builds. Download links:

http://blog.csdn.net/a2011480169/article/details/51814212

The detailed steps for running Hadoop in local mode in a Windows development environment are:

1. Install the JDK and Hadoop 2.4.1 locally and set the JAVA_HOME, HADOOP_HOME, and Path environment variables (after setting them it is best to restart the machine).


2. Replace the bin directory of the local Hadoop 2.4.1 installation with the bin directory from hadoop-common-2.2.0-bin-master, because the stock Hadoop 2.x release does not include the hadoop.dll and winutils.exe files.

The download link for hadoop-common-2.2.0-bin-master:

http://blog.csdn.net/a2011480169/article/details/51814212

If hadoop.dll and winutils.exe are missing, the program throws exceptions such as:



java.io.IOException: Could not locate executable D:\hadoop-2.4.1\bin\winutils.exe in the Hadoop binaries.

java.lang.Exception: java.lang.NullPointerException



So replacing the local Hadoop 2.4.1 bin directory with the one from hadoop-common-2.2.0-bin-master is a necessary step.

Note: copying only the hadoop.dll and winutils.exe files from the hadoop-common-2.2.0-bin-master bin directory into the Hadoop 2.4.1 bin directory also works, but replacing the whole bin directory is the safer choice.
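If Eclipse does not pick up the HADOOP_HOME environment variable (for example because it was started before the variable was set), a commonly used workaround, an assumption on my part rather than something from the original post, is to point Hadoop at the installation directory from code before the job is created; the hadoop.home.dir system property is what Hadoop's Shell utility consults when HADOOP_HOME is not visible:

// Hypothetical path; adjust it to the local Hadoop installation whose bin directory contains winutils.exe.
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.4.1");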

With these two steps done, we can run programs and exercise Hadoop's local run mode.

First, use the Windows file system for both the input and output paths. The code is as follows:



The program is the same as the first listing, with the paths now pointing at the Windows file system:

    public static String path1 = "file:///C:\\word.txt"; // read the test data from the local Windows file system
    public static String path2 = "file:///D:\\dir";



Viewing the running Java processes from the Windows command prompt:

[Screenshot: jps output in the Windows command prompt]

Here 28568 is the Eclipse process started on Windows.

Next, check the result. The contents of part-r-00000 are:



hello   2
me      1
you     1



Next, keep the input on the local Windows file system and write the output to HDFS. The code is as follows:



Once more only the paths change:

    public static String path1 = "file:///C:\\word.txt";     // read the test data from the Windows file system
    public static String path2 = "hdfs://hadoop20:9000/dir"; // write the output to HDFS



The program again throws an exception:

[Screenshot: exception stack trace]

The fix is the same as before:



Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop20:9000/");
FileSystem fileSystem = FileSystem.get(conf); // now returns the HDFS FileSystem instance



Check the result:



[root@hadoop20 dir4]# hadoop fs -cat /dir/part-r-00000
hello   2
me      1
you     1



That wraps up Hadoop's local run mode. A few points to note:

1. file:// denotes the local file system; hdfs:// denotes the HDFS distributed file system.

2. Hadoop's local run mode is very simple on Linux, but on Windows it requires the configuration steps described above.

3. It does not matter where the files used by MapReduce live (the local Windows file system, the local Linux file system, or HDFS); in the end they are always accessed through a FileSystem instance.
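To illustrate that last point, here is a small sketch (mine, using the host and port assumed throughout this post) that obtains a separate FileSystem instance per URI scheme:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class SchemeDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The scheme of the URI decides which FileSystem implementation is returned.
        FileSystem local = FileSystem.get(URI.create("file:///"), conf);              // local file system
        FileSystem hdfs  = FileSystem.get(URI.create("hdfs://hadoop20:9000/"), conf); // HDFS
        System.out.println(local.getClass().getSimpleName()); // e.g. LocalFileSystem
        System.out.println(hdfs.getClass().getSimpleName());  // e.g. DistributedFileSystem
    }
}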
