在上一篇文章中,威廉展示了如何构建一个简单的Spark集群,本文将介绍如何在Spark集群上部署运行我们的程序
首先来看下Spark的简要工作流程
Spark应用运行在各自独立的进程中,由主程序(也被称为driver 程序)中的SparkContext 对象协调管理。SparkContext 可连接到多种cluster manager ,包括Spark本身提供的standalone cluster manager ,以及YARN ,Mesos 。成功连接后,SparkContext 会请求在Worker/Slave 主机上运行executor 进程用于管理数据,处理运算,并将JAR 包或.py 文件发送给executor 。最后,SparkContext 发送task 给executor 执行
以下几点值得注意:
- 应用有各自独立的
executor 进程,多线程处理task ,不同应用的task 运行在不同的JVM中,这样有利于不同任务的隔离,但也导致了在不同应用中,若不依靠外部数据存储,数据将无法共享
- Spark对于
cluster manager 是不可知的,不会影响到如YARN ,Mesos 等上运行的其他程序
driver 程序监听executor 连接,接口可以通过spark.driver.port ,spark.fileserver.port 配置,需保证这些接口可以被executor 连接到
driver 程序需要和executor 通信,因此最好能保证它们处于同一网段;若不得不将driver 启动在远程主机上,最好能打开RPC(远程过程调用协议),以减少driver 和executor 之间的通信时间
Spark支持运行Scala,Java及Python编写的应用,并提供了Python及Scala的Shell
Scala Shell
./bin/spark-shell --master local[2]
表示在本地使用2线程运行scala shell,更多参数可以通过./bin/spark-shell --help 查阅
spark@master:~/spark-1.3.1-bin-hadoop2.6$ ./bin/spark-shell --master local[2]
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging./log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/05/13 08:10:59 INFO SecurityManager: Changing view acls to: spark
15/05/13 08:10:59 INFO SecurityManager: Changing modify acls to: spark
15/05/13 08:10:59 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark)
15/05/13 08:10:59 INFO HttpServer: Starting HTTP Server
15/05/13 08:11:00 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/13 08:11:00 INFO AbstractConnector: Started SocketConnector@0.0.0.0:46123
15/05/13 08:11:00 INFO Utils: Successfully started service 'HTTP class server' on port 46123.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.3.1
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
15/05/13 08:11:11 WARN Utils: Your hostname, master resolves to a loopback address: 127.0.1.1; using 192.168.32.130 instead (on interface eth0)
15/05/13 08:11:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/13 08:11:11 INFO SparkContext: Running Spark version 1.3.1
15/05/13 08:11:11 INFO SecurityManager: Changing view acls to: spark
15/05/13 08:11:11 INFO SecurityManager: Changing modify acls to: spark
15/05/13 08:11:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark)
15/05/13 08:11:12 INFO Slf4jLogger: Slf4jLogger started
15/05/13 08:11:12 INFO Remoting: Starting remoting
15/05/13 08:11:13 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@master:35542]
15/05/13 08:11:13 INFO Utils: Successfully started service 'sparkDriver' on port 35542.
15/05/13 08:11:13 INFO SparkEnv: Registering MapOutputTracker
15/05/13 08:11:13 INFO SparkEnv: Registering BlockManagerMaster
15/05/13 08:11:13 INFO DiskBlockManager: Created local directory at /tmp/spark-a4407ace-acf9-4e46-9d52-f6270bed95dc/blockmgr-58bd1003-13d9-4adf-a8c5-85d9683a6164
15/05/13 08:11:13 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/05/13 08:11:14 INFO HttpFileServer: HTTP File server directory is /tmp/spark-4ff109b4-3c02-4ba7-824b-0a21f3ce702f/httpd-48930b06-f315-4d57-b8ef-701ce6c0215d
15/05/13 08:11:14 INFO HttpServer: Starting HTTP Server
15/05/13 08:11:14 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/13 08:11:14 INFO AbstractConnector: Started SocketConnector@0.0.0.0:45312
15/05/13 08:11:14 INFO Utils: Successfully started service 'HTTP file server' on port 45312.
15/05/13 08:11:14 INFO SparkEnv: Registering OutputCommitCoordinator
15/05/13 08:11:14 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/13 08:11:15 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/05/13 08:11:15 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/05/13 08:11:15 INFO SparkUI: Started SparkUI at http://master:4040
15/05/13 08:11:15 INFO Executor: Starting executor ID <driver> on host localhost
15/05/13 08:11:15 INFO Executor: Using REPL class URI: http://192.168.32.130:46123
15/05/13 08:11:15 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@master:35542/user/HeartbeatReceiver
15/05/13 08:11:16 INFO NettyBlockTransferService: Server created on 60574
15/05/13 08:11:16 INFO BlockManagerMaster: Trying to register BlockManager
15/05/13 08:11:16 INFO BlockManagerMasterActor: Registering block manager localhost:60574 with 267.3 MB RAM, BlockManagerId(<driver>, localhost, 60574)
15/05/13 08:11:16 INFO BlockManagerMaster: Registered BlockManager
15/05/13 08:11:17 INFO SparkILoop: Created spark context..
Spark context available as sc.
15/05/13 08:11:18 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
scala>
当前log level 为默认的INFO,可以看到许多详细的信息,对于我们以后的详细学习很有帮助;但目前我们觉得log太多了点,于是可以修改conf/log4j.properties ,使warning 以上级别的log才会在控制台中显示
log4j.rootCategory=WARN, console
Python Shell
./bin/pyspark --master local[2]
与Scala Shell类似的启动命令,修改log level 之后,这次我们看到log信息就少了许多
spark@master:~/spark-1.3.1-bin-hadoop2.6/bin$ ./pyspark --master local[2]
Python 2.7.6 (default, Mar 22 2014, 22:59:56)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
15/05/13 08:44:20 WARN Utils: Your hostname, master resolves to a loopback address: 127.0.1.1; using 192.168.32.130 instead (on interface eth0)
15/05/13 08:44:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/13 08:44:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.3.1
/_/
Using Python version 2.7.6 (default, Mar 22 2014 22:59:56)
SparkContext available as sc, HiveContext available as sqlContext.
>>>
在Spark Shell中可以进行一些交互式的操作,但更普遍的情况是打包部署完整的程序提交到Spark来执行
Spark_submit
Spark提供了spark_submit 脚本来处理程序的提交,其具体使用格式是这样的
./bin/spark-submit --class <main-class> --master <master-url> --deploy-mode <deploy-mode> --conf <key>=<value> ... # 其他选项
<application-jar> [application-arguments]
- Python应用的提交较为简单,只需把
.py 文件放置在<application-jar> 的位置,并可以用--py-files 参数来对应引用的.zip .egg .py 文件
- deploy_mode有client(默认),cluster两种
- client:使用
spark_submit 本身进程运行driver 程序,控制台进行输入输出,用户需要和worker 主机处在同一网段,比如直接登录到master 主机进行操作的用户,并且适合交互式操作,如Shell
- cluster:
driver 程序将被部署到worker 主机,以减少driver 与executor 间的通信成本,适用于在集群以外的远程主机提交应用的情况;目前不支持Mesos 集群及Python 程序
- 使用
cluster 模式部署在Spark Standalone Cluster 的情况,可以添加--supervise 参数来保证driver 在任务失败时自行重新提交
Spark_submit实例
# 本地8线程
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /path/to/examples.jar 100
# client deploy mode运行在Spark Standalone cluster
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://207.184.161.138:7077 --executor-memory 20G --total-executor-cores 100 /path/to/examples.jar 1000
# client deploy mode运行在Spark Standalone cluster,任务失败自动重新提交
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://207.184.161.138:7077 --deploy-mode cluster
--supervise
--executor-memory 20G --total-executor-cores 100 /path/to/examples.jar 1000
# 运行在YARN集群上
export HADOOP_CONF_DIR=XXX
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster \ # can also be `yarn-client` for client mode
--executor-memory 20G --num-executors 50 /path/to/examples.jar 1000
# 在Spark Standalone cluster上运行Python程序
./bin/spark-submit --master spark://207.184.161.138:7077 examples/src/main/python/pi.py 1000
Master URL设置
Master URL |
含义 |
local |
本地单线程 |
local[K] |
本地K线程 |
local[*] |
本地最大线程,与CPU核数相同 |
spark://HOST:PORT |
Spark Standalone Cluster,端口默认7077 |
mesos://HOST:PORT |
Mesos Cluster,端口默认5050 |
yarn-client |
YARN cluster,client模式,需配置HADOOP_CONF_DIR环境变量 |
yarn-cluster |
YARN cluster,cluster模式,需配置HADOOP_CONF_DIR环境变量 |
配置文件
Spark-submit 默认会读取conf/spark-defaults.conf 的配置信息,也可通过--conf 参数来提供,还有在程序中通过SparkConf 对象设置;这几种方法的优先级为SparkConf > --conf > spark-defaults.conf
添加--verbose 参数到Spark-submit 会在log中记录各项配置参数是从何而来的
文件传输方式
Spark-submit 支持以下几种JAR ,.py 的URL格式,对应不同的传输方式
URL |
含义 |
绝对路径, file: |
executor 从driver 的HTTP file server 获取文件 |
hdfs:, http:, https:, ftp: |
executor 通过相应协议从driver 获取文件 |
local: |
executor 从本地获取文件,需保证文件存在于每个executor 本身的文件系统,这种模式减少了网络传输的成本 |
值得注意的是,随着运行的程序增多,传输到executor 的程序文件会占据越来越多的存储空间。YARN 集群拥有自动清理的功能,Spark Standalone 集群需要通过spark.worker.cleanup.appDataTtl 属性来设置自动清理
本文所涉及信息来源于
1. Spark官方文档 https://spark./docs/latest/
2. OReilly Learning Spark Lightning-Fast Big Data Analysis
|