TensorFlowOnSpark 项目是由Yahoo 开源的一个软件包,实现TensorFlow 集群服务部署在Spark 平台之上。
tfos.part1.1_1.pdf.jpg
大家好,这次我将分享TensorFlow On Spark 的解决方案,将TensorFlow 集群部署在Spark 平台之上,实现了TensorFlow 与Spark 的无缝连接,更好地解决了两者数据传递的问题。
tfos.part2.2_2.pdf.jpg
这次分享的主要内容包括TensorFlowOnSpark 架构设计,探讨其工作原理,通过理解其设计,更好地理解TensorFlow 集群在Spark 平台上的运行机制。
tfos.part3.3_3.pdf.jpg
首先,探讨TensorFlowOnSpark 的架构与设计。主要包括如下两个基本内容:
tfos.part4.4_4.pdf.jpg
在开始之前,先探讨一下TensorFlowOnSpark 的背景,及其它需要解决的问题。为了实现Spark 利用TensorFlow 深度学习,及其GPU 加速的能力,最常见的解决方案如上图所示。
搭建TensorFlow 集群,并通过利用既有的Spark 集群的数据完成模型的训练,最种再将训练好的模型部署在Spark 集群上,实现数据的预测。
该方案虽然实现了Spark 集群的深度学习,及其GPU 加速的能力,但需要Spark 集群与TensorFlow 集群之间的数据传递,造成冗余的系统复杂度。
tfos.part5.5_5.pdf.jpg
很容易想到,可以将TensorFlow 集群部署在Spark 之上,用于解决集群间数据传递的问题。
依次类同,该方案可实现Caffe 部署在Spark 集群之上,实现Spark 集群对多种深度学习框架的支持能力,并兼容既有Spark 组件的完整性,包括Spark MLLib, Spark Streaming, Spark SQL 等。
tfos.part6.6_6.pdf.jpg
TensorFlowOnSpark 的架构较为简单,Spark Driver 程序并不会参与TensorFlow 内部相关的计算和处理。其设计思路像是将一个TensorFlow 集群运行在了Spark 上,其在每个Spark Executor 中启动TensorFlow 应用程序,然后通过gRPC 或RDMA 方式进行数据传递与交互。
tfos.part7.7_7.pdf.jpg
TensorFlowOnSpark 的Spark 应用程序包括4 个基本过程。
- Reserve:组建
TensorFlow 集群,并在每个Executor 进程上预留监听端口,启动“数据/控制”消息的监听程序。
- Start:在每个
Executor 进程上启动TensorFlow 应用程序;
- Train/Inference:在
TensorFlow 集群上完成模型的训练或推理
- Shutdown:关闭
Executor 进程上的TensorFlow 应用程序,释放相应的系统资源(消息队列)。
tfos.part8.8_8.pdf.jpg
用户直接通过spark-submit 的方式提交Spark 应用程序(mnist_spark.py )。其中通过--py_files 选项附带TensorFlowOnSpark 框架(tfspark.zip ),及其TensorFlow 应用程序(mnist_dist.py ),从而实现TensorFlow 集群在Spark 平台上的部署。
tfos.part9.9_9.pdf.jpg
首先看看TensorFlow 集群的建立过程。首先根据spark-submit 传递的num_executor 参数,通过调用cluster = sc.parallelize(num_executor) 建立一个ParllelCollectionRDD ,其中分区数为num_executor 。也就是说,此时分区数等于Executor 数。
然后再调用cluster.mapPartitions(TFParkNode.reserve) 将ParllelCollectionRDD 变换(transformation)为MapPartitionsRDD ,在每个分区上回调TRSparkNode.reserve 。
TRSparkNode.reserve 将会在该节点上预留一个端口,并驻留一个Manager 服务。Manager持有一个队列,用于完成进程间的同步,实现该节点的“数据/控制”消息的服务。
数据消息启动了两个队列:Input 与Output ,分别用于RDD 与Executor 进程之间的数据交换。
控制消息启动了一个队列:Control ,用于Driver 进程控制PS 任务的生命周期,当模型训练完成之后,通过Driver 发送Stop 的控制消息结束PS 任务。
tfos.part10.10_10.pdf.jpg
这是从分区的角度看待TensorFlow 集群建立的过程,横轴表示RDD 。这里存在两个RDD ,第一个为ParllelCollectionRDD ,然后变换为MapPartitionsRDD 。
纵轴表示同一个分区(Partition),并在每个分区上启动一个Executor 进程 。在Spark 中,分区数等于最终在TaskScheduler 上调度的Task数目。
此处,sc.parallelize(num_executor) 生成一个分区数为num_executor 的ParllelCollectionRDD 。也就是说,此时分区数等于num_executor 数目。
在本例中,num_executor 为3 ,包括1 个PS 任务,2 个Worker 任务。
tfos.part11.11_11.pdf.jpg
TensorFlow 集群建立后,将生成上图所示的领域模型。其中,一个TFCluster 将持有num_executor 个TFSparkNode 节点;在每个TFSparkNode 上驻留一个Manager 服务,并预留一个监听端口,用于监听“数据/控制”消息。
实际上,TFSparkNode 节点承载于Spark Executor 进程之上。
tfos.part12.12_12.pdf.jpg
TensorFlow 集群建立后,通过调用cluster.start 启动集群服务。其结果将在每个Executor 进程上启动TensorFlow 应用程序。
此处,需要对原生的TensorFlow 应用程序进行适配修改,包括2 个部分:
-
Feeding 与Fetching : 数据输入/输出机制修改
-
ClusterSpec : TF 集群的构造描述
其余代码都将保留,最小化TensorFlow 应用程序的修改。
tfos.part13.13_13.pdf.jpg
在cluster 上调用foreachPartition(TFSparkNode.start(map_func)) ,将在每个分区(Executor 进程)上回调TFSparkNode.start(map_func) 。其中,map_func 是对应TF 应用程序的包装。
通过上述过程,将在Spark 上拉起了一个TF 的集群服务。从而使得Spark 集群拥有了深度学习和GPU 加速的能力。
tfos.part14.14_14.pdf.jpg
当Spark 平台上已经拉起了TF 集群服务之后,便可以启动模型的训练或推理过程了。在训练或推理过程中,最重要的是解决数据的Feeding 和Fetching 问题。
TFoS 上提供了两种方案:
-
TensorFlow QueueRunner :利用TensorFlow 提供的FileReader 和QueueRunner 机制。Spark 未参与任何工作,请查阅TensorFlow 官方相关文档。
-
Spark Feeding :首先从RDD 读取分区数据(通过HadoopRDD.compute ),然后将其放在Input 队列中,Executor 进程再从该队列中取出,并进一步通过feed_dict ,调用session.run 将分区数据供给给TensorFlow Graph 中。
tfos.part15.15_15.pdf.jpg
Feeding 过程,就是通过Input Queue 同步实现的。当RDD 读取分区数据后,阻塞式地将分区数据put 到Input 队列中;TFGraph 在session.run 获取Next Batch 时,也是阻塞式地等待数据的到来。
tfos.part16.16_16.pdf.jpg
同样的道理,Fetching 过程与Feeding 过程类同,只是使用Output Queue ,并且数据流方向相反。
session.run 返回的数据,通过put 阻塞式地放入Output Queue ,RDD 也是阻塞式地等待数据到来。
tfos.part17.17_17.pdf.jpg
以模型训练过程为例,讲解RDD 的变换过程。此处以Mnist 手写识别为例,左边表示X ,右边表示Y 。分别通过HadoopRDD 读取分区数据,然后通过MapPartititionRDD 变换分区的数据格式;然后通过zip 算子,实现两个RDD 的折叠,生成ZipPartitionsRDD 。
然后,根据Epochs 超级参数的配置,将该RDD 重复执行Epochs 次,最终将结果汇总,生成UnionRDD 。
在此之前,都是Transformation 的过程,最终调用foreachPartition(train) 启动Action ,触发Spark Job 的提交和任务的运行。
tfos.part18.18_18.pdf.jpg
当模型训练或推理完成之后,分别在Input/Control 队列中投掷Stop (以传递None 实现)消息,当Manager 收到Stop 消息后,停止队列的运行。
最终,Spark 应用程序退出,Executor 进程退出,整个工作流执行结束。
tfos.part19.19_19.pdf.jpg
tfos.part20.20_20.pdf.jpg
推荐资料,强烈推荐直接地源代码阅读。
tfos.part21.21_21.pdf.jpg
感谢大家,最后欢迎大家关注我的简书。
tfos.part22.22_22.pdf.jpg
|