分享

基于HIVE文件格式的map reduce代码编写

 犇馆长 2018-12-28

我们的数据绝大多数都是在HIVE上,对HIVE的SEQUENCEFILE和RCFILE的存储格式都有利用,为了满足HIVE的数据开放,hive client的方式就比较单一,直接访问HIVE生成的HDFS数据也是一种必要途径,所以本文整理测试了如何编写基于TEXTFILE、SEQUENCEFILE、RCFILE的数据的map reduce的代码。以wordcount的逻辑展示3种MR的代码。


其实只要知道MAP的输入格式是什么,就知道如何在MAP中处理数据;只要知道REDUCE(也可能只有MAP)的输出格式,就知道如何把处理结果转成输出格式。

表1:

 
如下代码片段是运行一个MR的最简单的配置:定义job、配置job、运行job

Java代码  
  1. //map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作   
  2. JobConf conf = new JobConf(WordCountRC.class);  
  3. //设置一个用户定义的job名称  
  4. conf.setJobName("WordCountRC");  
  5.   
  6. //为job的输出数据设置Key类  
  7. conf.setOutputKeyClass(Text.class);  
  8. //为job输出设置value类   
  9. conf.setOutputValueClass(IntWritable.class);  
  10.   
  11. //为job设置Mapper类  
  12. conf.setMapperClass(MapClass.class);  
  13. //为job设置Combiner类  
  14. conf.setCombinerClass(Reduce.class);  
  15. //为job设置Reduce类  
  16. conf.setReducerClass(Reduce.class);  
  17.   
  18. //为map-reduce任务设置InputFormat实现类  
  19. conf.setInputFormat(RCFileInputFormat.class);  
  20. //为map-reduce任务设置OutputFormat实现类  
  21. conf.setOutputFormat(TextOutputFormat.class);  
  22.   
  23. //为map-reduce job设置路径数组作为输入列表  
  24. FileInputFormat.setInputPaths(conf, new Path(args[0]));  
  25. //为map-reduce job设置路径数组作为输出列表  
  26. FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
  27.   
  28. //运行一个job  
  29. JobClient.runJob(conf);  



而此刻,我们更多的是关注配置InputFormat和OutputFormat的setInputFormat和setOutputFormat。根据我们不同的输入输出做相应的配置,可以选择表1的任何格式。
当我们确定了输入输出格式,接下来就是来在实现map和reduce函数时首选对输入格式做相应的处理,然后处理具体的业务逻辑,最后把处理后的数据转成既定的输出格式。

 

如下是处理textfile、sequencefile、rcfile输入文件的wordcount代码,大家可以比较一下具体区别,应该就能处理更多其它输入文件或者输出文件格式的数据。
代码1:textfile版wordcount

Java代码  
  1. import java.io.IOException;  
  2. import java.util.Iterator;  
  3. import java.util.StringTokenizer;  
  4.   
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.io.IntWritable;  
  7. import org.apache.hadoop.io.LongWritable;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapred.FileInputFormat;  
  10. import org.apache.hadoop.mapred.FileOutputFormat;  
  11. import org.apache.hadoop.mapred.JobClient;  
  12. import org.apache.hadoop.mapred.JobConf;  
  13. import org.apache.hadoop.mapred.MapReduceBase;  
  14. import org.apache.hadoop.mapred.Mapper;  
  15. import org.apache.hadoop.mapred.OutputCollector;  
  16. import org.apache.hadoop.mapred.Reducer;  
  17. import org.apache.hadoop.mapred.Reporter;  
  18.   
  19.   
  20. public class WordCountTxt{  
  21.    
  22.   public static class MapClass extends MapReduceBase  
  23.     implements Mapper<LongWritable, Text, Text, IntWritable> {  
  24.      
  25.     private final static IntWritable one = new IntWritable(1);  
  26.     private Text word = new Text();  
  27.      
  28.        @Override  
  29.        public void map(LongWritable key, Text value,  
  30.                      OutputCollector<Text, IntWritable> output,  
  31.             Reporter reporter) throws IOException {  
  32.               String line = value.toString();  
  33.               StringTokenizer itr = new StringTokenizer(line);  
  34.               while (itr.hasMoreTokens()) {  
  35.                      word.set(itr.nextToken());  
  36.                      output.collect(word, one);  
  37.               }  
  38.   }  
  39.   }  
  40.   
  41.   public static class Reduce extends MapReduceBase  
  42.     implements Reducer<Text, IntWritable, Text, IntWritable> {  
  43.      
  44.        @Override  
  45.     public void reduce(Text key, Iterator<IntWritable> values,  
  46.                        OutputCollector<Text, IntWritable> output,  
  47.                        Reporter reporter) throws IOException {  
  48.       int sum = 0;  
  49.       while (values.hasNext()) {  
  50.         sum += values.next().get();  
  51.       }  
  52.       output.collect(key, new IntWritable(sum));  
  53.     }  
  54.   }  
  55.    
  56.   public static void main(String[] args) throws Exception {  
  57.          JobConf conf = new JobConf(WordCountTxt.class);  
  58.          conf.setJobName("wordcounttxt");  
  59.           
  60.          conf.setOutputKeyClass(Text.class);  
  61.          conf.setOutputValueClass(IntWritable.class);  
  62.           
  63.          conf.setMapperClass(MapClass.class);  
  64.          conf.setCombinerClass(Reduce.class);  
  65.          conf.setReducerClass(Reduce.class);  
  66.           
  67.          FileInputFormat.setInputPaths(conf, new Path(args[0]));  
  68.          FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
  69.                 
  70.          JobClient.runJob(conf);     
  71.   }  
  72.     
  73. }  



