Spark SQL支持多种数据源,如JDBC、HDFS、HBase。它的内部组件,如SQL的语法解析器、分析器等支持重定义进行扩展,能更好的满足不同的业务场景。与Spark Core无缝集成,提供了DataSet/DataFrame的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。 DataSet/DataFrame DataSet/DataFrame都是Spark SQL提供的分布式数据集,相对于RDD而言,除了记录数据以外,还记录表的schema信息。 DataFrame是DataSet以命名列方式组织的分布式数据集,类似于RDBMS中的表,或者R和Python中的 data frame。DataFrame API支持Scala、Java、Python、R。在Scala API中,DataFrame变成类型为Row的Dataset: type DataFrame = Dataset[Row]。 DataFrame在编译期不进行数据中字段的类型检查,在运行期进行检查。但DataSet则与之相反,因为它是强类型的。此外,二者都是使用catalyst进行sql的解析和优化。为了方便,以下统一使用DataSet统称。 DataSet创建 DataSet通常通过加载外部数据或通过RDD转化创建。 1.加载外部数据 以加载json和mysql为例: ..() ..() .(( , , , , )).() 2.RDD转换为DataSet 通过RDD转化创建DataSet,关键在于为RDD指定schema,通常有两种方式(伪代码): ) .().(.()) ) (:, :, :) ) .( (()., (), ().)) ) . (.().( (, , ))) .(((),())) .(,) 操作DataSet的两种风格语法 DSL语法 1.查询DataSet部分列中的内容 personDS.select(col("name")) personDS.select(col("name"), col("age")) 2.查询所有的name和age和salary,并将salary加1000 personDS.select(col("name"), col("age"), col("salary") + 1000) personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000) 3.过滤age大于18的 personDS.filter(col("age") > 18) 4.按年龄进行分组并统计相同年龄的人数 personDS.groupBy("age").count() 注意:直接使用col方法需要import org.apache.spark.sql.functions._ SQL语法 如果想使用SQL风格的语法,需要将DataSet注册成表 personDS.registerTempTable("person") //查询年龄最大的前两名 val result = sparkSession.sql("select * from person order by age desc limit 2") //保存结果为json文件。注意:如果不指定存储格式,则默认存储为parquet result.write.format("json").save("hdfs://ip:port/res2") Spark SQL的几种使用方式 1.sparksql-shell交互式查询 就是利用Spark提供的shell命令行执行SQL 2.编程 首先要获取Spark SQL编程"入口":SparkSession(当然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive则为HiveContext)。这里以读取parquet为例: val spark = SparkSession.builder() val df = sparkSession.read.format("parquet").load("/路径/parquet文件") 然后就可以针对df进行业务处理了。 3.Thriftserver beeline客户端连接操作 启动spark-sql的thrift服务,sbin/start-thriftserver.sh,启动脚本中配置好Spark集群服务资源、地址等信息。然后通过beeline连接thrift服务进行数据处理。 hive-jdbc驱动包来访问spark-sql的thrift服务 在项目pom文件中引入相关驱动包,跟访问mysql等jdbc数据源类似。示例: .() .(, , ); { .() .() (.()) { (.()) } } { : .() } { () .() } Spark SQL 获取Hive数据 Spark SQL读取hive数据的关键在于将hive的元数据作为服务暴露给Spark。除了通过上面thriftserver jdbc连接hive的方式,也可以通过下面这种方式: 首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下内容: <property> <name>hive.metastore.uris</name> <value>thrift://ip:port</value> </property> 然后,启动hive metastore 最后,将hive-site.xml复制或者软链到$SPARK_HOME/conf/。如果hive的元数据存储在mysql中,那么需要将mysql的连接驱动jar包如mysql-connector-java-5.1.12.jar放到$SPARK_HOME/lib/下,启动spark-sql即可操作hive中的库和表。而此时使用hive元数据获取SparkSession的方式为: val spark = SparkSession.builder() .config(sparkConf).enableHiveSupport().getOrCreate() UDF、UDAF、Aggregator UDF UDF是最基础的用户自定义函数,以自定义一个求字符串长度的udf为例: {(:) .} ..(,) ..() .() .() UDAF 定义UDAF,需要继承抽象类UserDefinedAggregateFunction,它是弱类型的,下面的aggregator是强类型的。以求平均数为例: ....{, } ..... ..... ..... { : ((, ) :: ) : { ((, ) :: (, ) :: ) } : : (: ): { () () } (: , : ): { (.()) { () .() .() () .() } } (: , : ): { () .() .() () .() .() } (: ): .(). .() } ..(, ) ..() .() .() .() .() Aggregator ....{, , } ..... (: , : ) ( : , : ) [, , ] { : (, ) (: , : ): { . . . } (: , : ): { . . . . } (: ): .. . : [] . : [] . } ..().[] .() ..() .() .() |
|