- 测试数据
- more user.txt (用户id,用户名)
- 1 用户1
- 2 用户2
- 3 用户3
-
- more post.txt (用户id,帖子id,标题)
- 1 1 贴子1
- 1 2 贴子2
- 2 3 帖子3
- 4 4 贴子4
- 5 5 贴子5
- 5 6 贴子6
- 5 7 贴子7
- 查询结果
-
- 内连接
- 1 用户1 1 1 贴子1
- 1 用户1 1 2 贴子2
- 2 用户2 2 3 帖子3
-
- 左外连接
- 1 用户1 1 1 贴子1
- 1 用户1 1 2 贴子2
- 2 用户2 2 3 帖子3
- 3 用户3
-
- 右外连接
- 1 用户1 1 1 贴子1
- 1 用户1 1 2 贴子2
- 2 用户2 2 3 帖子3
- 4 4 贴子4
- 5 5 贴子5
- 5 6 贴子6
- 5 7 贴子7
-
-
- 全外连接
- 1 用户1 1 1 贴子1
- 1 用户1 1 2 贴子2
- 2 用户2 2 3 帖子3
- 3 用户3
- 4 4 贴子4
- 5 5 贴子5
- 5 6 贴子6
- 5 7 贴子7
-
- 反连接
- 3 用户3
- 4 4 贴子4
- 5 5 贴子5
- 5 6 贴子6
- 5 7 贴子7
代码如下:
- package mapreduce.pattern.join;
-
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import java.net.URI;
- import java.util.ArrayList;
- import java.util.List;
-
- import multiinput.post.PostJob;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
- /**
- * mapreduce 实现内连接,左连接,右连接,全连接,反连接
- * user.txt 用户表
- * post.txt 帖子表
- * 关联字段 userId
- * @author wxj
- *
- */
- public class UserAndPostJoinJob
- {
-
- /**
-  * Map-output value that tags one raw input line with the table it came from,
-  * so the reducer can separate user rows from post rows that share a key.
-  */
- static class UserAndPostWritable implements Writable
- {
-
-     /**
-      * Source tag: "U" marks a user row, "P" marks a post row.
-      */
-     private String type;
-
-     /**
-      * The input line exactly as the mapper received it.
-      */
-     private String data;
-
-     /**
-      * No-arg constructor; the fields are filled in later by
-      * {@link #readFields(DataInput)} or the setters.
-      */
-     public UserAndPostWritable()
-     {
-
-     }
-
-     /**
-      * @param type source tag, "U" or "P"
-      * @param data the raw input line
-      */
-     public UserAndPostWritable(String type, String data)
-     {
-         this.type = type;
-         this.data = data;
-     }
-
-     public String getType()
-     {
-         return type;
-     }
-
-     public void setType(String type)
-     {
-         this.type = type;
-     }
-
-     public String getData()
-     {
-         return data;
-     }
-
-     public void setData(String data)
-     {
-         this.data = data;
-     }
-
-     /**
-      * Restores both fields in the order {@link #write(DataOutput)} emitted them.
-      */
-     @Override
-     public void readFields(DataInput input) throws IOException
-     {
-         type = Text.readString(input);
-         data = Text.readString(input);
-     }
-
-     /**
-      * Serializes the tag followed by the line.
-      * Text.writeString is used instead of DataOutput.writeUTF because writeUTF
-      * rejects strings whose encoded form is longer than 65535 bytes, which a
-      * long input line could exceed.
-      */
-     @Override
-     public void write(DataOutput output) throws IOException
-     {
-         Text.writeString(output, type);
-         Text.writeString(output, data);
-     }
-
- }
-
- /**
-  * Reads user.txt lines and emits (first tab-separated field, "U"-tagged line).
-  */
- static class UserMapper extends Mapper<LongWritable, Text, Text, UserAndPostWritable>
- {
-     /** Reused output key; it is overwritten for every record. */
-     private final Text userId = new Text();
-
-     /**
-      * @param key     input key supplied by the input format (unused)
-      * @param value   one line of the user file
-      * @param context sink for the (userId, tagged line) pair
-      */
-     @Override
-     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
-     {
-         String line = value.toString();
-         // A blank line would otherwise become a record with an empty join key,
-         // and a tab-only line would make split() return an empty array.
-         if (line.trim().isEmpty())
-         {
-             return;
-         }
-         // Only the first field is needed, so stop splitting after it.
-         userId.set(line.split("\t", 2)[0]);
-         context.write(userId, new UserAndPostWritable("U", line));
-     }
-
- }
-
- /**
-  * Reads post.txt lines and emits (first tab-separated field, "P"-tagged line).
-  * The first field is used as the join key, the same way UserMapper keys its rows.
-  */
- static class PostMapper extends Mapper<LongWritable, Text, Text, UserAndPostWritable>
- {
-     /** Reused output key; it is overwritten for every record. */
-     private final Text userId = new Text();
-
-     /**
-      * @param key     input key supplied by the input format (unused)
-      * @param value   one line of the post file
-      * @param context sink for the (userId, tagged line) pair
-      */
-     @Override
-     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
-     {
-         String line = value.toString();
-         // A blank line would otherwise become a record with an empty join key,
-         // and a tab-only line would make split() return an empty array.
-         if (line.trim().isEmpty())
-         {
-             return;
-         }
-         // Only the first field is needed, so stop splitting after it.
-         userId.set(line.split("\t", 2)[0]);
-         // The per-record debug print to stdout was removed.
-         context.write(userId, new UserAndPostWritable("P", line));
-     }
-
- }
-
- static class PostReducer extends Reducer<Text, UserAndPostWritable, Text, Text>
- {
-
- private List<Text> users = new ArrayList<Text>();
- private List<Text> posts = new ArrayList<Text>();
-
- private String joinType;
-
-
- @Override
- protected void setup(Context context) throws IOException,InterruptedException
- {
- super.setup(context);
- joinType = context.getConfiguration().get("joinType");
- //System.out.println("joinType: " + joinType);
- }
-
- protected void reduce(Text key, Iterable<UserAndPostWritable> iterable,Context context)throws IOException, InterruptedException
- {
- users.clear();
- posts.clear();
- for(UserAndPostWritable data : iterable)
- {
- //System.out.println(data.getType() + "," + data.getData());
- if(data.getType().equals("U"))
- {
- users.add(new Text(data.getData()));
- }
- else {
- posts.add(new Text(data.getData()));
- }
- }
-
- if(joinType.equals("innerJoin"))//内连接
- {
- if(users.size() > 0 && posts.size() > 0)
- {
- for(Text user : users)
- {
- for(Text post : posts)
- {
- context.write(new Text(user),new Text(post));
- }
- }
- }
- }
- else if(joinType.equals("leftOuter"))//左外连接
- {
- for(Text user : users)
- {
- if(posts.size() > 0)
- {
- for(Text post : posts)
- {
- context.write(new Text(user),new Text(post));
- }
- }
- else {
- context.write(new Text(user),createEmptyPost());
- }
- }
- }
- else if(joinType.equals("rightOuter"))//右外连接
- {
- for(Text post : posts)
- {
- if(users.size() > 0)
- {
- for(Text user : users)
- {
- context.write(new Text(user),new Text(post));
- }
- }
- else {
- context.write(createEmptyUser(), post);
- }
- }
- }
- else if(joinType.equals("allOuter"))//全外连接
- {
- if(users.size() > 0)
- {
- for(Text user : users)
- {
- if(posts.size() > 0)
- {
- for(Text post : posts)
- {
- context.write(new Text(user),new Text(post));
- }
- }
- else{
- context.write(new Text(user),createEmptyUser());
- }
- }
- }else {
- for(Text post : posts)
- {
- if(users.size() > 0)
- {
- for(Text user : users)
- {
- context.write(new Text(user),new Text(post));
- }
- }
- else{
- context.write(createEmptyUser(), post);
- }
- }
- }
- }
- else if(joinType.equals("anti"))//反连接
- {
- if(users.size() == 0 ^ posts.size() == 0)
- {
- for(Text user : users)
- {
- context.write(new Text(user),createEmptyPost());
- }
- for(Text post : posts)
- {
- context.write(createEmptyUser(),new Text(post));
- }
- }
- }
- }
-
- private Text createEmptyUser()
- {
- return new Text(" \t ");
- }
-
- private Text createEmptyPost()
- {
- return new Text(" \t \t ");
- }
-
- }
-
- /**
-  * Configures and runs the join job against the fixed HDFS input and output paths.
-  *
-  * @param args optional; args[0] selects the join type
-  *             (innerJoin, leftOuter, rightOuter, allOuter, anti).
-  *             When omitted, "anti" is used.
-  */
- public static void main(String[] args)
- {
-     String joinType = (args != null && args.length > 0) ? args[0] : "anti";
-     Configuration configuration = new Configuration();
-     boolean succeeded = false;
-     try
-     {
-         FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), configuration);
-         Job job = new Job(configuration);
-         // Locate the job jar through this class rather than an unrelated one.
-         job.setJarByClass(UserAndPostJoinJob.class);
-         // Join type read by PostReducer.setup().
-         job.getConfiguration().set("joinType", joinType);
-         // Separator between key and value in the part-r-* output files.
-         job.getConfiguration().set("mapred.textoutputformat.separator", "\t");
-
-         // Each input file gets its own mapper so rows can be tagged by origin.
-         MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/input/userandpost/user.txt"), TextInputFormat.class, UserMapper.class);
-         MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/input/userandpost/post.txt"), TextInputFormat.class, PostMapper.class);
-
-         job.setMapOutputKeyClass(Text.class);
-         job.setMapOutputValueClass(UserAndPostWritable.class);
-         job.setReducerClass(PostReducer.class);
-         job.setOutputKeyClass(Text.class);
-         job.setOutputValueClass(Text.class);
-
-         // Remove the output of a previous run before this one writes to the same path.
-         Path outPath = new Path("hdfs://master:9000/output/userandpost");
-         if (fs.exists(outPath))
-         {
-             fs.delete(outPath, true);
-         }
-         TextOutputFormat.setOutputPath(job, outPath);
-         job.setOutputFormatClass(TextOutputFormat.class);
-
-         succeeded = job.waitForCompletion(true);
-
-     }
-     catch (Exception e)
-     {
-         e.printStackTrace();
-     }
-
-     if (!succeeded)
-     {
-         // Report failure to the caller instead of exiting with status 0.
-         System.exit(1);
-     }
- }
-
- }
|