ClusterManager、优化Spark2018/12/10版本修改人修改记录修改时间V1.0王守奎编写2018/5/23目录Clus terManager0StandaloneClusterManager0机器准备0环境搭建0a.复制spark到workn ode0b.分别在datanode1、datanode2、datanode3上,创建连接0c.配置文件并分发0d.配置环境变量1e .运行1f.停止集群2g.单独启动spark的各个进程3h.启动sparkshell,然后观察界面3spark的端口4spark 程序41.在eclipse下创建maven项目bigData-Spark42.编写pom.xml文件53.编写scala文件74. 编译75.生成target目录76.提交8常见问题81、DescriptionResourcePathLocationTy pe8提交应用方式8spark-submit详细参数说明9spark-shell9spark-submit10spark使用ya rnclustermanager111.配置环境变量112.重启服务113.提交11ClusterHA12机器准备12配置s park-env.sh并分发12启动集群13访问WebUI141、http://namenode:8080/142、http: //namenode2:808014HA单点故障模仿15关闭namenode-master:stop-master.sh15web UI查看状态15故障恢复16HA模式访问HadoopHA下的数据16优化Spark16开发调优16避免创建重复的RDD16尽可能 复用同一个RDD17对多次使用的RDD进行持久化17尽量避免使用shuffle类算子17使用map-side预聚合的shuffle 操作17使用高性能的算子18使用Kryo优化序列化性能18资源调优19数据倾斜调优20数据倾斜发生时的现象20数据倾斜发生的原理2 0数据倾斜解决201、使用HiveETL预处理数据202、过滤少数导致倾斜的key213、提高shuffle操作的并行度214、 两阶段聚合(局部聚合+全局聚合)215、将reducejoin转为mapjoin226、使用随机前缀和扩容RDD进行join2 2ClusterManagerStandaloneClusterManager机器准备Namenode(Master),da tanode1(worker),datanode2(worker),datanode3(worker)环境搭建a.复制spark到 worknode$>cd/soft$>xcopy.shspark-2.1.1-bin-without-hadoopb.分别在 datanode1、datanode2、datanode3上,创建连接$>cd/soft$>ln-sspark-2.1.1- bin-without-hadoop/sparkc.配置文件并分发SparkStandlone模式采用Master/Slave 结构,slave的配置如下$>cpslaves.templateslaves$>vislavesdatanode1datan ode2datanode3设置spark-env.sh,并分发#HadoopYARN运行模式时需要,Standlone模式暂时不 需要#exportHADOOP_CONF_DIR=/soft/hadoop/etc/hadoop#配置运行的jdk环境expor tJAVA_HOME=/usr/java/jdk1.8.0_172#配置hadoop的classpath,由于使用的是spark -2.1.1-bin-without-hadoop,所以需要关联hadoopclassexportSPARK_DIST_CL ASSPATH=$(/soft/hadoop/bin/hadoopclasspath)d.配置环境变量$>xcopy.sh/e tc/profilee.运行先启动hadoop集群$>start-all.sh//只启动HDFS就可以,start-dfs.sh 启动spark集群修改/soft/spark/sbin目录中的start-all.sh和stop-all.sh,这两个命令同had oop的名称一样,容易产生混乱。如果不修改则需要进入目录再执行./start-all.sh或./stop-all.shcp start-all.shstart-spark-all.shcpstop-all.shstop-spark-all.sh$> start-spark-all.sh查看进程:Master(管理器)+worker(工作节点)访问masterhttp:/ /namenode:8080/访问workerhttp://datanode1:8081/常见问题1、datanode2: JAVA_HOMEisnotset解决方案:在/soft/spark/conf/spark-env.sh中添加如下配置:e xportJAVA_HOME=/usr/java/jdk1.8.0_1722、执行start-all.sh出错,错误:Excep tioninthread“main”java.lang.NoClassDefFoundError:org.slf4j.L ogger解决方案:在Spark的conf/spark-env.sh中配置hadoopclasspath,在其中添加下面这一句e xportSPARK_DIST_CLASSPATH=$(/soft/hadoop/bin/hadoopclasspath),其 中/soft/hadoop为hadoop的安装目录。原因是安装spark使用的是spark-2.1.1-bin-without-h adoop.tgz,如果使用的是spark-2.1.1-bin-hadoop2.7.tgz应该就不需要设置hadoopclas spath。f.停止集群$>/soft/spark/sbin/$>./stop-spark-all.shg.单独启动spark的各 个进程启停master$>cd/soft/spark/sbin/$>./start-master.sh//启动master进 程$>./stop-master.sh//停止master进程$>./spark-daemon.shstartorg.apa che.spark.deploy.master.Master//启动master进程$>./spark-daemon.shst oporg.apache.spark.deploy.master.Master//停止master进程$>cd../bin$ >./spark-classorg.apache.spark.deploy.master.Master//启动master进 程启停worker$>cd/soft/spark/sbin$>./start-slaves.sh//在master节点,启 动所有的worker进程$>./stop-slaves.sh//在master节点,停止所有的worker进程$>./star t-slave.shspark://namenode:7077//在worker节点上启动work进程$>./spark- classorg.apache.spark.deploy.worker.Workerspark://namenode:7077 //在worker节点上启动work进程h.启动sparkshell,然后观察界面在spark集群的任意节点运行如下命令: $>spark-shell--masterspark://namenode:7077spark的端口masterrpcspa rk://namenode:7077masterwebuihttp://namenode:8080workerwebuiht tp://datanode1:8081RESTURLspark://namenode:6066(clustermode)s park程序在win7上进行spark开发(使用eclipse+maven插件),提交给linux的spark集群。1.在ec lipse下创建maven项目bigData-Sparknew-->project-->mavenproject-->qu ikstart-->ok此时创建的工程还不是scala的工程,在项目名称上点击右键,点击AddScalaNature后项目 才转换为scala-maven项目。2.编写pom.xml文件e.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-inst ance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http: //maven.apache.org/xsd/maven-4.0.0.xsd">4.0.0elVersion>com.bmbigData-SparkrtifactId>0.0.1-SNAPSHOTt.build.sourceEncoding>UTF-8perties>junitactId>junit3.8.1provided scope>org.apache.spark>spark-core_2.112.1.1 providedorg.apac he.sparkspark-hive_2.11n>2.1.1provided org.apache.sparkspark-streaming-ka fka_2.111.6.3ency>org.apache.sparkspark-streami ng_2.112.1.1ncy>org.apache.sparkspark-graphx_2 .112.1.1 jdk.toolsjdk.toolsrsion>1.8system${JAVA_HOME}/ lib/tools.jar3 .编写scala文件[Test0.scala]packagecom.bm.sparkimportorg.apache.spa rk.SparkConfimportorg.apache.spark.SparkContext/大数据界的千里马 王守奎/objectTest01{defmain(args:Array[String]){valconf= newSparkConf();valsc=newSparkContext(conf);println("hello world")}}4.编译maven-->compile-->package5.生成target目录找到bigData-Spar k-0.0.1-SNAPSHOT.jar6.提交把bigdata-spark-2.1.1-0.0.1-SNAPSHOT.jar拷贝 到linux,然后执行下面命令进行提交:$>spark-submit--masterspark://namenode:7077 --classcom.bm.spark.Test01bigData-Spark-0.0.1-SNAPSHOT.jar常见问题 1、DescriptionResourcePathLocationTypeTheversionofscalalibrary foundinthebuildpathofbigData-Spark(2.11.8)ispriortoth eoneprovidedbyscalaIDE(2.12.3).SettingaScalaInstallatio nChoicetomatch.bigData-SparkUnknownScalaVersionProblem解决方案:右 击工程–>Scala–>settheScalaInstallation选择scal版本。提交应用方式spark-su bmit可以提交任务到spark集群执行,也可以提交到hadoop的yarn集群执行。spark-submit详细 参数说明参数名(红色常用)参数说明--mastermaster的地址,提交任务到哪里执行,例如spark://host:po rt,yarn,local--deploy-mode在本地(client)启动driver或在cluster 上启动,默认是client--class应用程序的主类,仅针对java或scala应用--name应用程序的名称-- jars用逗号分隔的本地jar包,设置后,这些jar将包含在driver和executor的classpath 下--packages包含在driver和executor的classpath中的jar的maven坐标--e xclude-packages为了避免冲突而指定不包含的package--repositories远程repositor y--confPROP=VALUE指定spark配置属性的值,例如-confspark.executor.extra JavaOptions="-XX:MaxPermSize=256m"--properties-file加载的配置文件,默认为c onf/spark-defaults.conf--driver-memoryDriver内存,默认1G--driver-jav a-options传给driver的额外的Java选项--driver-library-path传给driver的 额外的库路径--driver-class-path传给driver的额外的类路径--driver-coresDriver 的核数,默认是1。在yarn或者standalone下使用--executor-memory每个executor的内 存,默认是1G--total-executor-cores所有executor总共的核数。仅仅在mesos或者stan dalone下使用--num-executors启动的executor数量。默认为2。在yarn下使用--execut or-core每个executor的核数。在yarn或者standalone下使用spark-shell$>spark-sh ell--masterspark://namenode:7077//进入集群shell完全分布式情况下运行分析如下图:sp ark-submit$>spark-submit--masterspark://namenode:7077--classc om.bm.spark.Test01bigdata-spark-2.1.1-0.0.1-SNAPSHOT.jar--master master_urlspark://host:port,mesos://host:port,yarn,orlocal--de ploy-modeDEPLOY_NAMEcluster|client(默认)--classCLASS_NAME指定类 名--nameNAME应用名称--jarsJAR指定第三方依赖jar文件,多个文件","号分割spark提交的部署模式 client模式dirver运行在提交的jvm中$>spark-submit--masterspark://namenode: 7077--classcom.bm.spark.Test01--nameMySparkClientApp--deploy -modeclientbigData-Spark-0.0.1-SNAPSHOT.jarcluster模式driver运行在一个 worker上的jvm中$>spark-submit--masterspark://namenode:7077--class com.bm.spark.Test01--nameMySparkClusterApp--deploy-modeclust erbigData-Spark-0.0.1-SNAPSHOT.jar//注意jar要提前分发到worker节点上spark使 用yarnclustermanagerspark-submit可以提交任务到spark集群执行,也可以提交到hadoo p的yarn集群执行。HadoopYARN模式:集群运行在Yarn资源管理器上,资源管理交给YARN,Spark只负责进行 任务调度和计算。1.配置环境变量HADOOP_CONF_DIR,指向hadoop的配置目录[/soft/spark/conf/sp ark-env.sh]exportHADOOP_CONF_DIR=/soft/hadoop/etc/hadoop分发2.重启服务 $>stop-spark-all.sh$>stop-all.sh$>start-all.sh$>start-spark-all. sh3.提交$>spark-submit--masteryarn--classcom.bm.spark.Test01-- nameMySparkYarnApp--deploy-modeclusterbigData-Spark-0.0.1-SNA PSHOT.jarClusterHASpark集群安装完毕,但是有一个很大的问题,那就是Master节点存在单点故障,要解决此 问题,就要借助zookeeper,并且启动至少两个(可以多个)Master节点来实现高可靠,配置方式比较简单。机器准备Nameno de(Master),datanode1(worker),datanode2(worker),datanode3(worker), namenode2(master)。Namenode2部署一套spark。配置spark-env.sh并分发在namenode节点 中,停止spark所有服务,修改配置文件spark-env.sh,在该配置文件中删掉SPARK_MASTER_IP并添加如下配置。 然后分发exportSPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZO OKEEPER-Dspark.deploy.zookeeper.url=datanode1,datanode2,datanode 3-Dspark.deploy.zookeeper.dir=/spark"参数说明:#exportSPARK_MASTER_I P=namenode#这个配置要注释掉。说明:-Dspark.deploy.recoveryMode=ZOOKEEPER# 说明整个集群状态是通过zookeeper来维护的,整个集群状态的恢复也是通过zookeeper来维护的。就是说用zookeeper 做了spark的HA配置,Master(Active)挂掉的话,Master(standby)要想变成Master(Active) 的话,Master(Standby)就要向zookeeper读取整个集群状态信息,然后进行恢复所有Worker和Driver的状态 信息,和所有的Application状态信息;-Dspark.deploy.zookeeper.url=datanode1,da tanode2,datanode3#配置zookeeper,默认端口2181,不是需要显示说明,默认可以不写-Dspark.de ploy.zookeeper.dir=/spark#保存spark的元数据,保存了spark的作业运行状态;zookeeper 会保存spark集群的所有的状态信息,包括所有的Workers信息,所有的Applactions信息,所有的Driver信息,如果 集群启动集群启动HDFS集群,如果hadoop-yarn模式,则再启动RM。>start-dfs.sh2、启动spark集群在 namenode上执行start-spark-all.sh脚本,然后在namenode2上执行start-master.sh启动第 二个Master(注意在启动之前先停止启动了的单集群)访问WebUIhttp://namenode:8080/http://nam enode2:8080HA单点故障模仿关闭namenode-master:stop-master.shwebUI查看状态http: //namenode:8080/不能访问http://namenode2:8080故障恢复Namenode状态standbyNa menode2状态aliveHA模式访问HadoopHA下的数据spark-submit--masterspark://na menode:7077,namenode2:7077--classcom.bm.spark.Test01--nameMyS parkClientApp--deploy-modeclientbigData-Spark-0.0.1-SNAPSHOT.j ar优化Spark开发调优避免创建重复的RDD首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个 RDD执行某个算子操作,然后得到下一个RDD;以此类推,循环往复,直到计算出最终我们需要的结果。在这个过程中,多个RDD会通过不同 的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDDlineage,也就是“RDD的血缘关系链”。?我们 在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据尽可能复用同一个RDD比如:有一个RD D的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使 用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽 量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。对多次使用的RDD进行持久化对多次使用的RDD进 行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接 从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。尽量避免使用shuffl e类算子shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比 如reduceByKey、join等算子,都会触发shuffle操作。shuffle过程中,各个节点上的相同key都会先写入本地磁 盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有 可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件 读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。因此在我们的开发过程中,能避免 则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用m ap类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销 。使用map-side预聚合的shuffle操作一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map- side预聚合的算子。?比如如下两幅图,就是典型的例子,分别基于reduceByKey和groupByKey进行单词计数。其中第一 张图是groupByKey的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByK ey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。使用高性能的算子使用re duceByKey/aggregateByKey替代groupByKey使用mapPartitions替代普通map使用forea chPartitions替代foreach使用repartitionAndSortWithinPartitions替代repart ition与sort类操作使用Kryo优化序列化性能valconf=newSparkConf()conf.set("spa rk.serializer","org.apache.spark.serializer.KryoSerializer")conf .set("spark.kryo.registrationRequired","true")conf.registerKryoC lasses(Array(classOf[MyClass],classOf[MyOtherClass]))资源调优使用spark -submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据使用的部署模式(deploy-mode) 不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内 存和CPUcore。而Driver进程要做的第一件事情,就是向集群管理器(可以是SparkStandalone集群,也可以是其 他的资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Sp ark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CP Ucore。在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编 写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些tas k分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段), 只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后 Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直 到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。Executor的内存主要分为三块:第一块是让 task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个 stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占 Executor总内存的60%。下面资源调优示例代码,根据实际情况进行调整。/spark-submit\--master spark://namenode:7077\--num-executors50\//设置Spark作业总共要用 多少个Executor进程来执行--executor-memory1G\//每个Executor进程的内存--exe cutor-cores2\//每个Executor进程的CPUcore数量--driver-memory1G\ //Driver进程的内存--confspark.default.parallelism=2000\//每个sta ge的默认task数量--confspark.storage.memoryFraction=0.5\//RDD持久化数 据在Executor内存中能占的比例,默认是0.6--confspark.shuffle.memoryFraction=0.3 \//shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存 的比例,默认是0.2数据倾斜调优数据倾斜发生时的现象绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有2000个 task,1997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。原本能够正常执行的Spa rk作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。数据倾斜发生的原理在进行sh uffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此 时如果某个key对应的数据量特别大的话,就会发生数据倾斜。数据倾斜解决1、使用HiveETL预处理数据方案适用场景:导致数据倾斜 的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场 景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。方案实现思路:此时可以评估一下,是否可以通过H ive来进行数据预处理(即通过HiveETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业 中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark 作业中也就不需要使用原先的shuffle类算子执行这类操作了2、过滤少数导致倾斜的key方案适用场景:如果发现导致倾斜的key就少 数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100 万数据,从而导致了数据倾斜。方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么 干脆就直接过滤掉那少数几个key。比如,在SparkSQL中可以使用where子句过滤掉这些key或者在SparkCore中对 RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sa mple算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。3、提高shuffle操作的并行度方案适 用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。方案实现思路:在对RDD 执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffl e算子执行时shufflereadtask的数量。对于SparkSQL中的shuffle类语句,比如groupby、joi n等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shufflereadtask 的并行度,该值默认是200,对于很多场景来说都有点过小。4、两阶段聚合(局部聚合+全局聚合)?方案适用场景:对RDD执行reduc eByKey等聚合类shuffle算子或者在SparkSQL中使用groupby语句进行分组聚合时,比较适用这种方案。?方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello,1)(hello,1)(hello,1)(hello,1),就会变成(1_hello,1)(1_hello,1)(2_hello,1)(2_hello,1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello,2)(2_hello,2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello,4)。5、将reducejoin转为mapjoin?方案适用场景:在对RDD使用join类操作,或者是在SparkSQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。6、使用随机前缀和扩容RDD进行join?方案适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。方案实现思路:首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据,然后将该RDD的每条数据都打上一个n以内的随机前缀。同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。最后将两个处理后的RDD进行join即可。魁魁语录:spark学习魁魁语录:spark学习江湖一哥版权所有江湖一哥版权所有 |
|