并行优化并行编译
# 参数: set hive.driver.parallel.compilation=true; # 默认这个参数是False; # 搭配参数: hive.driver.parallel.compilation.global.limit # 表示,最大的并行度是多少。默认是3
并行Stage执行 Hive的SQL在翻译成MR任务的时候,可能会有很多的stage(阶段) 阶段之间都会有依赖关系:
如果多个State之间没有依赖,他们如果能够并行执行就能够提高集群的整体资源利用率。 # 参数: set hive.exec.parallel=true,可以开启并发执行,默认为false。 set hive.exec.parallel.thread.number=16; //同一个sql允许的最大并行度,默认为8。 小文件优化
小文件的HDFS影响 对磁盘寻道不友好(排除SSD,这里指的是机械硬盘)
对NameNode的压力很大
Hive执行MapReduce也会产生很多的结果文件。 这些结果文件,都不一定每一个都达到了一个block的大小。 随着时间的推移,结果文件会越攒越多,最终还是导致了文件过多对HDFS的影响,以及文件都不一定达到block的大小,也是一种小文件过多的问题。 对于这个问题,我们可以要求Hive在执行完毕后,合并结果文件。
# 是否开启合并Map端小文件,在Map-only的任务结束时合并小文件,true是打开。 hive.merge.mapfiles # 是否开启合并Reduce端小文件,在map-reduce作业结束时合并小文件。true是打开。 hive.merge.mapredfiles # 合并后MR输出文件的大小,默认为256M。建议设置为255M hive.merge.size.per.task # 对小文件判断的平均大小阈值(结果文件的平均大小如果小于阈值,才会进行合并的操作) hive.merge.smallfiles.avgsize 矢量化查询对于分布式系统的优化,有两个方向:
矢量化查询,就是类型2的优化。 MapReduce默认情况下,对数据的处理是一条条的处理的。
矢量化查询是指,对数据一批批的处理。 要执行矢量化是有要求的:
# 开启矢量化查询 set hive.vectorized.execution.enabled=true; # 开启矢量化在reduce端 set hive.vectorized.execution.reduce.enabled = true;
读取零拷贝优化概念:尽量减少在读取的时候读取的数据量的大小。 条件:
一般情况下,我们写SQL是有一部分的SQL是只会用到表中的部分的 这种场景下,就可以 # 参数 set hive.exec.orc.zerocopy=true;
数据倾斜优化数据倾斜:在分布式程序分配任务的时候,任务分配的不平均。
数据倾斜一旦发生,横向拓展只能缓解这个情况,而不能解决这个情况。 如果遇到数据倾斜,一定要从根本上去解决这个问题。而不是想着加机器来解决。 JOIN的时候的倾斜方案一用前面讲过的map join SMB join 这些优化去解决。
方案二Sekw Join 方案的方式是: 对倾斜列的数据,进行单独处理。也就是遇到倾斜列的数据的时候,直接找一个中间目录临时存储,当前MR不去处理 等当前MR完成后,在单独处理这个倾斜的数据集。 这种解决方式有一个前置条件,Hive必须要知道,哪个列的数据倾斜了。 如何让Hive知晓哪个列是倾斜列,就有2种方式 方式1:运行时判断在执行MR的过程中,Hive会对数据记录 方式2:编译时判断指的就是,执行SQL的人提前知晓某个列就是倾斜列。 在建表的时候,就指定某个列是倾斜列即可。 # 参数 # 开启倾斜优化,针对倾斜优化的总开关 set hive.optimize.skewjoin=true; # 设置运行时判断的时候,对倾斜数据量的阈值 set hive.skewjoin.key=100000; # 开启编译时的倾斜优化,针对编译时的开关 set hive.optimize.skewjoin.compiletime=true; # 示例语句,这个语句用于编译时判断,提前告知Hive哪个列是倾斜的 CREATE TABLE list_bucket_single (key STRING, value STRING) -- 倾斜的字段和需要拆分的key值 SKEWED BY (id) ON (1,5,6) -- 为倾斜值创建子目录单独存放 [STORED AS DIRECTORIES]; -- 上面的参数可以组合一块使用 -- 当表有SKEWED BY的设置,走编译时优化 -- 如果表没有这个设置,就运行时优化
Union优化在前面的优化中,不管是运行时优化,还是编译时优化,都会产生两份结果。 这两份结果最终都需要进行Union操作合并为一份结果 参数:
开启这个参数的时候,对中间数据进行重复性利用。提升union的性能。 重复利用:不会单独开启任务对多份数据执行合并,而是每一个任务在执行之后直接将结果输出到目的地。 不开启Union合并优化: MR1 对普通数据进行处理,输入路径:/tmp/1 MR2 对倾斜数据进行处理,输入路径:/tmp/2 合并后,数据写入最终目的地:/user/hive/warehouse/xxx.db/xxxtable/ 如果开启了优化: MR1 对普通数据进行处理,输入路径:/user/hive/warehouse/xxx.db/xxxtable/ MR2 对倾斜数据进行处理,输入路径:/user/hive/warehouse/xxx.db/xxxtable/ GROUP BY分组统计的倾斜处理对于数据倾斜,典型的两个性能点:
前提条件:对数据走平均打散,不按照hash散列 优化1:利用MapReduce的Combina 机制,在Map端完成预聚合操作。 因为,分组是必聚合的,所以,我们可以做预聚合 参数: hive.map.aggr=true; 优化2:大combina机制,对预聚合产生在第一个MR的reduce端。 最终聚合产生在第二个MR中 将Map端的Combina扩散到真个MR,最终的聚合交由第二个MR来做。 在绝对的性能上:优化1是性能最好的,因为节省了很多的中间数据传输。同时一个MR搞定,不需要搞第二个MR来做。 但是,如果数据量巨大,这个MapReduce的任务的压力就会很大,同时执行时间可能很长。 执行时间过长,中间的变量就不好控。一旦出现问题,重头再来。 所以,对于海量数据一般使用优化2的方式,因为如果出现问题,起码可以从第一阶段的结果再来。 参数: hive.groupby.skewindata=true; MapReduce迭代计算的概念(补充)迭代计算,就是一步步的计算出结果的方式。 方式比一次性计算出结果效率要低,但是稳定性和数据的 在很多的企业业务计算中,有的数据计算是很复杂的。 可能:
MapReduce的计算模型上面提到: 这个是因为MapReduce的计算模型,就2个:
很多的业务计算都受限map和reduce方法的限制,因为,比如map方法 map(传入参数固定){ 我们只能在这个部分,做自由处理。 return 返回形式固定 } reduce(传入参数固定){ 我们只能在这个部分,做自由处理。 return 返回形式固定 } MR的迭代基于前面的概念,所以很多的业务计算本质上是迭代的计算。 如图,某些复杂的业务场景可能会如图所示执行MR的迭代计算。 上图中,MR之间基于HDFS完成数据的共享。 迭代计算中,中间产生的数据,都是中间结果。 这些中间结果就类似数仓中,ODS->DWD->DWM->DWS->APP的迭代过程。
前面分组倾斜处理优化中的优化2方案,就是一种迭代计算的思想延伸。 Hive优化小总结Hive在各方面优化的东西乱七八糟一堆。 我们这个数仓项目,70%时间都在Hive上,30%的时间在业务分析,建模分析上。 很痛苦,我们只想专心做业务分析,不想搞乱七八糟的这优化那优化的。 后面学习Spark和Flink的时候就能体会到专心做业务的快感了。 来源:https://www./content-4-817551.html |
|