1、Spark SQL窗口函数解析 2、Spark SQL窗口函数实战 /**
* Scala代码
*/
package com.tom.spark.sql
import org.apache.spark.sql.DataFrame
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
object SparkSQLWindowFunctionOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkSQLWindowFunctionOps")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
hiveContext.sql("use hive") //使用名称为hive的数据库,接下来所有的表操作都位于这个库
/**
* 如果要创建的表存在的话就删除,然后创建我们要导入数据的表
*/
hiveContext.sql("DROP TABLE IF EXISTS scores")
hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING,score INTEGER)"
+ "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY '\\n'")
//把要处理的数据导入到Hive的表中
hiveContext.sql("LOAD DATA LOCAL INPATH '/root/Documents/SparkApps/resources/topNGroup.txt' INTO TABLE scores")
/**
* 使用子查询的方式完成目标数据的提取,在目标数据内部使用窗口函数row_number来进行分组排序
* PARTITION BY:指定窗口函数分组的Key;
* ORDER BY:分组后进行排序;
*
*/
val result = hiveContext.sql("SELECT name, score FROM (" +
"SELECT name, score, row_number() OVER (PARTITION BY name ORDER BY score DESC) rank FROM scores" +
") sub_scores " +
"WHERE rank<=4")
result.show()
//把数据保存到Hive数据仓库中
hiveContext.sql("DROP TABLE EXISTS sortedResultScores")
result.saveAsTable("sortedResultScores")
}
}
|