分享

MapReduce 实现内连接、左外连接、右外连接、全外连接、反连接

 whbsdu 2015-01-08
  1. 测试数据(每行一条记录,字段之间以制表符 \t 分隔)  
  2. more user.txt (用户id,用户名)  
  3. 1   用户1  
  4. 2   用户2  
  5. 3   用户3  
  6.   
  7. more post.txt (用户id,帖子id,标题)  
  8. 1   1   贴子1  
  9. 1   2   贴子2  
  10. 2   3   帖子3  
  11. 4   4   贴子4  
  12. 5   5   贴子5  
  13. 5   6   贴子6  
  14. 5   7   贴子7  

 

 

Java代码  收藏代码
  1. 查询结果(每行先输出用户表字段,再输出帖子表字段;没有匹配的一侧用空白列占位)  
  2.   
  3. 内连接  
  4. 1   用户1 1   1   贴子1  
  5. 1   用户1 1   2   贴子2  
  6. 2   用户2 2   3   帖子3  
  7.   
  8. 左外连接  
  9. 1   用户1 1   1   贴子1  
  10. 1   用户1 1   2   贴子2  
  11. 2   用户2 2   3   帖子3  
  12. 3   用户3            
  13.   
  14. 右外连接  
  15. 1   用户1 1   1   贴子1  
  16. 1   用户1 1   2   贴子2  
  17. 2   用户2 2   3   帖子3  
  18.         4   4   贴子4  
  19.         5   5   贴子5  
  20.         5   6   贴子6  
  21.         5   7   贴子7  
  22.   
  23.   
  24. 全外连接  
  25. 1   用户1 1   1   贴子1  
  26. 1   用户1 1   2   贴子2  
  27. 2   用户2 2   3   帖子3  
  28. 3   用户3        
  29.         4   4   贴子4  
  30.         5   5   贴子5  
  31.         5   6   贴子6  
  32.         5   7   贴子7  
  33.   
  34. 反连接  
  35. 3   用户3            
  36.         4   4   贴子4  
  37.         5   5   贴子5  
  38.         5   6   贴子6  
  39.         5   7   贴子7  

 

代码如下:

