分享

tensorflow学习笔记(三十一):构建多GPU代码

 雪柳花明 2017-05-16

构建多GPU代码

结构

  1. 先构建单GPU代码
  2. 写个函数multi_gpu_model(num_gpus)来生成多GPU代码,并将对象保存在collection
  3. feed data
  4. run

如何构建单GPU代码

见之前博客构建TF代码 
不要在单GPU代码中创建optimizer op,因为是multi gpu,所以参数更新的操作是所有的GPU计算完梯度之后,才进行更新的。

如何实现multi_gpu_model函数

def multi_gpu_model(num_gpus=1):
  grads = []
  for i in range(num_gpus):
    with tf.device("/gpu:%d"%i):
      with tf.name_scope("tower_%d"%i):
        model = Model(is_training, config, scope)
        # 放到collection中,方便feed的时候取
        tf.add_to_collection("train_model", model)
        grads.append(model.grad) #grad 是通过tf.gradients(loss, vars)求得
        #以下这些add_to_collection可以直接在模型内部完成。
        # 将loss放到 collection中, 方便以后操作
        tf.add_to_collection("loss",model.loss)
        #将predict放到collection中,方便操作
        tf.add_to_collection("predict", model.predict)
        #将 summary.merge op放到collection中,方便操作
        tf.add_to_collection("merge_summary", model.merge_summary)
        # ...
  with tf.device("cpu:0"):
    averaged_gradients = average_gradients(grads)# average_gradients后面说明
    opt = tf.train.GradientDescentOptimizer(learning_rate)
    train_op=opt.apply_gradients(zip(average_gradients,tf.trainable_variables()))

  return train_op
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

如何feed data


def generate_feed_dic(model, feed_dict, batch_generator):
  x, y = batch_generator.next_batch()
  feed_dict[model.x] = x
  feed_dict[model.y] = y
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

如何实现run_epoch

#这里的scope是用来区别 train 还是 test
def run_epoch(session, data_set, scope, train_op=None, is_training=True):
  batch_generator = BatchGenerator(data_set, batch_size)
  ...
  ...
  if is_training and train_op is not None:
    models = tf.get_collection("train_model")
    # 生成 feed_dict
    feed_dic = {}
    for model in models:
      generate_feed_dic(model, feed_dic, batch_generator)
    #生成fetch_dict
    losses = tf.get_collection("loss", scope)#保证了在 test的时候,不会fetch train的loss
    ...
    ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

main函数

main 函数干了以下几件事: 
1. 数据处理 
2. 建立多GPU训练模型 
3. 建立单/多GPU测试模型 
4. 创建Saver对象和FileWriter对象 
5. 创建session 
6. run_epoch

data_process()
with tf.name_scope("train") as train_scope:
  train_op = multi_gpu_model(..)
with tf.name_scope("test") as test_scope:
  model = Model(...)
saver = tf.train.Saver()
# 建图完毕,开始执行运算
with tf.Session() as sess:
  writer = tf.summary.FileWriter(...)
  ...
  run_epoch(...,train_scope)
  run_epoch(...,test_scope)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

如何编写average_gradients函数

def average_gradients(grads):#grads:[[grad0, grad1,..], [grad0,grad1,..]..]
  averaged_grads = []
  for grads_per_var in zip(*grads):
    grads = []
    for grad in grads_per_var:
      expanded_grad = tf.expanded_dim(grad,0)
      grads.append(expanded_grad)
    grads = tf.concat_v2(grads, 0)
    grads = tf.reduce_mean(grads, 0)
    averaged_grads.append(grads)

  return averaged_grads
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

还有一个版本,但是不work,不知为啥

def average_gradients(grads):#grads:[[grad0, grad1,..], [grad0,grad1,..]..]
  averaged_grads = []
  for grads_per_var in zip(*grads):
    grads = tf.reduce_mean(grads_per_var, 0)
    averaged_grads.append(grads)
  return averaged_grads

# -*- coding:utf-8 -*-
import os.path
import re
import time
import numpy as np
import tensorflow as tf
import cifar10