代码2:sequencefile版wordcount

Java代码  
  1. import java.io.IOException;  
  2. import java.util.Iterator;  
  3. import java.util.StringTokenizer;  
  4.   
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.io.IntWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.mapred.FileInputFormat;  
  9. import org.apache.hadoop.mapred.FileOutputFormat;  
  10. import org.apache.hadoop.mapred.JobClient;  
  11. import org.apache.hadoop.mapred.JobConf;  
  12. import org.apache.hadoop.mapred.MapReduceBase;  
  13. import org.apache.hadoop.mapred.Mapper;  
  14. import org.apache.hadoop.mapred.OutputCollector;  
  15. import org.apache.hadoop.mapred.Reducer;  
  16. import org.apache.hadoop.mapred.Reporter;  
  17. import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;  
  18. import org.apache.hadoop.mapred.TextOutputFormat;  
  19.   
  20.   
  21. public class WordCountSeq {  
  22.   
  23.          public static class MapClass extends MapReduceBase  
  24.            implements Mapper<Text, Text, Text, IntWritable> {  
  25.             
  26.            private final static IntWritable one = new IntWritable(1);  
  27.            private Text word = new Text();  
  28.             
  29.               @Override  
  30.               public void map(Text key, Text value,  
  31.                            OutputCollector<Text, IntWritable> output,  
  32.                    Reporter reporter) throws IOException {  
  33.                      String line = value.toString();  
  34.                      StringTokenizer itr = new StringTokenizer(line);  
  35.                      while (itr.hasMoreTokens()) {  
  36.                            word.set(itr.nextToken());  
  37.                            output.collect(word, one);  
  38.                      }  
  39.          }  
  40.          }  
  41.   
  42.          public static class Reduce extends MapReduceBase  
  43.            implements Reducer<Text, IntWritable, Text, IntWritable> {  
  44.             
  45.               @Override  
  46.            public void reduce(Text key, Iterator<IntWritable> values,  
  47.                               OutputCollector<Text, IntWritable> output,  
  48.                               Reporter reporter) throws IOException {  
  49.              int sum = 0;  
  50.              while (values.hasNext()) {  
  51.                sum += values.next().get();  
  52.              }  
  53.              output.collect(key, new IntWritable(sum));  
  54.            }  
  55.          }  
  56.          /** 
  57.           * @param args 
  58.         * @throws IOException 
  59.           */  
  60.          public static void main(String[] args) throws IOException {  
  61.               // TODO Auto-generated method stub  
  62.                 JobConf conf = new JobConf(WordCountSeq.class);  
  63.                 conf.setJobName("wordcountseq");  
  64.                  
  65.                 conf.setOutputKeyClass(Text.class);  
  66.                 conf.setOutputValueClass(IntWritable.class);  
  67.                  
  68.                 conf.setMapperClass(MapClass.class);  
  69.                 conf.setCombinerClass(Reduce.class);  
  70.                 conf.setReducerClass(Reduce.class);  
  71.                  
  72.                 conf.setInputFormat(SequenceFileAsTextInputFormat.class);  
  73.                 conf.setOutputFormat(TextOutputFormat.class);  
  74.                  
  75.                 FileInputFormat.setInputPaths(conf, new Path(args[0]));  
  76.                 FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
  77.                        
  78.                 JobClient.runJob(conf);  
  79.          }  
  80.   
  81. }  