Java代码  收藏代码
  1. package mapreduce.pattern.join;  
  2.   
  3. import java.io.DataInput;  
  4. import java.io.DataOutput;  
  5. import java.io.IOException;  
  6. import java.net.URI;  
  7. import java.util.ArrayList;  
  8. import java.util.List;  
  9.   
  10. import multiinput.post.PostJob;  
  11.   
  12. import org.apache.hadoop.conf.Configuration;  
  13. import org.apache.hadoop.fs.FileSystem;  
  14. import org.apache.hadoop.fs.Path;  
  15. import org.apache.hadoop.io.LongWritable;  
  16. import org.apache.hadoop.io.Text;  
  17. import org.apache.hadoop.io.Writable;  
  18. import org.apache.hadoop.mapreduce.Job;  
  19. import org.apache.hadoop.mapreduce.Mapper;  
  20. import org.apache.hadoop.mapreduce.Reducer;  
  21. import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;  
  22. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  23. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  24.   
  25. /** 
  26.  * mapreduce 实现内连接,左连接,右连接,全连接,反连接 
  27.  * user.txt 用户表 
  28.  * post.txt 帖子表 
  29.  * 关联字段 userId 
  30.  * @author wxj 
  31.  * 
  32.  */  
  33. public class UserAndPostJoinJob  
  34. {  
  35.   
  36.     static class UserAndPostWritable implements Writable  
  37.     {  
  38.           
  39.         /** 
  40.          * 类型 U表示用户,P表示帖子 
  41.          */  
  42.         private String type;  
  43.         private String data;  
  44.           
  45.         public UserAndPostWritable()  
  46.         {  
  47.               
  48.         }  
  49.           
  50.         public UserAndPostWritable(String type, String data)  
  51.         {  
  52.             super();  
  53.             this.type = type;  
  54.             this.data = data;  
  55.         }  
  56.   
  57.         public String getType()  
  58.         {  
  59.             return type;  
  60.         }  
  61.   
  62.         public void setType(String type)  
  63.         {  
  64.             this.type = type;  
  65.         }  
  66.   
  67.         public String getData()  
  68.         {  
  69.             return data;  
  70.         }  
  71.   
  72.         public void setData(String data)  
  73.         {  
  74.             this.data = data;  
  75.         }  
  76.   
  77.         @Override  
  78.         public void readFields(DataInput input) throws IOException  
  79.         {  
  80.             type = input.readUTF();  
  81.             data = input.readUTF();  
  82.         }  
  83.   
  84.         @Override  
  85.         public void write(DataOutput output) throws IOException  
  86.         {  
  87.             output.writeUTF(type);  
  88.             output.writeUTF(data);  
  89.         }  
  90.           
  91.     }  
  92.       
  93.     static class UserMapper extends Mapper<LongWritable, Text, Text, UserAndPostWritable>  
  94.     {  
  95.         @Override  
  96.         protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException  
  97.         {  
  98.             String[] arr = value.toString().split("\t");  
  99.             Text userId = new Text(arr[0]);  
  100.             context.write(userId, new UserAndPostWritable("U",value.toString()));  
  101.         }  
  102.   
  103.     }  
  104.       
  105.     static class PostMapper extends Mapper<LongWritable, Text, Text, UserAndPostWritable>  
  106.     {  
  107.         @Override  
  108.         protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException  
  109.         {  
  110.             String[] arr = value.toString().split("\t");  
  111.             Text userId = new Text(arr[0]);  
  112.             context.write(userId, new UserAndPostWritable("P",value.toString()));  
  113.             System.out.println(userId);  
  114.         }  
  115.   
  116.     }  
  117.       
  118.     static class PostReducer extends Reducer<Text, UserAndPostWritable, Text, Text>  
  119.      {  
  120.   
  121.         private List<Text> users = new ArrayList<Text>();  
  122.         private List<Text> posts = new ArrayList<Text>();  
  123.           
  124.         private String joinType;  
  125.   
  126.           
  127.         @Override  
  128.         protected void setup(Context context) throws IOException,InterruptedException  
  129.         {  
  130.             super.setup(context);  
  131.             joinType = context.getConfiguration().get("joinType");  
  132.             //System.out.println("joinType: " + joinType);  
  133.         }  
  134.   
  135.         protected void reduce(Text key, Iterable<UserAndPostWritable> iterable,Context context)throws IOException, InterruptedException  
  136.         {  
  137.             users.clear();  
  138.             posts.clear();  
  139.             for(UserAndPostWritable data : iterable)  
  140.             {  
  141.                 //System.out.println(data.getType() + "," + data.getData());  
  142.                 if(data.getType().equals("U"))  
  143.                 {  
  144.                     users.add(new Text(data.getData()));  
  145.                 }  
  146.                 else {  
  147.                     posts.add(new Text(data.getData()));  
  148.                 }  
  149.             }  
  150.               
  151.             if(joinType.equals("innerJoin"))//内连接  
  152.             {  
  153.                 if(users.size() > 0 && posts.size() > 0)  
  154.                 {  
  155.                     for(Text user : users)  
  156.                     {  
  157.                         for(Text post : posts)  
  158.                         {  
  159.                             context.write(new Text(user),new Text(post));  
  160.                         }  
  161.                     }  
  162.                 }  
  163.             }  
  164.             else if(joinType.equals("leftOuter"))//左外连接  
  165.             {  
  166.                 for(Text user : users)  
  167.                 {  
  168.                     if(posts.size() > 0)  
  169.                     {  
  170.                         for(Text post : posts)  
  171.                         {  
  172.                             context.write(new Text(user),new Text(post));  
  173.                         }  
  174.                     }  
  175.                     else {  
  176.                         context.write(new Text(user),createEmptyPost());  
  177.                     }  
  178.                 }  
  179.             }  
  180.             else if(joinType.equals("rightOuter"))//右外连接  
  181.             {  
  182.                 for(Text post : posts)  
  183.                 {  
  184.                     if(users.size() > 0)  
  185.                     {  
  186.                         for(Text user : users)  
  187.                         {  
  188.                             context.write(new Text(user),new Text(post));  
  189.                         }  
  190.                     }  
  191.                     else {  
  192.                         context.write(createEmptyUser(), post);  
  193.                     }  
  194.                 }  
  195.             }  
  196.             else if(joinType.equals("allOuter"))//全外连接  
  197.             {  
  198.                 if(users.size() > 0)  
  199.                 {  
  200.                     for(Text user : users)  
  201.                     {  
  202.                         if(posts.size() > 0)  
  203.                         {  
  204.                             for(Text post : posts)  
  205.                             {  
  206.                                 context.write(new Text(user),new Text(post));  
  207.                             }  
  208.                         }  
  209.                         else{  
  210.                             context.write(new Text(user),createEmptyUser());  
  211.                         }  
  212.                     }  
  213.                 }else {  
  214.                     for(Text post : posts)  
  215.                     {  
  216.                         if(users.size() > 0)  
  217.                         {  
  218.                             for(Text user : users)  
  219.                             {  
  220.                                 context.write(new Text(user),new Text(post));  
  221.                             }  
  222.                         }  
  223.                         else{  
  224.                             context.write(createEmptyUser(), post);  
  225.                         }  
  226.                     }  
  227.                 }  
  228.             }  
  229.             else if(joinType.equals("anti"))//反连接  
  230.             {  
  231.                 if(users.size() == 0 ^ posts.size() == 0)  
  232.                 {  
  233.                     for(Text user : users)  
  234.                     {  
  235.                         context.write(new Text(user),createEmptyPost());  
  236.                     }  
  237.                     for(Text post : posts)  
  238.                     {  
  239.                         context.write(createEmptyUser(),new Text(post));  
  240.                     }  
  241.                 }  
  242.             }  
  243.         }  
  244.           
  245.         private Text createEmptyUser()  
  246.         {  
  247.             return new Text(" \t ");  
  248.         }  
  249.           
  250.         private Text createEmptyPost()  
  251.         {  
  252.             return new Text(" \t \t ");  
  253.         }  
  254.           
  255.      }  
  256.        
  257.      public static void main(String[] args)  
  258.      {  
  259.             Configuration configuration = new Configuration();  
  260.             try  
  261.             {  
  262.                 FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"),configuration);  
  263.                 Job job = new Job(configuration);  
  264.                 job.setJarByClass(PostJob.class);  
  265.                 //设置连接类型  
  266.                 //innerJoin,leftOuter,rightOuter,allOuter,anti  
  267.                 job.getConfiguration().set("joinType", "anti");  
  268.                 //设置输出到part-r-00000时的分隔符  
  269.                 job.getConfiguration().set("mapred.textoutputformat.separator", "\t");  
  270.                   
  271.                 MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/input/userandpost/user.txt"),TextInputFormat.class,UserMapper.class);  
  272.                 MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/input/userandpost/post.txt"), TextInputFormat.class, PostMapper.class);  
  273.                   
  274.                 job.setMapOutputKeyClass(Text.class);  
  275.                 job.setMapOutputValueClass(UserAndPostWritable.class);  
  276.                 job.setReducerClass(PostReducer.class);  
  277.                 job.setOutputKeyClass(Text.class);  
  278.                 job.setOutputValueClass(Text.class);  
  279.                   
  280.                 Path outPath = new Path("hdfs://master:9000/output/userandpost");  
  281.                 if(fs.exists(outPath))  
  282.                 {  
  283.                     fs.delete(outPath,true);  
  284.                 }  
  285.                 TextOutputFormat.setOutputPath(job, outPath);  
  286.                 job.setOutputFormatClass(TextOutputFormat.class);  
  287.                   
  288.                 job.waitForCompletion(true);  
  289.                   
  290.             } catch (Exception e)  
  291.             {  
  292.                 e.printStackTrace();  
  293.             }  
  294.      }  
  295.       
  296. }  

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多