
Big Data: Running Flink SQL Queries through the Table API

 昵称70680357 2020-06-30

Use Flink SQL / the Table API to implement the tasks below.

The input data is listed below, one record per line.

Fields, in order: name, age, sex, class, exam score

zhangsan,18,man,1707e,81.5
lisi,22,woman,1707e,77.5
wangwu,28,woman,1707e,82.0
zhaoliu,24,man,1707e,73.5
qianqi,18,woman,1707e,91.0
maba,22,man,1707e,84.0
sunjiiu,27,woman,1707e,88.0
xiaoming,20,man,1710e,73.5
xiaohong,21,woman,1710e,80.0
xiaozhang,22,man,1710e,73.5
xiaoli,19,woman,1710e,92.0
xiaowang,26,man,1710e,86.5
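Before wiring the file into Flink, the record format can be checked with plain Scala. The sketch below is a hypothetical helper (ParseStudents is not part of the original job and does not use Flink); it applies the same split, trim and convert logic as the job's map step and skips empty lines.

```scala
// Hypothetical helper, not part of the Flink job: parses records of the form
// name,age,sex,class,score using plain Scala only.
object ParseStudents {
  final case class Student(name: String, age: Int, sex: String, clazz: String, score: Double)

  // Split on commas, trim every field, then convert age and score.
  def parse(line: String): Student = {
    val arr = line.split(",").map(_.trim)
    Student(arr(0), arr(1).toInt, arr(2), arr(3), arr(4).toDouble)
  }

  // Drop blank lines, then parse the remaining ones.
  def parseAll(lines: Seq[String]): Seq[Student] =
    lines.filter(_.trim.nonEmpty).map(parse)
}
```

For example, ParseStudents.parse("zhangsan,18,man,1707e,81.5") yields Student(zhangsan,18,man,1707e,81.5); a malformed age or score fails fast with a NumberFormatException instead of surfacing later inside a Flink job.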

Read the data above with the Flink Table API and compute the following statistics:

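1. Average score of each class;

2. Name and total score of the class with the highest total score;

3. Name of the student with the highest score in each class;

4. Name of the female student with the highest score in 1707e;

5. Average age of the male students and of the female students in each class;

6. Name of the youngest student in 1710e;

7. Number of male students and of female students in each class.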
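Because the data set is small, the expected answers to the seven tasks can be computed in memory and compared with the Flink output. The following is a plain-Scala sketch, not a Flink program: the ReferenceAnswers object and its function names are hypothetical, and where several students tie it returns all of them.

```scala
// Hypothetical in-memory reference for the seven tasks (plain Scala, no Flink).
object ReferenceAnswers {
  final case class Student(name: String, age: Int, sex: String, clazz: String, score: Double)

  // The twelve records from the data listing.
  val students: Seq[Student] = Seq(
    Student("zhangsan", 18, "man", "1707e", 81.5),
    Student("lisi", 22, "woman", "1707e", 77.5),
    Student("wangwu", 28, "woman", "1707e", 82.0),
    Student("zhaoliu", 24, "man", "1707e", 73.5),
    Student("qianqi", 18, "woman", "1707e", 91.0),
    Student("maba", 22, "man", "1707e", 84.0),
    Student("sunjiiu", 27, "woman", "1707e", 88.0),
    Student("xiaoming", 20, "man", "1710e", 73.5),
    Student("xiaohong", 21, "woman", "1710e", 80.0),
    Student("xiaozhang", 22, "man", "1710e", 73.5),
    Student("xiaoli", 19, "woman", "1710e", 92.0),
    Student("xiaowang", 26, "man", "1710e", 86.5)
  )

  // 1. Average score per class.
  def avgScoreByClass(s: Seq[Student]): Map[String, Double] =
    s.groupBy(_.clazz).map { case (c, xs) => c -> xs.map(_.score).sum / xs.size }

  // 2. Class with the highest total score, and that total.
  def topClassByTotal(s: Seq[Student]): (String, Double) =
    s.groupBy(_.clazz).map { case (c, xs) => c -> xs.map(_.score).sum }.maxBy(_._2)

  // 3. Top-scoring student(s) in each class; ties are all kept.
  def topStudentsByClass(s: Seq[Student]): Map[String, Seq[String]] =
    s.groupBy(_.clazz).map { case (c, xs) =>
      val best = xs.map(_.score).max
      c -> xs.filter(_.score == best).map(_.name)
    }

  // 4. Highest-scoring female student(s) in the given class.
  def topWomanIn(s: Seq[Student], clazz: String): Seq[String] = {
    val women = s.filter(x => x.clazz == clazz && x.sex == "woman")
    val best = women.map(_.score).max
    women.filter(_.score == best).map(_.name)
  }

  // 5. Fractional average age per (class, sex).
  def avgAgeByClassSex(s: Seq[Student]): Map[(String, String), Double] =
    s.groupBy(x => (x.clazz, x.sex)).map { case (k, xs) => k -> xs.map(_.age).sum.toDouble / xs.size }

  // 6. Youngest student(s) in the given class.
  def youngestIn(s: Seq[Student], clazz: String): Seq[String] = {
    val inClass = s.filter(_.clazz == clazz)
    val youngest = inClass.map(_.age).min
    inClass.filter(_.age == youngest).map(_.name)
  }

  // 7. Head count per (class, sex).
  def countByClassSex(s: Seq[Student]): Map[(String, String), Int] =
    s.groupBy(x => (x.clazz, x.sex)).map { case (k, xs) => k -> xs.size }
}
```

On the twelve records this gives 82.5 (1707e) and 81.1 (1710e) for task 1; (1707e, 577.5) for task 2; qianqi (1707e) and xiaoli (1710e) for task 3; qianqi for task 4; about 21.33 (men) and 23.75 (women) in 1707e and about 22.67 (men) and 20.0 (women) in 1710e for task 5; xiaoli for task 6; and 3 men / 4 women in 1707e and 3 men / 2 women in 1710e for task 7.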


import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.types.Row
object Job629 {

def main(args: Array[String]): Unit = {

// Batch (DataSet) execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

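// Table environment bound to the batch environment (getTableEnvironment is the older Flink 1.x API, deprecated in later releases)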
val tableEnvironment = TableEnvironment.getTableEnvironment(env)

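// Read the raw data: one "name,age,sex,clazz,score" record per line
val dataDS: DataSet[String] = env.readTextFile("src/main/resources/job629.txt")
// Skip empty lines, then parse each record into a Student case-class instance
val stuDS: DataSet[Student] = dataDS.filter(_.trim.nonEmpty).map(x => {
val arr: Array[String] = x.split(",")
Student(arr(0).trim, arr(1).trim.toInt, arr(2).trim, arr(3).trim, arr(4).trim.toDouble)
})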

// Register the DataSet as table "stu"; the column names are taken from the Student fields
tableEnvironment.registerDataSet("stu",stuDS)

// 1. Average score of each class
val clazzAvg: Table = tableEnvironment.sqlQuery("select clazz, avg(score) as avgScore from stu group by clazz")
val sql1: DataSet[Row] = tableEnvironment.toDataSet[Row](clazzAvg)
// sql1.print()

// 2. Name and total score of the class with the highest total score
// (sort the per-class sums in descending order and keep only the first row)
val scoreMaxClazz: Table = tableEnvironment.sqlQuery(
"""
select clazz, sum(score) as sumScore from stu
group by clazz
order by sumScore desc
limit 1
""")
val sql2: DataSet[Row] = tableEnvironment.toDataSet[Row](scoreMaxClazz)
// sql2.print()

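// 3. Name of the student with the highest score in each class
// (join on clazz as well as score, so a student from another class with the same score is not matched)
val scoreMaxName: Table = tableEnvironment.sqlQuery(
"""
select s2.clazz, s1.name, s2.maxScore from stu s1 join
(select clazz, max(score) maxScore from stu group by clazz) s2
on s1.clazz = s2.clazz and s1.score = s2.maxScore
""")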
val sql3: DataSet[Row] = tableEnvironment.toDataSet[Row](scoreMaxName)
//sql3.print()

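// 4. Name of the female student with the highest score in 1707e
// (the outer WHERE restricts s1 to the same class and sex as the subquery)
val scoreMaxF: Table = tableEnvironment.sqlQuery(
"""
select s1.name, s2.maxScore from stu s1 join
(select max(score) maxScore from stu where clazz = '1707e' and sex = 'woman') s2
on s1.score = s2.maxScore
where s1.clazz = '1707e' and s1.sex = 'woman'
""")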
val sql4: DataSet[Row] = tableEnvironment.toDataSet[Row](scoreMaxF)
//sql4.print()

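// 5. Average age of the male and of the female students in each class
// (AVG over the INT column age returns a truncated INT, so age is cast to DOUBLE first)
val avgAge: Table = tableEnvironment.sqlQuery(
"""
select clazz, sex, avg(cast(age as double)) as avgAge from stu group by clazz, sex
""")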
val sql5: DataSet[Row] = tableEnvironment.toDataSet[Row](avgAge)
//sql5.print()

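// 6. Name of the youngest student in 1710e
// (the outer WHERE keeps students of other classes with the same age out of the result)
val minAgeName: Table = tableEnvironment.sqlQuery(
"""
select s1.name, s2.minAge from stu s1 join
(select min(age) minAge from stu where clazz = '1710e') s2
on s1.age = s2.minAge
where s1.clazz = '1710e'
""")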
val sql6: DataSet[Row] = tableEnvironment.toDataSet[Row](minAgeName)
//sql6.print()

// 7. Number of male and of female students in each class
val count: Table = tableEnvironment.sqlQuery(
"""
select clazz, sex, count(*) as cnt from stu group by clazz, sex
""")
val sql7: DataSet[Row] = tableEnvironment.toDataSet[Row](count)
//sql7.print()



}
}

case class Student(name:String,age:Int,sex:String,clazz:String,score:Double)
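Query 3 pairs each student with the per-class maximum score. If that join matches on the score alone, a student from another class who happens to have the same score is returned as well; on the twelve records above no such collision occurs, so the flaw would go unnoticed. The plain-Scala sketch below simulates both join conditions on small hypothetical data (the JoinKeyPitfall object, the Rec type and the rows a, x, b, y are invented for illustration; no Flink is involved).

```scala
// Hypothetical simulation of the two join conditions used for "top student per class".
object JoinKeyPitfall {
  final case class Rec(name: String, clazz: String, score: Double)

  // Invented data: b (class B) scored 90.0, which is also class A's maximum.
  val rows: Seq[Rec] = Seq(
    Rec("a", "A", 90.0), Rec("x", "A", 70.0),
    Rec("b", "B", 90.0), Rec("y", "B", 95.0)
  )

  // Per-class maximum, playing the role of the s2 subquery: (clazz, maxScore).
  val maxima: Seq[(String, Double)] =
    rows.groupBy(_.clazz).map { case (c, xs) => c -> xs.map(_.score).max }.toSeq

  // Join on score only: b is wrongly reported as a top student of class A.
  def joinOnScoreOnly: Seq[(String, String)] =
    for { r <- rows; (c, m) <- maxima if r.score == m } yield (c, r.name)

  // Join on class and score: exactly the students holding their own class's maximum.
  def joinOnClassAndScore: Seq[(String, String)] =
    for { r <- rows; (c, m) <- maxima if r.clazz == c && r.score == m } yield (c, r.name)
}
```

The score-only join yields (A,a), (A,b) and (B,y), while the class-and-score join yields only (A,a) and (B,y). The same reasoning applies to tasks 4 and 6, where the outer row must be restricted to the class (and sex) used in the subquery.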
