使用 Flink SQL / Table API 完成下面的功能。
有如下数据,字段依次为:姓名,年龄,性别,班级,考试成绩。
zhangsan,18,man,1707e,81.5
lisi,22,woman,1707e,77.5
wangwu,28,woman,1707e,82.0
zhaoliu,24,man,1707e,73.5
qianqi,18,woman,1707e,91.0
maba,22,man,1707e,84.0
sunjiiu,27,woman,1707e,88.0
xiaoming,20,man,1710e,73.5
xiaohong,21,woman,1710e,80.0
xiaozhang,22,man,1710e,73.5
xiaoli,19,woman,1710e,92.0
xiaowang,26,man,1710e,86.5
使用 Flink Table API 读取以上数据,进行统计分析:
1. 计算各班平均成绩;
2. 计算总分最高的班级名称和总分;
3. 计算各班最高成绩的学生姓名;
4. 计算 1707e 班成绩最高的女生姓名;
5. 计算各班男生平均年龄和女生平均年龄;
6. 计算 1710e 班年龄最小的学生姓名;
7. 计算各班男生个数和女生个数。
import org.apache.flink.api.scala._
/**
 * Batch job that loads student records and answers the seven statistics questions with Flink SQL.
 *
 * Each input line is parsed as `name,age,sex,clazz,score` (comma separated); blank lines are
 * skipped. Every query result is printed to stdout under a short heading. In the DataSet API,
 * `print()` is what triggers execution, so without it none of the queries would ever run.
 *
 * @param args optional; `args(0)` overrides the input path, which otherwise defaults to
 *             `src/main/resources/job629.txt`
 */
def main(args: Array[String]): Unit = {
  // Batch execution environment with parallelism 1.
  val env = ExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  // Table environment bound to the batch environment.
  val tableEnvironment = TableEnvironment.getTableEnvironment(env)

  val inputPath = args.headOption.getOrElse("src/main/resources/job629.txt")

  // Parse each non-blank line into a Student; a blank line would otherwise fail on `toInt`.
  val stuDS: DataSet[Student] = env
    .readTextFile(inputPath)
    .filter(line => line.trim.nonEmpty)
    .map { line =>
      val arr: Array[String] = line.split(",")
      Student(arr(0).trim, arr(1).trim.toInt, arr(2).trim, arr(3).trim, arr(4).trim.toDouble)
    }

  // Register the records as table "stu" so the SQL below can refer to it.
  tableEnvironment.registerDataSet("stu", stuDS)

  // Runs one SQL query against the registered tables and prints its rows under a heading.
  def printQuery(title: String, sql: String): Unit = {
    val rows: DataSet[Row] = tableEnvironment.toDataSet[Row](tableEnvironment.sqlQuery(sql))
    println(s"===== $title =====")
    rows.print()
  }

  // 1. Average score per class.
  printQuery("1. average score per class",
    """select clazz, avg(score) as avgScore
      |from stu
      |group by clazz""".stripMargin)

  // 2. Class(es) with the highest total score. Comparing against max(sumScore) returns every
  //    class tied for first place rather than one arbitrary row from `order by ... limit 1`.
  printQuery("2. class with the highest total score",
    """select t.clazz, t.sumScore
      |from (select clazz, sum(score) as sumScore from stu group by clazz) t
      |join (select max(u.sumScore) as maxSum
      |      from (select sum(score) as sumScore from stu group by clazz) u) m
      |on t.sumScore = m.maxSum""".stripMargin)

  // 3. Student(s) holding the top score of each class. The join must also match on clazz,
  //    otherwise a student of another class with an equal score would be reported as well.
  printQuery("3. top-scoring student per class",
    """select s2.clazz, s1.name, s2.maxScore
      |from stu s1
      |join (select clazz, max(score) as maxScore from stu group by clazz) s2
      |on s1.clazz = s2.clazz and s1.score = s2.maxScore""".stripMargin)

  // 4. Highest-scoring female student of 1707e. The outer filter restricts candidates to the
  //    same class and sex as the max() subquery, so equal scores elsewhere cannot match.
  printQuery("4. top-scoring woman in 1707e",
    """select s1.name, s2.maxScore
      |from stu s1
      |join (select max(score) as maxScore from stu where clazz = '1707e' and sex = 'woman') s2
      |on s1.score = s2.maxScore
      |where s1.clazz = '1707e' and s1.sex = 'woman'""".stripMargin)

  // 5. Average age per class and sex. age is an INT column and Flink's avg() over INT yields an
  //    INT, so cast to DOUBLE to keep the fractional part.
  printQuery("5. average age per class and sex",
    """select clazz, sex, avg(cast(age as double)) as avgAge
      |from stu
      |group by clazz, sex""".stripMargin)

  // 6. Youngest student(s) of 1710e; candidates are restricted to 1710e as well, so a student
  //    of another class with the same age cannot match.
  printQuery("6. youngest student in 1710e",
    """select s1.name, s2.minAge
      |from stu s1
      |join (select min(age) as minAge from stu where clazz = '1710e') s2
      |on s1.age = s2.minAge
      |where s1.clazz = '1710e'""".stripMargin)

  // 7. Number of male and female students per class.
  printQuery("7. student count per class and sex",
    """select clazz, sex, count(*) as cnt
      |from stu
      |group by clazz, sex""".stripMargin)
}
} // NOTE(review): closes an enclosing object whose `object ... {` header is missing from this file

/** One input record: name, age, sex ("man"/"woman" in the sample data), class id and exam score. */
case class Student(name: String, age: Int, sex: String, clazz: String, score: Double)
|
来自: 昵称70680357 > 《待分类》