batch_size=128
#train_dir='/tmp/cifar10_train'

#最大步数
max_steps=1000000

#GPU的数量
num_gpus=1
#log_device_placement=False


#计算损失的函数
def tower_loss(scope):

#使用cifar10.distorted_inputs()产生数据增强后的imageslabels.
images, labels = cifar10.distorted_inputs()

#调用cifar10.inference生成卷积网络
#为每个GPU生成单独的网络,这些网络的结构完全一样,并且共享模型参数。
logits = cifar10.inference(images)

#调用cifar.loss计算损失函数(这里不直接返回loss,而是存储到collection中)
_ = cifar10.loss(logits, labels)


#使用tf.get_collection('losses', scope)获取当前这个GPU上的loss
#通过scope限定了范围
losses = tf.get_collection('losses', scope)

# 再使用tf.add_n将所有损失叠加到一起,得到total_loss,
total_loss = tf.add_n(losses, name='total_loss')

#返回total_loss作为函数结果
return total_loss



#它负责将不同GPU计算出的梯度进行合成
#合成的输入参数tower_grads是梯度的双层列表,
# 外层列表是不同GPU计算得到的梯度,内存列表是某个GPU内计算的不同Variable对应的梯度
#最内层元素为(grads,variable,tower_grads的基本原始为二元组(梯度,变量),
#具体形式为:[[(grad0_gpu0,var0_gpu0),(grad1_gpu0,var1_gpu0)],[(grad0_gpu1,var0_gpu1),(grad1_gpu1,var1_gpu1)],....]]
#
#然后用循环便利这个双层列表
def average_gradients(tower_grads):

average_grads = []#首先创建平均梯度的列表average_grads,它负责将梯度在不同GPU间进行平均

#zip(*tower_grads)将这个双层列表转置,变成
#[[(grad0_gpu0,var0_gpu0),(grad0_gpu1,var0_gpu1)],[(grad1_gpu0,var1_gpu0),(grad1_gpu1,var1_gpu1)],....]]
#然后使用循环遍历其元素
for grad_and_vars in zip(*tower_grads):
# Note that each grad_and_vars looks like the following:
# ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
grads = []
for g, _ in grad_and_vars:
#每个循环获取的元素grad_and_vars,是同一个Variable的梯度在不同GPU计算的副本,需要计算其梯度的均值
#如果这个梯度是N维的向量,需要在每个维度上都进行平均。
# Add 0 dimension to the gradients to represent the tower.
#先使用tf.expand_dims给这些梯度添加一个冗余的维度0,然后把这些梯度都放到列表grad中。
expanded_g = tf.expand_dims(g, 0)

# Append on a 'tower' dimension which we will average over below.
#然后把这些梯度都放到列表grad中。
grads.append(expanded_g)

# Average over the 'tower' dimension.
#接着使用tf.concat将他们在维度0合并
grad = tf.concat(grads, 0)
#最后使用tf.reduce_mean针对维度0上求平均,即将其他维度全部平均。
grad = tf.reduce_mean(grad, 0)

#最后将平均后的梯度根Variable组合得到原有的二元组(梯度,变量)格式,并添加到列表average_grads
v = grad_and_vars[0][1]
grad_and_var = (grad, v)
average_grads.append(grad_and_var)
#当所有梯度都求完均值后,返回average_grads
return average_grads


def train():

#先设置默认的计算设备为CPU,用来计算一些简单的计算
with tf.Graph().as_default(), tf.device('/cpu:0'):
# Create a variable to count the number of train() calls. This equals the
# number of batches processed * FLAGS.num_gpus.
#然后使用global_step记录全局训练的步数
global_step = tf.get_variable(
'global_step', [],
initializer=tf.constant_initializer(0), trainable=False)

# Calculate the learning rate schedule.
#并计算一个epoch对应的batch数,
# 以及学习速率衰减需要的步数decay_steps.
num_batches_per_epoch = (cifar10.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN /
batch_size)
decay_steps = int(num_batches_per_epoch * cifar10.NUM_EPOCHS_PER_DECAY)

