目录 正文
影评案例
数据及需求
数据格式
movies.dat 3884条数据
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller
users.dat 6041条数据
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
9::M::25::17::61614
10::F::35::1::95370
ratings.dat 1000210条数据
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
数据解释
1、users.dat 数据格式为: 2::M::56::16::70072 对应字段为:UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String 对应字段中文解释:用户id,性别,年龄,职业,邮政编码
2、movies.dat 数据格式为: 2::Jumanji (1995)::Adventure|Children's|Fantasy 对应字段为:MovieID BigInt, Title String, Genres String 对应字段中文解释:电影ID,电影名字,电影类型
3、ratings.dat 数据格式为: 1::1193::5::978300760 对应字段为:UserID BigInt, MovieID BigInt, Rating Double, Timestamped String 对应字段中文解释:用户ID,电影ID,评分,评分时间戳
用户ID,电影ID,评分,评分时间戳,性别,年龄,职业,邮政编码,电影名字,电影类型 userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
需求统计
(1)求被评分次数最多的10部电影,并给出评分次数(电影名,评分次数) (2)分别求男性,女性当中评分最高的10部电影(性别,电影名,评分) (3)求movieid = 2116这部电影各年龄段(因为年龄就只有7个,就按这个7个分就好了)的平均影评(年龄段,评分) (4)求最喜欢看电影(影评次数最多)的那位女性评最高分的10部电影的平均影评分(人,电影名,影评) (5)求好片(评分>=4.0)最多的那个年份的最好看的10部电影 (6)求1997年上映的电影中,评分最高的10部Comedy类电影 (7)该影评库中各种类型电影中评价最高的5部电影(类型,电影名,平均影评分) (8)各年评分最高的电影类型(年份,类型,影评分) (9)每个地区最高评分的电影名,把结果存入HDFS(地区,电影名,电影评分)
代码实现
1、求被评分次数最多的10部电影,并给出评分次数(电影名,评分次数)
分析:此问题涉及到2个文件,ratings.dat和movies.dat,2个文件的数据量相差悬殊(ratings.dat约100万条,movies.dat不足4千条),此处应该使用mapjoin方法,先将数据量较小的movies.dat预先加载到内存中,在map端完成关联
MovieMR1_1.java
1 public class MovieMR1_1 {
2
3 public static void main(String[] args) throws Exception {
4
5 if(args.length < 4) {
6 args = new String[4];
7 args[0] = "/movie/input/";
8 args[1] = "/movie/output/";
9 args[2] = "/movie/cache/movies.dat";
10 args[3] = "/movie/output_last/";
11 }
12
13
14 Configuration conf1 = new Configuration();
15 conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/");
16 System.setProperty("HADOOP_USER_NAME", "hadoop");
17 FileSystem fs1 = FileSystem.get(conf1);
18
19
20 Job job1 = Job.getInstance(conf1);
21
22 job1.setJarByClass(MovieMR1_1.class);
23
24 job1.setMapperClass(MoviesMapJoinRatingsMapper1.class);
25 job1.setReducerClass(MovieMR1Reducer1.class);
26
27 job1.setMapOutputKeyClass(Text.class);
28 job1.setMapOutputValueClass(IntWritable.class);
29
30 job1.setOutputKeyClass(Text.class);
31 job1.setOutputValueClass(IntWritable.class);
32
33
34
35 //缓存普通文件到task运行节点的工作目录
36 URI uri = new URI("hdfs://hadoop1:9000"+args[2]);
37 System.out.println(uri);
38 job1.addCacheFile(uri);
39
40
41 Path inputPath1 = new Path(args[0]);
42 Path outputPath1 = new Path(args[1]);
43 if(fs1.exists(outputPath1)) {
44 fs1.delete(outputPath1, true);
45 }
46 FileInputFormat.setInputPaths(job1, inputPath1);
47 FileOutputFormat.setOutputPath(job1, outputPath1);
48
49 boolean isDone = job1.waitForCompletion(true);
50 System.exit(isDone ? 0 : 1);
51
52 }
53
54 public static class MoviesMapJoinRatingsMapper1 extends Mapper<LongWritable, Text, Text, IntWritable>{
55
56 //用了存放加载到内存中的movies.dat数据
57 private static Map<String,String> movieMap = new HashMap<>();
58 //key:电影ID
59 Text outKey = new Text();
60 //value:电影名+电影类型
61 IntWritable outValue = new IntWritable();
62
63
64 /**
65 * movies.dat: 1::Toy Story (1995)::Animation|Children's|Comedy
66 *
67 *
68 * 将小表(movies.dat)中的数据预先加载到内存中去
69 * */
70 @Override
71 protected void setup(Context context) throws IOException, InterruptedException {
72
73 Path[] localCacheFiles = context.getLocalCacheFiles();
74
75
76 String strPath = localCacheFiles[0].toUri().toString();
77
78 BufferedReader br = new BufferedReader(new FileReader(strPath));
79 String readLine;
80 while((readLine = br.readLine()) != null) {
81
82 String[] split = readLine.split("::");
83 String movieId = split[0];
84 String movieName = split[1];
85 String movieType = split[2];
86
87 movieMap.put(movieId, movieName+"\t"+movieType);
88 }
89
90 br.close();
91 }
92
93
94 /**
95 * movies.dat: 1 :: Toy Story (1995) :: Animation|Children's|Comedy
96 * 电影ID 电影名字 电影类型
97 *
98 * ratings.dat: 1 :: 1193 :: 5 :: 978300760
99 * 用户ID 电影ID 评分 评分时间戳
100 *
101 * value: ratings.dat读取的数据
102 * */
103 @Override
104 protected void map(LongWritable key, Text value, Context context)
105 throws IOException, InterruptedException {
106
107 String[] split = value.toString().split("::");
108
109 String userId = split[0];
110 String movieId = split[1];
111 String movieRate = split[2];
112
113 //根据movieId从内存中获取电影名和类型
114 String movieNameAndType = movieMap.get(movieId);
115 String movieName = movieNameAndType.split("\t")[0];
116 String movieType = movieNameAndType.split("\t")[1];
117
118 outKey.set(movieName);
119 outValue.set(Integer.parseInt(movieRate));
120
121 context.write(outKey, outValue);
122
123 }
124
125 }
126
127
128 public static class MovieMR1Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable>{
129 //每部电影评论的次数
130 int count;
131 //评分次数
132 IntWritable outValue = new IntWritable();
133
134 @Override
135 protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
136
137 count = 0;
138
139 for(IntWritable value : values) {
140 count++;
141 }
142
143 outValue.set(count);
144
145 context.write(key, outValue);
146 }
147
148 }
149
150
151 }
MovieMR1_2.java
1 public class MovieMR1_2 {
2
3 public static void main(String[] args) throws Exception {
4 if(args.length < 2) {
5 args = new String[2];
6 args[0] = "/movie/output/";
7 args[1] = "/movie/output_last/";
8 }
9
10
11 Configuration conf1 = new Configuration();
12 conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/");
13 System.setProperty("HADOOP_USER_NAME", "hadoop");
14 FileSystem fs1 = FileSystem.get(conf1);
15
16
17 Job job = Job.getInstance(conf1);
18
19 job.setJarByClass(MovieMR1_2.class);
20
21 job.setMapperClass(MoviesMapJoinRatingsMapper2.class);
22 job.setReducerClass(MovieMR1Reducer2.class);
23
24
25 job.setMapOutputKeyClass(MovieRating.class);
26 job.setMapOutputValueClass(NullWritable.class);
27
28 job.setOutputKeyClass(MovieRating.class);
29 job.setOutputValueClass(NullWritable.class);
30
31
32 Path inputPath1 = new Path(args[0]);
33 Path outputPath1 = new Path(args[1]);
34 if(fs1.exists(outputPath1)) {
35 fs1.delete(outputPath1, true);
36 }
37 //对第一步的输出结果进行降序排序
38 FileInputFormat.setInputPaths(job, inputPath1);
39 FileOutputFormat.setOutputPath(job, outputPath1);
40
41 boolean isDone = job.waitForCompletion(true);
42 System.exit(isDone ? 0 : 1);
43
44
45 }
46
47 //注意输出类型为自定义对象MovieRating,MovieRating按照降序排序
48 public static class MoviesMapJoinRatingsMapper2 extends Mapper<LongWritable, Text, MovieRating, NullWritable>{
49
50 MovieRating outKey = new MovieRating();
51
52 @Override
53 protected void map(LongWritable key, Text value, Context context)
54 throws IOException, InterruptedException {
55 //'Night Mother (1986) 70
56 String[] split = value.toString().split("\t");
57
58 outKey.setCount(Integer.parseInt(split[1]));;
59 outKey.setMovieName(split[0]);
60
61 context.write(outKey, NullWritable.get());
62
63 }
64
65 }
66
67 //排序之后自然输出,只取前10部电影
68 public static class MovieMR1Reducer2 extends Reducer<MovieRating, NullWritable, MovieRating, NullWritable>{
69
70 Text outKey = new Text();
71 int count = 0;
72
73 @Override
74 protected void reduce(MovieRating key, Iterable<NullWritable> values,Context context) throws IOException, InterruptedException {
75
76 for(NullWritable value : values) {
77 count++;
78 if(count > 10) {
79 return;
80 }
81 context.write(key, value);
82
83 }
84
85 }
86
87 }
88 }
MovieRating.java
1 public class MovieRating implements WritableComparable<MovieRating>{
2 private String movieName;
3 private int count;
4
5 public String getMovieName() {
6 return movieName;
7 }
8 public void setMovieName(String movieName) {
9 this.movieName = movieName;
10 }
11 public int getCount() {
12 return count;
13 }
14 public void setCount(int count) {
15 this.count = count;
16 }
17
18 public MovieRating() {}
19
20 public MovieRating(String movieName, int count) {
21 super();
22 this.movieName = movieName;
23 this.count = count;
24 }
25
26
27 @Override
28 public String toString() {
29 return movieName + "\t" + count;
30 }
31 @Override
32 public void readFields(DataInput in) throws IOException {
33 movieName = in.readUTF();
34 count = in.readInt();
35 }
36 @Override
37 public void write(DataOutput out) throws IOException {
38 out.writeUTF(movieName);
39 out.writeInt(count);
40 }
41 @Override
42 public int compareTo(MovieRating o) {
43 return o.count - this.count ;
44 }
45
46 }
2、分别求男性,女性当中评分最高的10部电影(性别,电影名,评分)
分析:此问题涉及到3个表的联合查询,需要先将2个小表的数据预先加载到内存中,再进行查询
对三表进行联合
MoviesThreeTableJoin.java
1 /**
2 * 进行3表的联合查询
3 *
4 * */
5 public class MoviesThreeTableJoin {
6
7 public static void main(String[] args) throws Exception {
8
9 if(args.length < 4) {
10 args = new String[4];
11 args[0] = "/movie/input/";
12 args[1] = "/movie/output2/";
13 args[2] = "/movie/cache/movies.dat";
14 args[3] = "/movie/cache/users.dat";
15 }
16
17 Configuration conf = new Configuration();
18 conf.set("fs.defaultFS", "hdfs://hadoop1:9000/");
19 System.setProperty("HADOOP_USER_NAME", "hadoop");
20 FileSystem fs = FileSystem.get(conf);
21 Job job = Job.getInstance(conf);
22
23 job.setJarByClass(MoviesThreeTableJoin.class);
24 job.setMapperClass(ThreeTableMapper.class);
25
26 job.setOutputKeyClass(Text.class);
27 job.setOutputValueClass(NullWritable.class);
28
29 URI uriUsers = new URI("hdfs://hadoop1:9000"+args[3]);
30 URI uriMovies = new URI("hdfs://hadoop1:9000"+args[2]);
31 job.addCacheFile(uriUsers);
32 job.addCacheFile(uriMovies);
33
34 Path inputPath = new Path(args[0]);
35 Path outputPath = new Path(args[1]);
36
37 if(fs.exists(outputPath)) {
38 fs.delete(outputPath,true);
39 }
40
41 FileInputFormat.setInputPaths(job, inputPath);
42 FileOutputFormat.setOutputPath(job, outputPath);
43
44 boolean isDone = job.waitForCompletion(true);
45 System.exit(isDone ? 0 : 1);
46
47 }
48
49
50 public static class ThreeTableMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
51
52
53 //用于缓存movies和users中数据
54 private Map<String,String> moviesMap = new HashMap<>();
55 private Map<String,String> usersMap = new HashMap<>();
56 //用来存放读取的ratings.dat中的一行数据
57 String[] ratings;
58
59
60 Text outKey = new Text();
61
62 @Override
63 protected void setup(Context context) throws IOException, InterruptedException {
64
65 BufferedReader br = null;
66
67 Path[] paths = context.getLocalCacheFiles();
68 String usersLine = null;
69 String moviesLine = null;
70
71 for(Path path : paths) {
72 String name = path.toUri().getPath();
73 if(name.contains("movies.dat")) {
74 //读取movies.dat文件中的一行数据
75 br = new BufferedReader(new FileReader(name));
76 while((moviesLine = br.readLine()) != null) {
77 /**对读取的这行数据按照::进行切分
78 * 2::Jumanji (1995)::Adventure|Children's|Fantasy
79 * 电影ID,电影名字,电影类型
80 *
81 *电影ID作为key,其余作为value
82 */
83 String[] split = moviesLine.split("::");
84 moviesMap.put(split[0], split[1]+"::"+split[2]);
85 }
86 }else if(name.contains("users.dat")) {
87 //读取users.dat文件中的一行数据
88 br = new BufferedReader(new FileReader(name));
89 while((usersLine = br.readLine()) != null) {
90 /**
91 * 对读取的这行数据按照::进行切分
92 * 2::M::56::16::70072
93 * 用户id,性别,年龄,职业,邮政编码
94 *
95 * 用户ID作为key,其他的作为value
96 * */
97 String[] split = usersLine.split("::");
98 System.out.println(split[0]+"----"+split[1]);
99 usersMap.put(split[0], split[1]+"::"+split[2]+"::"+split[3]+"::"+split[4]);
100 }
101 }
102
103 }
104
105 }
106
107
108 @Override
109 protected void map(LongWritable key, Text value, Context context)
110 throws IOException, InterruptedException {
111
112 ratings = value.toString().split("::");
113 //通过电影ID和用户ID获取用户表和电影表中的其他信息
114 String movies = moviesMap.get(ratings[1]);
115 String users = usersMap.get(ratings[0]);
116
117 //三表信息的联合
118 String threeTables = value.toString()+"::"+movies+"::"+users;
119 outKey.set(threeTables);
120
121 context.write(outKey, NullWritable.get());
122 }
123 }
124
125
126 }
三表联合之后的数据为
1000::1023::5::975041651::Winnie the Pooh and the Blustery Day (1968)::Animation|Children's::F::25::6::90027
1000::1029::3::975041859::Dumbo (1941)::Animation|Children's|Musical::F::25::6::90027
1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
1000::1104::5::975042421::Streetcar Named Desire, A (1951)::Drama::F::25::6::90027
1000::110::5::975040841::Braveheart (1995)::Action|Drama|War::F::25::6::90027
1000::1196::3::975040841::Star Wars: Episode V - The Empire Strikes Back (1980)::Action|Adventure|Drama|Sci-Fi|War::F::25::6::90027
1000::1198::5::975040841::Raiders of the Lost Ark (1981)::Action|Adventure::F::25::6::90027
1000::1200::4::975041125::Aliens (1986)::Action|Sci-Fi|Thriller|War::F::25::6::90027
1000::1201::5::975041025::Good, The Bad and The Ugly, The (1966)::Action|Western::F::25::6::90027
1000::1210::5::975040629::Star Wars: Episode VI - Return of the Jedi (1983)::Action|Adventure|Romance|Sci-Fi|War::F::25::6::90027
字段解释
1000 :: 1036 :: 4 :: 975040964 :: Die Hard (1988) :: Action|Thriller :: F :: 25 :: 6 :: 90027
用户ID 电影ID 评分 评分时间戳 电影名字 电影类型 性别 年龄 职业 邮政编码
0 1 2 3 4 5 6 7 8 9
要分别求男性,女性当中评分最高的10部电影(性别,电影名,评分)
1、以性别和电影名分组,以电影名+性别为key,以评分为value进行计算;
2、以性别+电影名+评分作为对象,以性别分组,以评分降序进行输出TOP10
业务逻辑:MoviesDemo2.java
1 public class MoviesDemo2 {
2
3 public static void main(String[] args) throws Exception {
4
5 Configuration conf1 = new Configuration();
6 Configuration conf2 = new Configuration();
7 FileSystem fs1 = FileSystem.get(conf1);
8 FileSystem fs2 = FileSystem.get(conf2);
9 Job job1 = Job.getInstance(conf1);
10 Job job2 = Job.getInstance(conf2);
11
12 job1.setJarByClass(MoviesDemo2.class);
13 job1.setMapperClass(MoviesDemo2Mapper1.class);
14 job2.setMapperClass(MoviesDemo2Mapper2.class);
15 job1.setReducerClass(MoviesDemo2Reducer1.class);
16 job2.setReducerClass(MoviesDemo2Reducer2.class);
17
18 job1.setOutputKeyClass(Text.class);
19 job1.setOutputValueClass(DoubleWritable.class);
20
21 job2.setOutputKeyClass(MoviesSexBean.class);
22 job2.setOutputValueClass(NullWritable.class);
23
24 job2.setGroupingComparatorClass(MoviesSexGC.class);
25
26 Path inputPath1 = new Path("D:\\MR\\hw\\movie\\output3he1");
27 Path outputPath1 = new Path("D:\\MR\\hw\\movie\\output2_1");
28 Path inputPath2 = new Path("D:\\MR\\hw\\movie\\output2_1");
29 Path outputPath2 = new Path("D:\\MR\\hw\\movie\\output2_end");
30
31 if(fs1.exists(outputPath1)) {
32 fs1.delete(outputPath1,true);
33 }
34 if(fs2.exists(outputPath2)) {
35 fs2.delete(outputPath2,true);
36 }
37
38
39 FileInputFormat.setInputPaths(job1, inputPath1);
40 FileOutputFormat.setOutputPath(job1, outputPath1);
41
42 FileInputFormat.setInputPaths(job2, inputPath2);
43 FileOutputFormat.setOutputPath(job2, outputPath2);
44
45 JobControl control = new JobControl("MoviesDemo2");
46
47 ControlledJob aJob = new ControlledJob(job1.getConfiguration());
48 ControlledJob bJob = new ControlledJob(job2.getConfiguration());
49
50 bJob.addDependingJob(aJob);
51
52 control.addJob(aJob);
53 control.addJob(bJob);
54
55 Thread thread = new Thread(control);
56 thread.start();
57
58 while(!control.allFinished()) {
59 thread.sleep(1000);
60 }
61 System.exit(0);
62
63
64 }
65
66
67 /**
68 * 数据来源:3个文件关联之后的输出文件
69 * 以电影名+性别为key,以评分为value进行输出
70 *
71 * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
72 *
73 * 用户ID::电影ID::评分::评分时间戳::电影名字::电影类型::性别::年龄::职业::邮政编码
74 *
75 * */
76 public static class MoviesDemo2Mapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable>{
77
78 Text outKey = new Text();
79 DoubleWritable outValue = new DoubleWritable();
80
81 @Override
82 protected void map(LongWritable key, Text value,Context context)
83 throws IOException, InterruptedException {
84
85 String[] split = value.toString().split("::");
86 String strKey = split[4]+"\t"+split[6];
87 String strValue = split[2];
88
89 outKey.set(strKey);
90 outValue.set(Double.parseDouble(strValue));
91
92 context.write(outKey, outValue);
93 }
94
95 }
96
97 /**
98 * 以电影名+性别为key,计算平均分
99 * */
100 public static class MoviesDemo2Reducer1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>{
101
102 DoubleWritable outValue = new DoubleWritable();
103
104 @Override
105 protected void reduce(Text key, Iterable<DoubleWritable> values,Context context)
106 throws IOException, InterruptedException {
107
108 int count = 0;
109 double sum = 0;
110 for(DoubleWritable value : values) {
111 count++;
112 sum += Double.parseDouble(value.toString());
113 }
114 double avg = sum / count;
115
116 outValue.set(avg);
117 context.write(key, outValue);
118 }
119 }
120
121 /**
122 * 以电影名+性别+评分作为对象,以性别分组,以评分降序排序
123 * */
124 public static class MoviesDemo2Mapper2 extends Mapper<LongWritable, Text, MoviesSexBean, NullWritable>{
125
126 MoviesSexBean outKey = new MoviesSexBean();
127
128 @Override
129 protected void map(LongWritable key, Text value,Context context)
130 throws IOException, InterruptedException {
131
132 String[] split = value.toString().split("\t");
133 outKey.setMovieName(split[0]);
134 outKey.setSex(split[1]);
135 outKey.setScore(Double.parseDouble(split[2]));
136
137 context.write(outKey, NullWritable.get());
138
139 }
140 }
141
142 /**
143 * 取性别男女各前10名评分最好的电影
144 * */
145 public static class MoviesDemo2Reducer2 extends Reducer<MoviesSexBean, NullWritable, MoviesSexBean, NullWritable>{
146
147 @Override
148 protected void reduce(MoviesSexBean key, Iterable<NullWritable> values,Context context)
149 throws IOException, InterruptedException {
150
151 int count = 0;
152 for(NullWritable nvl : values) {
153 count++;
154 context.write(key, NullWritable.get());
155 if(count == 10) {
156 return;
157 }
158 }
159
160 }
161 }
162 }
对象:MoviesSexBean.java
1 public class MoviesSexBean implements WritableComparable<MoviesSexBean>{
2
3 private String movieName;
4 private String sex;
5 private double score;
6
7 public MoviesSexBean() {
8 super();
9 }
10 public MoviesSexBean(String movieName, String sex, double score) {
11 super();
12 this.movieName = movieName;
13 this.sex = sex;
14 this.score = score;
15 }
16 public String getMovieName() {
17 return movieName;
18 }
19 public void setMovieName(String movieName) {
20 this.movieName = movieName;
21 }
22 public String getSex() {
23 return sex;
24 }
25 public void setSex(String sex) {
26 this.sex = sex;
27 }
28 public double getScore() {
29 return score;
30 }
31 public void setScore(double score) {
32 this.score = score;
33 }
34 @Override
35 public String toString() {
36 return movieName + "\t" + sex + "\t" + score ;
37 }
38 @Override
39 public void readFields(DataInput in) throws IOException {
40 movieName = in.readUTF();
41 sex = in.readUTF();
42 score = in.readDouble();
43 }
44 @Override
45 public void write(DataOutput out) throws IOException {
46 out.writeUTF(movieName);
47 out.writeUTF(sex);
48 out.writeDouble(score);
49 }
50 @Override
51 public int compareTo(MoviesSexBean o) {
52
53 int result = this.getSex().compareTo(o.getSex());
54 if(result == 0) {
55 double diff = this.getScore() - o.getScore();
56
57 if(diff == 0) {
58 return 0;
59 }else {
60 return diff > 0 ? -1 : 1;
61 }
62
63 }else {
64 return result > 0 ? -1 : 1;
65 }
66
67 }
68
69
70
71 }
分组:MoviesSexGC.java
1 public class MoviesSexGC extends WritableComparator{
2
3 public MoviesSexGC() {
4 super(MoviesSexBean.class,true);
5 }
6
7 @Override
8 public int compare(WritableComparable a, WritableComparable b) {
9
10 MoviesSexBean msb1 = (MoviesSexBean)a;
11 MoviesSexBean msb2 = (MoviesSexBean)b;
12
13 return msb1.getSex().compareTo(msb2.getSex());
14 }
15
16 }
3、求movieid = 2116这部电影各年龄段(因为年龄就只有7个,就按这个7个分就好了)的平均影评(年龄段,评分)
以第2题中三表联合之后的输出文件作为输入进行操作
1 public class MovieDemo3 {
2
3 public static void main(String[] args) throws Exception {
4
5 Configuration conf = new Configuration();
6 FileSystem fs = FileSystem.get(conf);
7 Job job = Job.getInstance(conf);
8
9 job.setJarByClass(MovieDemo3.class);
10 job.setMapperClass(MovieDemo3Mapper.class);
11 job.setReducerClass(MovieDemo3Reducer.class);
12
13 job.setOutputKeyClass(Text.class);
14 job.setOutputValueClass(DoubleWritable.class);
15
16 Path inputPath = new Path("D:\\MR\\hw\\movie\\3he1");
17 Path outputPath = new Path("D:\\MR\\hw\\movie\\outpu3");
18
19 if(fs.exists(outputPath)) {
20 fs.delete(outputPath,true);
21 }
22
23 FileInputFormat.setInputPaths(job, inputPath);
24 FileOutputFormat.setOutputPath(job, outputPath);
25
26 boolean isDone = job.waitForCompletion(true);
27 System.exit(isDone ? 0 : 1);
28
29 }
30
31
32 /**
33 * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
34 *
35 * 用户ID::电影ID::评分::评分时间戳::电影名字::电影类型::性别::年龄::职业::邮政编码
36 * 0 1 2 3 4 5 6 7 8 9
37 *
38 * key:电影ID+电影名字+年龄段
39 * value:评分
40 * 求movieid = 2116这部电影各年龄段
41 * */
42 public static class MovieDemo3Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable>{
43
44 Text outKey = new Text();
45 DoubleWritable outValue = new DoubleWritable();
46
47 @Override
48 protected void map(LongWritable key, Text value, Context context)
49 throws IOException, InterruptedException {
50
51 String[] split = value.toString().split("::");
52 int movieID = Integer.parseInt(split[1]);
53
54 if(movieID == 2116) {
55 String strKey = split[1]+"\t"+split[4]+"\t"+split[7];
56 String strValue = split[2];
57
58 outKey.set(strKey);
59 outValue.set(Double.parseDouble(strValue));
60
61 context.write(outKey, outValue);
62 }
63
64 }
65 }
66
67
68
69 /**
70 * 对map的输出结果求平均评分
71 * */
72 public static class MovieDemo3Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable>{
73
74 DoubleWritable outValue = new DoubleWritable();
75
76 @Override
77 protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
78 throws IOException, InterruptedException {
79
80 int count = 0;
81 double sum = 0;
82
83 for(DoubleWritable value : values) {
84 count++;
85 sum += Double.parseDouble(value.toString());
86 }
87
88 double avg = sum / count;
89
90 outValue.set(avg);
91
92 context.write(key, outValue);
93
94 }
95
96 }
97
98 }
4、求最喜欢看电影(影评次数最多)的那位女性评最高分的10部电影的平均影评分(人,电影名,影评)
1000 :: 1036 :: 4 :: 975040964 :: Die Hard (1988) :: Action|Thriller :: F :: 25 :: 6 :: 90027
用户ID 电影ID 评分 评分时间戳 电影名字 电影类型 性别 年龄 职业 邮政编码
0 1 2 3 4 5 6 7 8 9
(1)求出评论次数最多的女性ID
MoviesDemo4_1.java
1 public class MoviesDemo4 {
2
3 public static void main(String[] args) throws Exception {
4
5 Configuration conf1 = new Configuration();
6 FileSystem fs1 = FileSystem.get(conf1);
7 Job job1 = Job.getInstance(conf1);
8
9 job1.setJarByClass(MoviesDemo4.class);
10 job1.setMapperClass(MoviesDemo4Mapper1.class);
11 job1.setReducerClass(MoviesDemo4Reducer1.class);
12
13
14 job1.setMapOutputKeyClass(Text.class);
15 job1.setMapOutputValueClass(Text.class);
16 job1.setOutputKeyClass(Text.class);
17 job1.setOutputValueClass(DoubleWritable.class);
18
19
20 Configuration conf2 = new Configuration();
21 FileSystem fs2 = FileSystem.get(conf2);
22 Job job2 = Job.getInstance(conf2);
23
24 job2.setJarByClass(MoviesDemo4.class);
25 job2.setMapperClass(MoviesDemo4Mapper2.class);
26 job2.setReducerClass(MoviesDemo4Reducer2.class);
27
28 job2.setMapOutputKeyClass(Moviegoers.class);
29 job2.setMapOutputValueClass(NullWritable.class);
30 job2.setOutputKeyClass(Moviegoers.class);
31 job2.setOutputValueClass(NullWritable.class);
32
33 Path inputPath1 = new Path("D:\\MR\\hw\\movie\\3he1");
34 Path outputPath1 = new Path("D:\\MR\\hw\\movie\\outpu4_1");
35
36 if(fs1.exists(outputPath1)) {
37 fs1.delete(outputPath1,true);
38 }
39
40 FileInputFormat.setInputPaths(job1, inputPath1);
41 FileOutputFormat.setOutputPath(job1, outputPath1);
42
43
44 Path inputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_1");
45 Path outputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_2");
46
47 if(fs2.exists(outputPath2)) {
48 fs2.delete(outputPath2,true);
49 }
50
51 FileInputFormat.setInputPaths(job2, inputPath2);
52 FileOutputFormat.setOutputPath(job2, outputPath2);
53
54 JobControl control = new JobControl("MoviesDemo4");
55
56 ControlledJob ajob = new ControlledJob(job1.getConfiguration());
57 ControlledJob bjob = new ControlledJob(job2.getConfiguration());
58
59 bjob.addDependingJob(ajob);
60
61 control.addJob(ajob);
62 control.addJob(bjob);
63
64 Thread thread = new Thread(control);
65 thread.start();
66
67 while(!control.allFinished()) {
68 thread.sleep(1000);
69 }
70 System.exit(0);
71 }
72
73 /**
74 * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
75 *
76 * 用户ID::电影ID::评分::评分时间戳::电影名字::电影类型::性别::年龄::职业::邮政编码
77 * 0 1 2 3 4 5 6 7 8 9
78 *
79 * 1、key:用户ID
80 * 2、value:电影名+评分
81 *
82 * */
83 public static class MoviesDemo4Mapper1 extends Mapper<LongWritable, Text, Text, Text>{
84
85 Text outKey = new Text();
86 Text outValue = new Text();
87
88 @Override
89 protected void map(LongWritable key, Text value, Context context)
90 throws IOException, InterruptedException {
91
92 String[] split = value.toString().split("::");
93
94 String strKey = split[0];
95 String strValue = split[4]+"\t"+split[2];
96
97 if(split[6].equals("F")) {
98 outKey.set(strKey);
99 outValue.set(strValue);
100 context.write(outKey, outValue);
101 }
102
103 }
104
105 }
106
107 //统计每位女性的评论总数
108 public static class MoviesDemo4Reducer1 extends Reducer<Text, Text, Text, IntWritable>{
109
110 IntWritable outValue = new IntWritable();
111
112 @Override
113 protected void reduce(Text key, Iterable<Text> values, Context context)
114 throws IOException, InterruptedException {
115
116 int count = 0;
117 for(Text value : values) {
118 count++;
119 }
120 outValue.set(count);
121 context.write(key, outValue);
122 }
123
124 }
125
126 //对第一次MapReduce的输出结果进行降序排序
127 public static class MoviesDemo4Mapper2 extends Mapper<LongWritable, Text,Moviegoers,NullWritable>{
128
129 Moviegoers outKey = new Moviegoers();
130
131 @Override
132 protected void map(LongWritable key, Text value, Context context)
133 throws IOException, InterruptedException {
134
135 String[] split = value.toString().split("\t");
136
137 outKey.setName(split[0]);
138 outKey.setCount(Integer.parseInt(split[1]));
139 context.write(outKey, NullWritable.get());
140 }
141
142 }
143
144 //排序之后取第一个值(评论最多的女性ID和评论次数)
145 public static class MoviesDemo4Reducer2 extends Reducer<Moviegoers,NullWritable, Moviegoers,NullWritable>{
146
147 int count = 0;
148
149 @Override
150 protected void reduce(Moviegoers key, Iterable<NullWritable> values,Context context)
151 throws IOException, InterruptedException {
152
153 for(NullWritable nvl : values) {
154 count++;
155 if(count > 1) {
156 return;
157 }
158 context.write(key, nvl);
159 }
160
161 }
162
163 }
164
165
166 }
(2)
|