代码3:rcfile版wordcount

Java代码  
  1. import java.io.IOException;  
  2. import java.util.Iterator;  
  3.   
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.hive.ql.io.RCFileInputFormat;  
  6. import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;  
  7. import org.apache.hadoop.io.IntWritable;  
  8. import org.apache.hadoop.io.LongWritable;  
  9. import org.apache.hadoop.io.Text;  
  10. import org.apache.hadoop.mapred.FileInputFormat;  
  11. import org.apache.hadoop.mapred.FileOutputFormat;  
  12. import org.apache.hadoop.mapred.JobClient;  
  13. import org.apache.hadoop.mapred.JobConf;  
  14. import org.apache.hadoop.mapred.MapReduceBase;  
  15. import org.apache.hadoop.mapred.Mapper;  
  16. import org.apache.hadoop.mapred.OutputCollector;  
  17. import org.apache.hadoop.mapred.Reducer;  
  18. import org.apache.hadoop.mapred.Reporter;  
  19. import org.apache.hadoop.mapred.TextOutputFormat;  
  20.   
  21. public class WordCountRC {  
  22.       
  23.      public static class MapClass  
  24.           extends MapReduceBase implements Mapper<LongWritable, BytesRefArrayWritable, Text, IntWritable> {  
  25.            
  26.           private final static IntWritable one = new IntWritable(1);  
  27.           private Text word =new Text();  
  28.       
  29.           @Override  
  30.           public void map(LongWritable key, BytesRefArrayWritable value,  
  31.                     OutputCollector<Text, IntWritable> output, Reporter reporter)  
  32.                     throws IOException {  
  33.                Text txt = new Text();  
  34.                txt.set(value.get(0).getData(), value.get(0).getStart(), value.get(0).getLength());  
  35.                String[] result = txt.toString().split("\\s");  
  36.                for(int i=0; i < result.length; i++){  
  37.                     word.set(result[i]);  
  38.                     output.collect(word, one);      
  39.                }  
  40.           }           
  41.      }  
  42.   
  43.      public static class Reduce  
  44.           extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {  
  45.       
  46.           private IntWritable result = new IntWritable();  
  47.            
  48.           @Override  
  49.           public void reduce(Text key, Iterator<IntWritable> value,  
  50.                     OutputCollector<Text, IntWritable> output, Reporter reporter)  
  51.                     throws IOException {  
  52.                int sum = 0;  
  53.                while (value.hasNext()) {  
  54.                     sum += value.next().get();  
  55.                }  
  56.                 
  57.                result.set(sum);  
  58.                output.collect(key, result);                
  59.           }  
  60.            
  61.      }  
  62.      /** 
  63.      * @param args 
  64.      */  
  65.      public static void main(String[] args) throws IOException{  
  66.           JobConf conf = new JobConf(WordCountRC.class);  
  67.           conf.setJobName("WordCountRC");  
  68.            
  69.           conf.setOutputKeyClass(Text.class);  
  70.           conf.setOutputValueClass(IntWritable.class);  
  71.            
  72.           conf.setMapperClass(MapClass.class);  
  73.           conf.setCombinerClass(Reduce.class);  
  74.           conf.setReducerClass(Reduce.class);  
  75.            
  76.           conf.setInputFormat(RCFileInputFormat.class);  
  77.           conf.setOutputFormat(TextOutputFormat.class);  
  78.            
  79.           FileInputFormat.setInputPaths(conf, new Path(args[0]));  
  80.           FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
  81.            
  82.           JobClient.runJob(conf);  
  83.      }  
  84. }  



原始数据:

Java代码  
  1. hadoop fs -text /group/alidw-dev/seq_input/attempt_201201101606_2339628_m_000000_0  
  2. 12/02/13 17:07:57 INFO util.NativeCodeLoader: Loaded the native-hadoop library  
  3. 12/02/13 17:07:57 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library  
  4. 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor  
  5. 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor  
  6. 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor  
  7. 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor  
  8.         hello, i am ok. are you?  
  9.         i am fine too!  



编译打包完成后执行:

Java代码  
  1. hadoop jarWordCountSeq.jar WordCountSeq /group/alidw-dev/seq_input/ /group/alidw-dev/rc_output  



执行完毕就能看到最终结果:

Java代码  
  1. hadoop fs -cat /group/alidw-dev/seq_output/part-00000  
  2. am      2  
  3. are     1  
  4. fine    1  
  5. hello,  1  
  6. i       2  
  7. ok.     1  
  8. too!    1  
  9. you?    1  

 

 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多