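In standalone (local) mode, Hadoop does not use HDFS and does not start any Hadoop daemons. The whole program runs in a single JVM, and at most one reducer is allowed.
Create a new Java project named hadoop-test in Eclipse (note that Hadoop requires JDK 1.6 or later).
Download hadoop-1.2.1.tar.gz from the Hadoop site at http://apache./apache-mirror/hadoop/common/
Extract hadoop-1.2.1.tar.gz to get the hadoop-1.2.1 directory.
Import the jar files found in the hadoop-1.2.1 directory and in the hadoop-1.2.1\lib directory into the hadoop-test project.
Next, write the MapReduce program (it computes the net balance of income and expenses for each month).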
Map:
- import java.io.IOException;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.MapReduceBase;
- import org.apache.hadoop.mapred.Mapper;
- import org.apache.hadoop.mapred.OutputCollector;
- import org.apache.hadoop.mapred.Reporter;
-
- public class MapBus extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, LongWritable> {
- @Override
- public void map(LongWritable key, Text date,
- OutputCollector<Text, LongWritable> output,
- Reporter reporter) throws IOException {
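- // Each input line looks like: 2013-01-11,-200
- String line = date.toString();
- if(line.contains(",")){
- String[] tmp = line.split(",");
- // Characters 5-6 of the date field are the two-digit month, e.g. "01"
- String month = tmp[0].substring(5, 7);
- // Parse as long to match the LongWritable output value type
- long money = Long.parseLong(tmp[1].trim());
- output.collect(new Text(month), new LongWritable(money));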
- }
- }
- }
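The parsing step inside `MapBus.map` can be tried on its own, without any Hadoop classes. The following is only a minimal sketch: the class name `RecordParser` and its `parse` method are hypothetical helpers introduced for illustration, and rejecting malformed lines by returning `null` is an assumption rather than something the original mapper does.

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical helper (not part of Hadoop): mirrors the parsing done in MapBus.map().
final class RecordParser {
    private RecordParser() {}

    /**
     * Splits a line such as "2013-01-11,-200" into (month, amount).
     * Returns null when the line does not have the expected shape.
     */
    static Map.Entry<String, Long> parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 2 || fields[0].length() < 7) {
            return null; // assumption: silently skip malformed lines
        }
        // Characters 5-6 of "yyyy-MM-dd" are the two-digit month
        String month = fields[0].substring(5, 7);
        try {
            long amount = Long.parseLong(fields[1].trim());
            return new AbstractMap.SimpleImmutableEntry<>(month, amount);
        } catch (NumberFormatException e) {
            return null; // amount is not a whole number
        }
    }

    public static void main(String[] args) {
        Map.Entry<String, Long> kv = parse("2013-01-11,-200");
        System.out.println(kv.getKey() + " -> " + kv.getValue()); // prints "01 -> -200"
    }
}
```

Keeping the parsing in a small pure function like this makes it easy to check edge cases before running the full job.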
Reduce:
- import java.io.IOException;
- import java.util.Iterator;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.MapReduceBase;
- import org.apache.hadoop.mapred.OutputCollector;
- import org.apache.hadoop.mapred.Reducer;
- import org.apache.hadoop.mapred.Reporter;
-
- public class ReduceBus extends MapReduceBase
- implements Reducer<Text, LongWritable, Text, LongWritable> {
- @Override
- public void reduce(Text month, Iterator<LongWritable> money,
- OutputCollector<Text, LongWritable> output, Reporter reporter)
- throws IOException {
- // Accumulate in a long: the values are LongWritable, so an int could overflow
- long totalMoney = 0;
- while(money.hasNext()){
- totalMoney += money.next().get();
- }
- output.collect(month, new LongWritable(totalMoney));
- }
- }
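To see what the framework and `ReduceBus` do together, the grouping and summing can be simulated in plain Java. This is a sketch under stated assumptions, not how Hadoop implements the shuffle: `MonthlyBalance`, `pair` and `sumByMonth` are hypothetical names, a `TreeMap` stands in for the sort-and-group step, and the input pairs in `main` are made-up illustrative values.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for "group by key, then reduce" (no Hadoop classes involved).
final class MonthlyBalance {
    private MonthlyBalance() {}

    /** Builds one (month, amount) pair, like a single mapper output record. */
    static Map.Entry<String, Long> pair(String month, long amount) {
        return new AbstractMap.SimpleImmutableEntry<>(month, amount);
    }

    /** Sums the amounts per month; keys come out sorted, as they do after the shuffle. */
    static Map<String, Long> sumByMonth(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> p : pairs) {
            totals.merge(p.getKey(), p.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Illustrative mapper output, not taken from the real input files
        List<Map.Entry<String, Long>> pairs = Arrays.asList(
                pair("01", 100), pair("01", -100), pair("01", 500), pair("02", 300));
        System.out.println(sumByMonth(pairs)); // prints {01=500, 02=300}
    }
}
```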
Main:
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.FileInputFormat;
- import org.apache.hadoop.mapred.FileOutputFormat;
- import org.apache.hadoop.mapred.JobClient;
- import org.apache.hadoop.mapred.JobConf;
-
- public class Wallet {
- public static void main(String[] args){
- if(args.length != 2){
- System.err.println("Usage: Wallet <input path> <output path>");
- System.exit(-1);
- }
-
- JobConf jobConf = new JobConf(Wallet.class);
- jobConf.setJobName("My Wallet");
-
- FileInputFormat.addInputPath(jobConf, new Path(args[0]));
- FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
- jobConf.setMapperClass(MapBus.class);
- jobConf.setReducerClass(ReduceBus.class);
- jobConf.setOutputKeyClass(Text.class);
- jobConf.setOutputValueClass(LongWritable.class);
-
- try{
- JobClient.runJob(jobConf);
- }catch(Exception e){
- e.printStackTrace();
- }
- }
- }
You also need to prepare the files to be analyzed. Create two files under E:\cygwin_root\home\input, one named 2013-01.txt and the other named 2013-02.txt.
2013-01.txt:
- 2013-01-01,100
- 2013-01-02,-100
- 2013-01-07,100
- 2013-01-10,-100
- 2013-01-11,100
- 2013-01-21,-100
- 2013-01-22,100
- 2013-01-25,-100
- 2013-01-27,100
- 2013-01-18,-100
- 2013-01-09,500
2013-02.txt:
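After setting the run arguments (the input path and the output path), you can run the MapReduce program via Run As -> Java Application. The run may fail with the following error: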
- java.io.IOException: Failed to set permissions of path:
- \tmp\hadoop-linkage\mapred\staging\linkage1150562408\.staging to 0700
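The main cause of this error is that later Hadoop versions added a check when setting permissions on file paths. The fix is fairly simple: replace hadoop-core-1.2.1.jar with hadoop-0.20.2-core.jar.
Below is the log printed while the MapReduce program runs: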
- 14/02/11 10:54:16 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
- 14/02/11 10:54:16 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
- 14/02/11 10:54:16 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
- 14/02/11 10:54:16 INFO mapred.FileInputFormat: Total input paths to process : 2
- 14/02/11 10:54:17 INFO mapred.JobClient: Running job: job_local_0001
- 14/02/11 10:54:17 INFO mapred.FileInputFormat: Total input paths to process : 2
- 14/02/11 10:54:17 INFO mapred.MapTask: numReduceTasks: 1
- 14/02/11 10:54:17 INFO mapred.MapTask: io.sort.mb = 100
- 14/02/11 10:54:17 INFO mapred.MapTask: data buffer = 79691776/99614720
- 14/02/11 10:54:17 INFO mapred.MapTask: record buffer = 262144/327680
- 14/02/11 10:54:17 INFO mapred.MapTask: Starting flush of map output
- 14/02/11 10:54:18 INFO mapred.MapTask: Finished spill 0
- 14/02/11 10:54:18 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
- 14/02/11 10:54:18 INFO mapred.LocalJobRunner: file:/E:/cygwin_root/home/input/2013-01.txt:0+179
- 14/02/11 10:54:18 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
- 14/02/11 10:54:18 INFO mapred.MapTask: numReduceTasks: 1
- 14/02/11 10:54:18 INFO mapred.MapTask: io.sort.mb = 100
- 14/02/11 10:54:18 INFO mapred.MapTask: data buffer = 79691776/99614720
- 14/02/11 10:54:18 INFO mapred.MapTask: record buffer = 262144/327680
- 14/02/11 10:54:18 INFO mapred.MapTask: Starting flush of map output
- 14/02/11 10:54:18 INFO mapred.MapTask: Finished spill 0
- 14/02/11 10:54:18 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
- 14/02/11 10:54:18 INFO mapred.LocalJobRunner: file:/E:/cygwin_root/home/input/2013-02.txt:0+16
- 14/02/11 10:54:18 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
- 14/02/11 10:54:18 INFO mapred.LocalJobRunner:
- 14/02/11 10:54:18 INFO mapred.Merger: Merging 2 sorted segments
- 14/02/11 10:54:18 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 160 bytes
- 14/02/11 10:54:18 INFO mapred.LocalJobRunner:
- 14/02/11 10:54:18 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
- 14/02/11 10:54:18 INFO mapred.LocalJobRunner:
- 14/02/11 10:54:18 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
- 14/02/11 10:54:18 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/E:/cygwin_root/home/output
- 14/02/11 10:54:18 INFO mapred.LocalJobRunner: reduce > reduce
- 14/02/11 10:54:18 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
- 14/02/11 10:54:18 INFO mapred.JobClient: map 100% reduce 100%
- 14/02/11 10:54:18 INFO mapred.JobClient: Job complete: job_local_0001
- 14/02/11 10:54:18 INFO mapred.JobClient: Counters: 13
- 14/02/11 10:54:18 INFO mapred.JobClient: FileSystemCounters
- 14/02/11 10:54:18 INFO mapred.JobClient: FILE_BYTES_READ=39797
- 14/02/11 10:54:18 INFO mapred.JobClient: FILE_BYTES_WRITTEN=80473
- 14/02/11 10:54:18 INFO mapred.JobClient: Map-Reduce Framework
- 14/02/11 10:54:18 INFO mapred.JobClient: Reduce input groups=2
- 14/02/11 10:54:18 INFO mapred.JobClient: Combine output records=0
- 14/02/11 10:54:18 INFO mapred.JobClient: Map input records=12
- 14/02/11 10:54:18 INFO mapred.JobClient: Reduce shuffle bytes=0
- 14/02/11 10:54:18 INFO mapred.JobClient: Reduce output records=2
- 14/02/11 10:54:18 INFO mapred.JobClient: Spilled Records=24
- 14/02/11 10:54:18 INFO mapred.JobClient: Map output bytes=132
- 14/02/11 10:54:18 INFO mapred.JobClient: Map input bytes=195
- 14/02/11 10:54:18 INFO mapred.JobClient: Combine input records=0
- 14/02/11 10:54:18 INFO mapred.JobClient: Map output records=12
- 14/02/11 10:54:18 INFO mapred.JobClient: Reduce input records=12
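When the run completes, two files are generated under E:\cygwin_root\home\output: .part-00000.crc and part-00000. The .part-00000.crc file is a binary, internal file that stores the checksum of part-00000. The part-00000 file holds the final result.
Note in particular that the output path must be deleted before every run. Otherwise the job fails with: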
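The result file can be read back with a few lines of plain Java. The sketch below assumes the job uses the default text output, where each line is a key and a value separated by a tab. `ResultReader` is a hypothetical helper, and the sample content in `main` is illustrative rather than the actual output of this job.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parses "month<TAB>balance" lines such as those in part-00000.
final class ResultReader {
    private ResultReader() {}

    static Map<String, Long> read(Reader source) throws IOException {
        Map<String, Long> result = new LinkedHashMap<>(); // keeps the file's line order
        try (BufferedReader in = new BufferedReader(source)) {
            String line;
            while ((line = in.readLine()) != null) {
                int tab = line.indexOf('\t');
                if (tab < 0) {
                    continue; // skip lines without a key/value separator
                }
                String month = line.substring(0, tab);
                long balance = Long.parseLong(line.substring(tab + 1).trim());
                result.put(month, balance);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Illustrative content; a real run would pass a FileReader for part-00000
        Map<String, Long> balances = read(new StringReader("01\t500\n02\t300\n"));
        System.out.println(balances); // prints {01=500, 02=300}
    }
}
```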
- org.apache.hadoop.mapred.FileAlreadyExistsException:
- Output directory file:/E:/cygwin_root/home/output already exists
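Hadoop performs this check so that, if a previous MapReduce run did not finish, the intermediate files produced by a new run do not overwrite the intermediate files left by the previous one.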
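Deleting the output directory by hand before each run gets tedious, so it can be automated. The following is a minimal sketch that relies on the output path being an ordinary local directory, which is the case in standalone mode. `OutputCleaner` is a hypothetical helper, and it uses `Files.walk`, so it needs Java 8 or later. Hadoop's own `FileSystem.delete(Path, boolean)` would be an alternative when the path may live on HDFS.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: removes a local output directory before the job is submitted.
final class OutputCleaner {
    private OutputCleaner() {}

    /** Deletes dir and everything below it; does nothing if dir does not exist. */
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parent directories
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: OutputCleaner <output path>");
            return;
        }
        deleteRecursively(Paths.get(args[0]));
    }
}
```

Calling `deleteRecursively` at the start of `main` in the driver class, before the job is submitted, would avoid the exception above. Since it removes the directory without asking, it is only suitable when the previous results are no longer needed.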