# Decay the learning rate exponentially based on the number of steps.
#使用tf.train.exponential_decay创建训练步数衰减的学习速率,
#第一个参数为初始学习速率
#第二个参数为全局训练的步数
#第三个参数为每次衰减需要的步数,
#第四个参数:为衰减率
#staircase设置为true,代表的是阶梯是的衰减,

lr = tf.train.exponential_decay(cifar10.INITIAL_LEARNING_RATE,
global_step,
decay_steps,
cifar10.LEARNING_RATE_DECAY_FACTOR,
staircase=True)

# Create an optimizer that performs gradient descent.
# 然后设置优化算法为GradientDescent,并传入随步数衰减的学习速率
opt = tf.train.GradientDescentOptimizer(lr)

# Calculate the gradients for each model tower.
#定义存储个GPU 计算结果的列表tower_grads
tower_grads = []
#创建一个循环,循环次数为GPU的数量
for i in range(num_gpus):
#在每一个循环内,使用tf.device限定使用第几个GPU,如gpu0,gpu1,
with tf.device('/gpu:%d' % i):
#然后使用tf.name_scope将命名空间定义为tower0,tower1的形式
with tf.name_scope('%s_%d' % (cifar10.TOWER_NAME, i)) as scope:
#对每一个GPU,使用前面定义好的函数tower_loss获取其损失,
loss = tower_loss(scope)

# Reuse variables for the next tower.
#重用参数,让所有GPU共用一个模型以及完全相同的参数。
tf.get_variable_scope().reuse_variables()

# Calculate the gradients for the batch of data on this CIFAR tower.
#在使用opt.compute_gradients计算单个GPU的梯度
grads = opt.compute_gradients(loss)

# Keep track of the gradients across all towers.
#并将求得的梯度添加到梯度列表。
tower_grads.append(grads)

#使用前面写好的函数,average_gradients计算平均梯度
grads = average_gradients(tower_grads)

#并用opt.apply_gradients更新模型参数
apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)
#这样就完成了多GPU的同步训练核参数更新

# Create a saver.
#创建模型的保存器saver.
saver = tf.train.Saver(tf.all_variables())

# Build an initialization operation to run below.
init = tf.global_variables_initializer()

#Sessionallow_soft_placement=True(有些操作在CPU上操作,不使用allow_soft_placement,会报错)
sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True))
#初始化全部参数
sess.run(init)

# Start the queue runners.准备号大量的数据增强后的训练样本,防止后妈的训练被阻塞在生成样本上。
#tf.train.start_queue_runners
tf.train.start_queue_runners(sess=sess)

#进行训练的循环,最大迭代次数max_steps
for step in range(max_steps):
start_time = time.time()
#在每一步中执行一次更新梯度的操作apply_gradient_op(即一次训练操作)
_, loss_value = sess.run([apply_gradient_op, loss])
duration = time.time() - start_time#计算时间

# assert not np.isnan(loss_value), 'Model diverged with loss = NaN'
#每隔10步,展示一次当前batchloss,以及每秒钟可训练的样本数核每个batch训练所需要话费的时间
if step % 10 == 0:
num_examples_per_step = batch_size * num_gpus
examples_per_sec = num_examples_per_step / duration
sec_per_batch = duration / num_gpus

format_str = ('step %d, loss = %.2f (%.1f examples/sec; %.3f '
'sec/batch)')
print (format_str % (step, loss_value,
examples_per_sec, sec_per_batch))

#每个1000步,使用saver保存整个模型文件
if step % 1000 == 0 or (step + 1) == max_steps:
# checkpoint_path = os.path.join(train_dir, 'model.ckpt')
saver.save(sess, './model/model.ckpt', global_step=step)



#使用cifar10.maybe_download_and_extract()下载完整的CIFAR-10数据,
cifar10.maybe_download_and_extract()
#调用train()函数开始训练
train()




    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多