摘要:从0.8版本起,tensorflow不仅支持多GPU运算,而且还支持分布式计算,包括分布式多GPU计算。可以将其部署在分布式的集群上。本文主要目的是简要介绍tensorflow的分布式架构。来源为其github官方手册的翻译“Distributed TensorFlow”。 Distributed TensorFlow本文介绍了如何搭建一个TensorFlow服务器的集群,以及如何在该分布式集群上部署一个计算图。需要读者对tensorflow的基本概念有一点的了解。其底层使用了gRPC 作为进程内通信的支持库 快速启动-Hello distributed TensorFlow!以下是一个简单的TensorFlow分布式程序的编写实例 # Start a TensorFlow server as a single-process 'cluster'.$ python>>> import tensorflow as tf>>> c = tf.constant('Hello, distributed TensorFlow!')>>> server = tf.train.Server.create_local_server()I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:206] Initialize HostPortsGrpcChannelCache for job local -> {localhost:35204}I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:202] Started server with target: grpc://localhost:35204 #自动建立了一个端口为35204的本地服务>>> sess = tf.Session(server.target) # Create a session on the server.>>> sess.run(c)'Hello, distributed TensorFlow!'
tf.train.Server.create_local_server() 会在本地创建一个单进程集群,该集群中的服务默认为启动状态。 自定义服务端口首先,需要构建一个TensorFlow的服务端可执行版本(grpc_tensorflow_server) 以及一个基于gRPC的客户端。可以使用如下命令进行构建: # CPU-only build.$ bazel build -c opt //tensorflow/core/distributed_runtime/rpc:grpc_tensorflow_server# GPU build.$ bazel build -c opt --config=cuda //tensorflow/core/distributed_runtime/rpc:grpc_tensorflow_server
如果是从最新的源代码创建的Python依赖包,它会自动包含一个基于gRPC的客户端。如果使用的是一个之前发布的二进制版本,需要根据这个安装说明来重新编译安装。在你成功地构建了分布式的TensorFlow组件之后,可以通过如下方式来启动服务器并且判断你的安装是否成功(在tensorflow源码根目录下运行): # Start a TensorFlow server as a single-process 'cluster'.$ bazel-bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server \ --cluster_spec='local|localhost:2222' --job_name=local --task_index=0 &
#运行以上命令后,终端将会出现一下信息,说明成功启动一个端口为2222的本地服务I tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server.cc:74] Peer local 1 {localhost:2222}I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:206] Initialize HostPortsGrpcChannelCache for job local -> {localhost:2222}I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:202] Started server with target: grpc://localhost:2222
然后启动Python的交互器并且启动一个Session: $ python>>> import tensorflow as tf>>> c = tf.constant('Hello, distributed TensorFlow!')>>> sess = tf.Session('grpc://localhost:2222')>>> sess.run(c)'Hello, distributed TensorFlow!'
Create a cluster创建集群TensorFlow中的‘集群(cluster)’指的是一系列能够对一个TensorFlow图进行分布式计算的‘任务(task)’。每个任务都关联一个‘服务(server)’。TensorFlow中的服务会包含一个用于创建session的主节点(master)和一个用于图运算的工作节点(worker)。另外, TensorFlow中的集群可以拆分成一个或多个’作业(job)’, 每个作业可以包含一个或多个任务。创建集群的必要条件是为每个任务启动一个服务。这些任务可以运行在不同的机器上,但你也可以在同一台机器上启动多个任务(比如说在本地多个不同的GPU上运行)。每个任务会做如下的两步工作: 1、 创建一个 tf.train.ClusterSpec 用于对集群中的所有任务进行描述,该描述内容对于所有任务应该是相同的。 创建tf.train.ClusterSpec 的具体方法
为每一个任务创建tf.train.Server 的实例 每一个tf.train.Server 对象都包含一个本地设备的集合, 一个向其他任务的连接集合,以及一个可以利用以上资源进行分布式计算的“会话目标”(“session target“)。每一个服务程序都是一个指定作业的一员,其在作业中拥有自己独立的任务号。每一个服务程序都可以和集群中的其他任何服务程序进行通信。 # In task 0:cluster = tf.train.ClusterSpec({'local': ['localhost:2222', 'localhost:2223']})server = tf.train.Server(cluster, job_name='local', task_index=0)
# In task 1:cluster = tf.train.ClusterSpec({'local': ['localhost:2222', 'localhost:2223']})server = tf.train.Server(cluster, job_name='local', task_index=1)
注 :当前手动配置任务节点还是一个比较初级的做法,尤其是在遇到较大的集群管理的情况下。tensorflow团队正在开发一个自动程序化配置任务的节点的工具。例如:集群管理工具Kubernetes。如果你希望tensorflow支持某个特定的管理工具,可以将该请求发到GitHub issue 里。 指定模型中的分布式设备为了将某个操作放在某个特殊的处理过程上,在分布式环境下依然可以使用 with tf.device('/job:ps/task:0'): weights_1 = tf.Variable(...) biases_1 = tf.Variable(...)with tf.device('/job:ps/task:1'): weights_2 = tf.Variable(...) biases_2 = tf.Variable(...)with tf.device('/job:worker/task:7'): input, labels = ... layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1) logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2) # ... train_op = ...with tf.Session('grpc://worker7:2222') as sess: for _ in range(10000): sess.run(train_op)
在上面的例子中,Variables在job ps的两个task上被创建,然后计算密集型的部分创建在job work上。TensorFlow会自动地在不同的job之间传输数据。(从job到work是前向传递,而从worker到ps是梯度应用)。 多重复制计算在上面的这个称为“数据并行化”的公用训练配置项里,一般会包含多个用于对不同数据大小进行计算的任务(构成了work作业) 和 一个或多个分布在不同机器上用于不停更新共享参数的任务(构成了ps作业)。 所有的这些任务都可以运行在不同的机器上。实现这养的逻辑有很多的方法,目前TensorFlow团队采用的是构建链接库(lib)的方式来简化模型的工作,其实现了如下几种方法:
分布式训练程序的举例说明 接下来的代码是一个分布式训练程序的大致代码框架,其中实现了图间的拷贝和异步训练两种方法。该示例中包含了参数服务(parameter server)和工作任务(work task)的代码。 import tensorflow as tf# Flags for defining the tf.train.ClusterSpectf.app.flags.DEFINE_string('ps_hosts', '', 'Comma-separated list of hostname:port pairs')tf.app.flags.DEFINE_string('worker_hosts', '', 'Comma-separated list of hostname:port pairs')# Flags for defining the tf.train.Servertf.app.flags.DEFINE_string('job_name', '', 'One of 'ps', 'worker'')tf.app.flags.DEFINE_integer('task_index', 0, 'Index of task within the job')FLAGS = tf.app.flags.FLAGSdef main(_): ps_hosts = FLAGS.ps_hosts.split(',') worker_hosts = FLAGS.worker_hosts(',') # Create a cluster from the parameter server and worker hosts. cluster = tf.train.ClusterSpec({'ps': ps_hosts, 'worker': worker_hosts}) # Create and start a server for the local task. # 创建并启动服务 # 其参数中使用task_index 指定任务的编号 server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) if FLAGS.job_name == 'ps': server.join() elif FLAGS.job_name == 'worker': # Assigns ops to the local worker by default. # 将op 挂载到各个本地的worker上 with tf.device(tf.train.replica_device_setter( worker_device='/job:worker/task:%d' % FLAGS.task_index, cluster=cluster)): # Build model... loss = ... global_step = tf.Variable(0) train_op = tf.train.AdagradOptimizer(0.01).minimize( loss, global_step=global_step) saver = tf.train.Saver() summary_op = tf.merge_all_summaries() init_op = tf.initialize_all_variables() # Create a 'supervisor', which oversees the training process. sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), logdir='/tmp/train_logs', init_op=init_op, summary_op=summary_op, saver=saver, global_step=global_step, save_model_secs=600) # The supervisor takes care of session initialization, restoring from # a checkpoint, and closing when done or an error occurs. with sv.managed_session(server.target) as sess: # Loop until the supervisor shuts down or 1000000 steps have completed. step = 0 while not sv.should_stop() and step < 1000000: # Run a training step asynchronously. # See `tf.train.SyncReplicasOptimizer` for additional details on how to # perform *synchronous* training. _, step = sess.run([train_op, global_step]) # Ask for all the services to stop. sv.stop()if __name__ == '__main__': tf.app.run()
使用以下命令可以启动两个参数服务和两个工作任务。(假设上面的Python脚本名字为 train.py) # On ps0.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=ps --task_index=0# On ps1.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=ps --task_index=1# On worker0.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=worker --task_index=0# On worker1.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=worker --task_index=1
术语客户端(Client) 客户端是一个用于建立TensorFlow计算图并创立与集群进行交互的会话层tensorflow::Session 的程序。一般客户端是通过python或C++实现的。一个独立的客户端进程可以同时与多个TensorFlow的服务端相连 (上面的计算流程一节),同时一个独立的服务端也可以与多个客户端相连。 集群(Cluster) 相关链接: [1] 安装Tensorflow(Linux ubuntu) http://blog.csdn.net/lenbow/article/details/51203526 |
|
来自: bookocea > 《TensorFlow》