分享

大数据开发技术之Spark SQL的多种使用方法

 IT小白在线 2021-09-23

Spark SQL支持多种数据源,如JDBC、HDFS、HBase。它的内部组件,如SQL的语法解析器、分析器等支持重定义进行扩展,能更好的满足不同的业务场景。与Spark Core无缝集成,提供了DataSet/DataFrame的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。

DataSet/DataFrame

DataSet/DataFrame都是Spark SQL提供的分布式数据集,相对于RDD而言,除了记录数据以外,还记录表的schema信息。

DataFrame是DataSet以命名列方式组织的分布式数据集,类似于RDBMS中的表,或者R和Python中的 data frame。DataFrame API支持Scala、Java、Python、R。在Scala API中,DataFrame变成类型为Row的Dataset:

type DataFrame = Dataset[Row]。

DataFrame在编译期不进行数据中字段的类型检查,在运行期进行检查。但DataSet则与之相反,因为它是强类型的。此外,二者都是使用catalyst进行sql的解析和优化。为了方便,以下统一使用DataSet统称。

DataSet创建

DataSet通常通过加载外部数据或通过RDD转化创建。

1.加载外部数据

以加载json和mysql为例:

   ..()

   ..()
.((  ,
  ,
  ,   ,   )).()

2.RDD转换为DataSet

通过RDD转化创建DataSet,关键在于为RDD指定schema,通常有两种方式(伪代码):

 

) 
   .().(.())

)  
  (:, :, :)

)  
   .(  (()., (), ().))

) 
  .

 

  

    (.().(  (, , )))

   .(((),()))

   .(,)

操作DataSet的两种风格语法

DSL语法

1.查询DataSet部分列中的内容

personDS.select(col("name"))

personDS.select(col("name"), col("age"))

2.查询所有的name和age和salary,并将salary加1000

personDS.select(col("name"), col("age"), col("salary") + 1000)

personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000)

3.过滤age大于18的

personDS.filter(col("age") > 18)

4.按年龄进行分组并统计相同年龄的人数

personDS.groupBy("age").count()

注意:直接使用col方法需要import org.apache.spark.sql.functions._

SQL语法

如果想使用SQL风格的语法,需要将DataSet注册成表

personDS.registerTempTable("person")

//查询年龄最大的前两名

val result = sparkSession.sql("select * from person order by age desc limit 2")

//保存结果为json文件。注意:如果不指定存储格式,则默认存储为parquet

result.write.format("json").save("hdfs://ip:port/res2")

Spark SQL的几种使用方式

1.sparksql-shell交互式查询

就是利用Spark提供的shell命令行执行SQL

2.编程

首先要获取Spark SQL编程"入口":SparkSession(当然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive则为HiveContext)。这里以读取parquet为例:

val spark = SparkSession.builder()
.appName("example").master("local[*]").getOrCreate();

val df = sparkSession.read.format("parquet").load("/路径/parquet文件")

然后就可以针对df进行业务处理了。

3.Thriftserver

beeline客户端连接操作

启动spark-sql的thrift服务,sbin/start-thriftserver.sh,启动脚本中配置好Spark集群服务资源、地址等信息。然后通过beeline连接thrift服务进行数据处理。

hive-jdbc驱动包来访问spark-sql的thrift服务

在项目pom文件中引入相关驱动包,跟访问mysql等jdbc数据源类似。示例:

.()
   .(, , );
 {
     .()
     .()
   (.()) {
    (.())
  }
}  {
   :   .()
} {
  () .()
}

Spark SQL 获取Hive数据

Spark SQL读取hive数据的关键在于将hive的元数据作为服务暴露给Spark。除了通过上面thriftserver jdbc连接hive的方式,也可以通过下面这种方式:

首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下内容:

<property>

<name>hive.metastore.uris</name>

<value>thrift://ip:port</value>

</property>

然后,启动hive metastore

最后,将hive-site.xml复制或者软链到$SPARK_HOME/conf/。如果hive的元数据存储在mysql中,那么需要将mysql的连接驱动jar包如mysql-connector-java-5.1.12.jar放到$SPARK_HOME/lib/下,启动spark-sql即可操作hive中的库和表。而此时使用hive元数据获取SparkSession的方式为:

val spark = SparkSession.builder()

.config(sparkConf).enableHiveSupport().getOrCreate()

UDF、UDAF、Aggregator

UDF

UDF是最基础的用户自定义函数,以自定义一个求字符串长度的udf为例:

   {(:)  .}
..(,)
  ..()
.()
.()

UDAF

定义UDAF,需要继承抽象类UserDefinedAggregateFunction,它是弱类型的,下面的aggregator是强类型的。以求平均数为例:

 ....{, }
 .....
 .....
 .....

    {
  
   :   ((, ) :: )
  
   :   {
    ((, ) :: (, ) :: )
  }
  
   :   
  
   :   
  
  
  
  
   (: ):   {
    ()  
    ()  
  }
  
   (: , : ):   {
     (.()) {
      ()  .()  .()
      ()  .()  
    }
  }
  
   (: , : ):   {
    ()  .()  .()
    ()  .()  .()
  }
  
   (: ):   .().  .()
}


..(, )

   ..()
.()
.()
   .()
.()

Aggregator

 ....{, , }
 .....

  (: , : )
  ( : ,  : )

   [, , ] {
  
   :   (, )
  
  
   (: , : ):   {
    .  .
    .  
    
  }
  
   (: , : ):   {
    .  .
    .  .
    
  }
  
   (: ):   ..  .
  
   : []  .
  
   : []  .
}

   ..().[]
.()

   ..()
   .()
.()

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多