【注】该系列文章以及使用到安装包/测试数据 可以在《 倾情大奉送--Spark入门实战系列 》获取1.1 运行环境说明1.1.1 硬软件环境l 主机操作系统: Windows 64 位,双核 4 线程,主频 2.2G , 10G 内存 l 虚拟软件: VMware? Workstation 9.0.0 build-812388 l 虚拟机操作系统: CentOS6.5 64 位,单核 l 虚拟机运行环境: JDK : 1.7.0_55 64 位 Hadoop : 2.2.0 (需要编译为 64 位) Scala : 2.10.4 Spark : 1.1.0 (需要编译) Hive : 0.13.1 (源代码编译,参见 1.2 ) 1.1.2 集群网络环境本次实验环境只需要 hadoop1 一台机器即可,网络环境配置如下:
1.2 编译 Hive1.2.1 下载 Hive 源代码包这里选择下载的版本为 hive-0.13.1 ,这个版本需要到 apache 的归档服务器下载,下载地址: http://archive./dist/hive/hive-0.13.1/ ,选择 apache-hive-0.13.1-src.tar.gz 文件进行下载:
1.2.2 上传 Hive 源代码包把下载的 hive-0.13.0.tar.gz 安装包,使用 SSH Secure File Transfer 工具(参见第 2 课《 Spark 编译与部署(上) -- 基础环境搭建》 1.3.1 介绍)上传到 /home/hadoop/upload 目录下。 1.2.3 解压缩并移动到编译目录到上传目录下,用如下命令解压缩 hive 安装文件: $cd /home/hadoop/upload $tar -zxf apache-hive-0.13.1-src.tar.gz 改名并移动到 /app/complied 目录下: $sudo mv apache-hive-0.13.1-src /app/complied/hive-0.13.1-src $ll /app/complied 1.2.4 编译 Hive编译 Hive 源代码的时候,需要从网上下载依赖包,所以整个编译过程机器必须保证在联网状态。编译执行如下脚本: $cd /app/complied/hive-0.13.1-src/ $export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" $mvn -Phadoop-2,dist -Dmaven.test.skip=true clean package
在编译过程中可能出现速度慢或者中断,可以再次启动编译,编译程序会在上次的编译中断处继续进行编译,整个编译过程耗时与网速紧密相关,网速较快的情况需要 1 个小时左右(上图的时间是多次编译后最后成功的界面)。最终编译的结果为 $HIVE_HOME/packaging/target/apache-hive-0.13.1-bin.tar.gz 通过如下命令查看最终编译完成整个目录大小,可以看到大小为 353.6M 左右 $du -s /app/complied/hive-0.13.1-src
【注】 已经编译好的 Hive 包在本系列配套资源 /install/6.hive-0.13.1-src.tar.gz ,读者直接使用 1.3 首次运行 hive-console1.3.1 获取 Spark 源代码由于首次运行 hive-console 需要在 Spark 源代码进行编译,关于 Spark 源代码的获取可以参考第二课《 Spark 编译与部署(下) --Spark 编译安装》方式进行获取,连接地址为 http://spark./downloads.html ,获取源代码后把 Spark 源代码移动到 /app/complied 目录,并命名为 spark-1.1.0-hive 1.3.2 配置 /etc/profile 环境变量第一步 使用如下命令打开 /etc/profile 文件: $sudo vi /etc/profile 第二步 设置如下参数: export HADOOP_HOME=/app/hadoop/hadoop-2.2.0 export HIVE_HOME=/app/complied/hive-0.13.1-src export HIVE_DEV_HOME=/app/complied/hive-0.13.1-src
第三步 生效配置并验证 $sudo vi /etc/profile $echo $HIVE_DEV_HOME 1.3.3 运行 sbt 进行编译运行 hive/console 不需要启动 Spark ,需要进入到 Spark 根目录下使用 sbt/sbt hive/console 进行首次运行编译,编译以后下次可以直接启动。编译 Spark 源代码的时候,需要从网上下载依赖包,所以整个编译过程机器必须保证在联网状态。编译命令如下: $cd /app/complied/spark-1.1.0-hive $sbt/sbt hive/console
编译时间会很长,在编译过程中可能出现速度慢或者中断,可以再次启动编译,编译程序会在上次的编译中断处继续进行编译,整个编译过程耗时与网速紧密相关。
通过如下命令查看最终编译完成整个目录大小,可以看到大小为 267.9M 左右 $du -s /app/complied/spark-1.1.0-hive
【注】 已经编译好的 Spark for hive-console 包在本系列配套资源 /install/6.spark-1.1.0-hive.tar.gz ,可直接使用 1.4 使用 hive-console1.4.1 启动 hive-console进入到 spark 根目录下,使用如下命令启动 hive-console $cd /app/complied/spark-1.1.0-hive $sbt/sbt hive/console
1.4.2 辅助命令 Help 和 Tab 键可以使用 :help 查看帮助内容 scala>:help
可以使用 tab 键查看所有可使用命令、函数
1.4.3 常用操作首先定义 Person 类,在该类中定义 name 、 age 和 state 三个列,然后把该类注册为 people 表并装载数据,最后通过查询到数据存放到 query 中 scala>case class Person(name:String, age:Int, state:String) scala>sparkContext.parallelize(Person("Michael",29,"CA")::Person("Andy",30,"NY")::Person("Justin",19,"CA")::Person("Justin",25,"CA")::Nil).registerTempTable("people")
scala>val query= sql("select * from people")
1.4.3.1 查看查询的 schemascala>query.printSchema scala>query.collect()
1.4.3.2 查看查询的整个运行计划scala>query.queryExecution
1.4.4 查看查询的 Unresolved LogicalPlanscala>query.queryExecution.logical
1.4.4.1 查看查询的 Analyzed LogicalPlanscala>query.queryExecution.analyzed
1.4.4.2 查看优化后的 LogicalPlanscala>query.queryExecution.optimizedPlan
1.4.4.3 查看物理计划scala>query.queryExecution.sparkPlan
1.4.4.4 查看 RDD 的转换过程scala>query.toDebugString
1.4.5 不同数据源的运行计划上面常用操作里介绍了源自 RDD 的数据, SparkSQL 也可以源自多个数据源: jsonFile 、 parquetFile 和 Hive 等。 1.4.5.1 读取 Json 格式数据第一步 Json 测试数据 Json 文件支持嵌套表, SparkSQL 也可以读入嵌套表,如下面形式的 Json 数据,可以使用 jsonFile 读入 SparkSQL 。该文件可以在配套资源 /data/class6 中找到,在以下测试中把文件放到 /home/hadoop/upload/class6 路径中 { "fullname": "Sean Kelly", "org": "SK Consulting", "emailaddrs": [ {"type": "work", "value": "kelly@"}, {"type": "home", "pref": 1, "value": "kelly@"} ], "telephones": [ {"type": "work", "pref": 1, "value": "+1 214 555 1212"}, {"type": "fax", "value": "+1 214 555 1213"}, {"type": "mobile", "value": "+1 214 555 1214"} ], "addresses": [ {"type": "work", "format": "us", "value": "1234 Main StnSpringfield, TX 78080-1216"}, {"type": "home", "format": "us", "value": "5678 Main StnSpringfield, TX 78080-1316"} ], "urls": [ {"type": "work", "value": "http:///"}, {"type": "home", "value": "http:///"} ] } 第二步 读入 Json 数据 使用 jsonFile 读入数据并注册成表 jsonPerson ,然后定义一个查询 jsonQuery scala>jsonFile("/home/hadoop/upload/class6/nestjson.json").registerTempTable("jsonPerson") scala>val jsonQuery = sql("select * from jsonPerson")
第三步 查看 jsonQuery 的 schema scala>jsonQuery.printSchema
第四步 查看 jsonQuery 的整个运行计划 scala>jsonQuery.queryExecution
1.4.5.2 读取 Parquet 格式数据Parquet 数据放在配套资源 /data/class6/wiki_parquet 中,在以下测试中把文件放到 /home/hadoop/upload/class6 路径下 第一步 读入 Parquet 数据 parquet 文件读入并注册成表 parquetWiki ,然后定义一个查询 parquetQuery scala>parquetFile("/home/hadoop/upload/class6/wiki_parquet").registerTempTable("parquetWiki") scala>val parquetQuery = sql("select * from parquetWiki")
有报错但不影响使用
第二步 查询 parquetQuery 的 schema scala>parquetQuery.printSchema
第三步 查询 parquetQuery 的整个运行计划 scala>parquetQuery.queryExecution
第四步 查询取样 scala>parquetQuery.takeSample(false,10,2)
1.4.5.3 读取 hive 内置测试数据在 TestHive 类中已经定义了大量的 hive0.13 的测试数据的表格式,如 src 、 sales 等等,在 hive-console 中可以直接使用;第一次使用的时候, hive-console 会装载一次。下面我们使用 sales 表看看其 schema 和整个运行计划。 第一步 读入测试数据并定义一个查询 hiveQuery scala>val hiveQuery = sql("select * from sales")
第二步 查看 hiveQuery 的 schema scala>hiveQuery.printSchema
第三步 查看 hiveQuery 的整个运行计划 scala>hiveQuery.queryExecution
第四步 其他 SQL 语句的运行计划 scala>val hiveQuery = sql("select * from (select * from src limit 5) a limit 3")
scala>val hiveQuery = sql("select * FROM (select * FROM src) a")
scala>hiveQuery.where('key === 100).queryExecution.toRdd.collect
1.4.6 不同查询的运行计划1.4.6.1 聚合查询scala>sql("select name, age,state as location from people").queryExecution
scala>sql("select name from (select name,state as location from people) a where location='CA'").queryExecution
scala>sql("select sum(age) from people").queryExecution scala>sql("select sum(age) from people").toDebugString
scala>sql("select state,avg(age) from people group by state").queryExecution scala>sql("select state,avg(age) from people group by state").toDebugString
1.4.6.2 Join 操作scala>sql("select a.name,b.name from people a join people b where a.name=b.name").queryExecution scala>sql("select a.name,b.name from people a join people b where a.name=b.name").toDebugString
1.4.6.3 Distinct 操作scala>sql("select distinct a.name,b.name from people a join people b where a.name=b.name").queryExecution scala>sql("select distinct a.name,b.name from people a join people b where a.name=b.name").toDebugString
1.4.7 优化1.4.7.1 CombineFiltersCombineFilters 就是合并 Filter ,在含有多个 Filter 时发生,如下查询: sql("select name from (select * from people where age >=19) a where a.age <30").queryExecution
上面的查询,在 Optimized 的过程中,将 age>=19 和 age<30 这两个 Filter 合并了,合并成 ((age>=19) && (age<30)) 。其实上面还做了一个其他的优化,就是 project 的下推,子查询使用了表的所有列,而主查询使用了列 name ,在查询数据的时候子查询优化成只查列 name 。 1.4.7.2 PushPredicateThroughProjectPushPredicateThroughProject 就是 project 下推,和上面例子中的 project 一样 sql("select name from (select name,state as location from people) a where location='CA'").queryExecution
1.4.7.3 ConstantFolding :ConstantFolding 是常量叠加,用于表达式。如下面的例子: sql("select name,1+2 from people").queryExecution
2 、 SparkSQL 调优Spark 是一个快速的内存计算框架,同时是一个并行运算的框架,在计算性能调优的时候,除了要考虑广为人知的木桶原理外,还要考虑平行运算的 Amdahl 定理。 木桶原理又称短板理论,其核心思想是:一只木桶盛水的多少,并不取决于桶壁上最高的那块木块,而是取决于桶壁上最短的那块。将这个理论应用到系统性能优化上,系统的最终性能取决于系统中性能表现最差的组件。例如,即使系统拥有充足的内存资源和 CPU 资源,但是如果磁盘 I/O 性能低下,那么系统的总体性能是取决于当前最慢的磁盘 I/O 速度,而不是当前最优越的 CPU 或者内存。在这种情况下,如果需要进一步提升系统性能,优化内存或者 CPU 资源是毫无用处的。只有提高磁盘 I/O 性能才能对系统的整体性能进行优化。
Amdahl 定理,一个计算机科学界的经验法则,因吉恩·阿姆达尔而得名。它代表了处理器平行运算之后效率提升的能力。并行计算中的加速比是用并行前的执行速度和并行后的执行速度之比来表示的,它表示了在并行化之后的效率提升情况。阿姆达尔定律是固定负载(计算总量不变时)时的量化标准。可用公式: 来表示。式中 分别表示问题规模的串行分量(问题中不能并行化的那一部分)和并行分量, p 表示处理器数量。当 时,上式的极限是 ,其中 。这意味着无论我们如何增大处理器数目,加速比是无法高于这个数的。 SparkSQL 作为 Spark 的一个组件,在调优的时候,也要充分考虑到上面的两个原理,既要考虑如何充分的利用硬件资源,又要考虑如何利用好分布式系统的并行计算。由于测试环境条件有限,本篇不能做出更详尽的实验数据来说明,只能在理论上加以说明。 2.1 并行性SparkSQL 在集群中运行,将一个查询任务分解成大量的 Task 分配给集群中的各个节点来运行。通常情况下, Task 的数量是大于集群的并行度。比如前面第六章和第七章查询数据时, shuffle 的时候使用了缺省的 spark.sql.shuffle.partitions ,即 200 个 partition ,也就是 200 个 Task :
而实验的集群环境却只能并行 3 个 Task ,也就是说同时只能有 3 个 Task 保持 Running :
这时大家就应该明白了,要跑完这 200 个 Task 就要跑 200/3=67 批次。如何减少运行的批次呢?那就要尽量提高查询任务的并行度。查询任务的并行度由两方面决定:集群的处理能力和集群的有效处理能力。 l 对于 Spark Standalone 集群来说,集群的处理能力是由 conf/spark-env 中的 SPARK_WORKER_INSTANCES 参数、 SPARK_WORKER_CORES 参数决定的;而 SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES 不能超过物理机器的实际 CPU core ; l 集群的有效处理能力是指集群中空闲的集群资源,一般是指使用 spark-submit 或 spark-shell 时指定的 --total-executor-cores ,一般情况下,我们不需要指定,这时候, Spark Standalone 集群会将所有空闲的 core 分配给查询,并且在 Task 轮询运行过程中, Standalone 集群会将其他 spark 应用程序运行完后空闲出来的 core 也分配给正在运行中的查询。 综上所述, SparkSQL 的查询并行度主要和集群的 core 数量相关,合理配置每个节点的 core 可以提高集群的并行度,提高查询的效率。 2.2 高效的数据格式高效的数据格式,一方面是加快了数据的读入速度,另一方面可以减少内存的消耗。高效的数据格式包括多个方面: 2.2.1 数据本地性分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络 IO ,也消耗了磁盘 IO ,降低了整个计算的效率。为了提高数据的本地性,除了优化算法(也就是修改 spark 内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值。 下面是 Spark webUI 监控 Stage 的一个图: l PROCESS_LOCAL 是指读取缓存在本地节点的数据 l NODE_LOCAL 是指读取本地节点硬盘数据 l ANY 是指读取非本地节点数据 l 通常读取数据 PROCESS_LOCAL>NODE_LOCAL>ANY ,尽量使数据以 PROCESS_LOCAL 或 NODE_LOCAL 方式读取。其中 PROCESS_LOCAL 还和 cache 有关。
2.2.2 合适的数据类型对于要查询的数据,定义合适的数据类型也是非常有必要。对于一个 tinyint 可以使用的数据列,不需要为了方便定义成 int 类型,一个 tinyint 的数据占用了 1 个 byte ,而 int 占用了 4 个 byte 。也就是说,一旦将这数据进行缓存的话,内存的消耗将增加数倍。在 SparkSQL 里,定义合适的数据类型可以节省有限的内存资源。 2.2.3 合适的数据列对于要查询的数据,在写 SQL 语句的时候,尽量写出要查询的列名,如 Select a,b from tbl ,而不是使用 Select * from tbl ;这样不但可以减少磁盘 IO ,也减少缓存时消耗的内存。 2.2.4 优的数据存储格式在查询的时候,最终还是要读取存储在文件系统中的文件。采用更优的数据存储格式,将有利于数据的读取速度。查看 SparkSQL 的 Stage ,可以发现,很多时候,数据读取消耗占有很大的比重。对于 sqlContext 来说,支持 textFiile 、 SequenceFile 、 ParquetFile 、 jsonFile ;对于 hiveContext 来说,支持 AvroFile 、 ORCFile 、 Parquet File ,以及各种压缩。根据自己的业务需求,测试并选择合适的数据存储格式将有利于提高 SparkSQL 的查询效率。 2.3 内存的使用spark 应用程序最纠结的地方就是内存的使用了,也是最能体现“细节是魔鬼”的地方。 Spark 的内存配置项有不少,其中比较重要的几个是: l SPARK_WORKER_MEMORY ,在 conf/spark-env.sh 中配置 SPARK_WORKER_MEMORY 和 SPARK_WORKER_INSTANCES ,可以充分的利用节点的内存资源, SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY 不要超过节点本身具备的内存容量; l executor-memory ,在 spark-shell 或 spark-submit 提交 spark 应用程序时申请使用的内存数量;不要超过节点的 SPARK_WORKER_MEMORY ; l spark.storage.memoryFraction spark 应用程序在所申请的内存资源中可用于 cache 的比例 l spark.shuffle.memoryFraction spark 应用程序在所申请的内存资源中可用于 shuffle 的比例 在实际使用上,对于后两个参数,可以根据常用查询的内存消耗情况做适当的变更。另外,在 SparkSQL 使用上,有几点建议: l 对于频繁使用的表或查询才进行缓存,对于只使用一次的表不需要缓存; l 对于 join 操作,优先缓存较小的表; l 要多注意 Stage 的监控,多思考如何才能更多的 Task 使用 PROCESS_LOCAL ; l 要多注意 Storage 的监控,多思考如何才能 Fraction cached 的比例更多
2.4 合适的 Task对于 SparkSQL ,还有一个比较重要的参数,就是 shuffle 时候的 Task 数量,通过 spark.sql.shuffle.partitions 来调节。调节的基础是 spark 集群的处理能力和要处理的数据量, spark 的默认值是 200 。 Task 过多,会产生很多的任务启动开销, Task 多少,每个 Task 的处理时间过长,容易 straggle 。 2.5 其他的一些建议优化的方面的内容很多,但大部分都是细节性的内容,下面就简单地提提: l 想要获取更好的表达式查询速度,可以将 spark.sql.codegen 设置为 Ture ; l 对于大数据集的计算结果,不要使用 collect() ,collect() 就结果返回给 driver ,很容易撑爆 driver 的内存;一般直接输出到分布式文件系统中; l 对于 Worker 倾斜,设置 spark.speculation=true 将持续不给力的节点去掉; l 对于数据倾斜,采用加入部分中间步骤,如聚合后 cache ,具体情况具体分析; l 适当的使用序化方案以及压缩方案; l 善于利用集群监控系统,将集群的运行状况维持在一个合理的、平稳的状态; l 善于解决重点矛盾,多观察 Stage 中的 Task ,查看最耗时的 Task ,查找原因并改善; |
|