摘要:本系列主要对 tf 的一些常用概念与方法进行描述。本文主要针对tensorflow的模型训练 Training 与测试 Testing 等相关函数进行讲解。为‘Tensorflow 一些常用基本概念与函数’系列之四。
1、序言
本文所讲的内容主要为以下列表中相关函数。函数 training() 通过梯度下降法为最小化损失函数增加了相关的优化操作,在训练过程中,先实例化一个优化函数,比如 tf.train.GradientDescentOptimizer,并基于一定的学习率进行梯度优化训练:
1. optimizer = tf.train.GradientDescentOptimizer(learning_rate)
然后,可以设置 一个用于记录全局训练步骤的单值。以及使用 minimize()操作,该操作不仅可以优化更新训练的模型参数,也可以为全局步骤 (global step) 计数。与其他 tensorflow 操作类似,这些训练操作都需要在tf.session会话中进行
1. global_step = tf.Variable(0, name='global_step', trainable=False)
2. train_op = optimizer.minimize(loss, global_step=global_step)
操作组 | 操作 |
---|---|
Training | Optimizers,Gradient Computation,Gradient Clipping,Distributed execution |
Testing | Unit tests,Utilities,Gradient checking |
2、Tensorflow 函数
2.1 训练 (Training)
一个 TFRecords 文件为一个字符串序列。这种格式并非随机获取,它比较适合大规模的数据流,而不太适合需要快速分区或其他非序列获取方式。
█ 优化 (Optimizers)
tf 中各种优化类提供了为损失函数计算梯度的方法,其中包含比较经典的优化算法,比如 GradientDescent 和 Adagrad。
class tf.train.Optimizer
操作 | 描述 |
---|---|
class tf.train.Optimizer | 基本的优化类,该类不常常被直接调用,而较多使用其子类, |
比如 GradientDescentOptimizer, AdagradOptimizer
或者 MomentumOptimizer |
| tf.train.Optimizer.init(use_locking, name) | 创建一个新的优化器,
该优化器必须被其子类 (subclasses) 的构造函数调用 |
| tf.train.Optimizer.minimize(loss, global_step=None,
var_list=None, gate_gradients=1,
aggregation_method=None, colocate_gradients_with_ops=False,
name=None, grad_loss=None) | 添加操作节点,用于最小化 loss,并更新 var_list
该函数是简单的合并了 compute_gradients() 与 apply_gradients() 函数
返回为一个优化更新后的 var_list,如果 global_step 非 None,该操作还会为 global_step 做自增操作 |
| tf.train.Optimizer.compute_gradients(loss,var_list=None, gate_gradients=1,
aggregation_method=None,
colocate_gradients_with_ops=False, grad_loss=None) | 对 var_list 中的变量计算 loss 的梯度
该函数为函数 minimize()的第一部分,返回一个以元组 (gradient, variable) 组成的列表 |
| tf.train.Optimizer.apply_gradients(grads_and_vars, global_step=None, name=None) | 将计算出的梯度应用到变量上,是函数 minimize() 的第二部分,返回一个应用指定的梯度的操作 Operation,对 global_step 做自增操作 |
| tf.train.Optimizer.get_name() | 获取名称 |
▷ class tf.train.Optimizer
用法
1. \# Create an optimizer with the desired parameters.
2. opt = GradientDescentOptimizer(learning_rate=0.1)
3. \# Add Ops to the graph to minimize a cost by updating a list of variables.
4. \# "cost" is a Tensor, and the list of variables contains tf.Variable objects.
5. opt_op = opt.minimize(cost, var_list=<list of variables>)
6. \# Execute opt_op to do one step of training:
7. opt_op.run()
在使用它们之前处理梯度
使用 minimize() 操作,该操作不仅可以计算出梯度,而且还可以将梯度作用在变量上。如果想在使用它们之前处理梯度,可以按照以下三步骤使用 optimizer
1、使用函数 compute_gradients() 计算梯度
2、按照自己的愿望处理梯度
3、使用函数 apply_gradients() 应用处理过后的梯度
例如:
1. \# 创建一个optimizer.
2. opt = GradientDescentOptimizer(learning_rate=0.1)
4. \# 计算<list of variables>相关的梯度
5. grads\_and\_vars = opt.compute_gradients(loss, <list of variables>)
7. \# grads\_and\_vars为tuples (gradient, variable)组成的列表。
8. #对梯度进行想要的处理,比如cap处理
9. capped\_grads\_and_vars = \[(MyCapper(gv\[0\]), gv\[1\]) for gv in grads\_and\_vars\]
11. \# 令optimizer运用capped的梯度(gradients)
12. opt.apply_gradients(capped\_grads\_and_vars)
选通梯度 (Gating Gradients)
函数 minimize() 与 compute_gradients() 都含有一个参数 gate_gradient,用于控制在应用这些梯度时并行化的程度。
其值可以取:GATE_NONE, GATE_OP 或 GATE_GRAPH
GATE_NONE : 并行地计算和应用梯度。提供最大化的并行执行,但是会导致有的数据结果没有再现性。比如两个 matmul 操作的梯度依赖输入值,使用 GATE_NONE 可能会出现有一个梯度在其他梯度之前便应用到某个输入中,导致出现不可再现的 (non-reproducible) 结果
GATE_OP: 对于每个操作 Op,确保每一个梯度在使用之前都已经计算完成。这种做法防止了那些具有多个输入,并且梯度计算依赖输入情形中,多输入 Ops 之间的竞争情况出现。
GATE_GRAPH: 确保所有的变量对应的所有梯度在他们任何一个被使用前计算完成。该方式具有最低级别的并行化程度,但是对于想要在应用它们任何一个之前处理完所有的梯度计算时很有帮助的。
█ Slots
一些 optimizer 的之类,比如 MomentumOptimizer 和 AdagradOptimizer 分配和管理着额外的用于训练的变量。这些变量称之为’Slots’,Slots 有相应的名称,可以向 optimizer 访问的 slots 名称。有助于在 log debug 一个训练算法以及报告 slots 状态
操作 | 描述 |
---|---|
tf.train.Optimizer.get_slot_names() | 返回一个由 Optimizer 所创建的 slots 的名称列表 |
tf.train.Optimizer.get_slot(var, name) | 返回一个 name 所对应的 slot,name 是由 Optimizer 为 var 所创建 |
var 为用于传入 minimize() 或 apply_gradients() 的变量 | |
class tf.train.GradientDescentOptimizer | 使用梯度下降算法的 Optimizer |
| tf.train.GradientDescentOptimizer.init(learningrate,
uselocking=False, name=’GradientDescent’) | 构建一个新的梯度下降优化器 (Optimizer) |
| class tf.train.AdadeltaOptimizer | 使用Adadelta 算法的 Optimizer |
| tf.train.AdadeltaOptimizer.init(learningrate=0.001,
rho=0.95, epsilon=1e-08,
uselocking=False, name=’Adadelta’) | 创建 Adadelta 优化器 |
| class tf.train.AdagradOptimizer | 使用Adagrad 算法的 Optimizer |
| tf.train.AdagradOptimizer.init(learningrate,
initialaccumulatorvalue=0.1,
uselocking=False, name=’Adagrad’) | 创建 Adagrad 优化器 |
| class tf.train.MomentumOptimizer | 使用Momentum算法的 Optimizer |
| tf.train.MomentumOptimizer.__init(learning_rate,
momentum, use_locking=False,
name=’Momentum’, use_nesterov=False) | 创建 momentum 优化器
momentum:动量,一个 tensor 或者浮点值 |
| class tf.train.AdamOptimizer | 使用Adam 算法的 Optimizer |
| tf.train.AdamOptimizer.__init(learning_rate=0.001,
beta1=0.9, beta2=0.999, epsilon=1e-08,
use_locking=False, name=’Adam’) | 创建 Adam 优化器 |
| class tf.train.FtrlOptimizer | 使用FTRL 算法的 Optimizer |
| tf.train.FtrlOptimizer.__init(learning_rate,
learning_rate_power=-0.5,
initial_accumulator_value=0.1,
l1_regularization_strength=0.0,
l2_regularization_strength=0.0,
use_locking=False, name=’Ftrl’) | 创建 FTRL 算法优化器 |
| class tf.train.RMSPropOptimizer | 使用RMSProp 算法的 Optimizer |
| tf.train.RMSPropOptimizer.__init(learning_rate,
decay=0.9, momentum=0.0, epsilon=1e-10,
use_locking=False, name=’RMSProp’) | 创建 RMSProp 算法优化器 |
▷ tf.train.AdamOptimizer
Adam 的基本运行方式,首先初始化:
1. m_0 <- 0 (Initialize initial 1st moment vector)
2. v_0 <- 0 (Initialize initial 2nd moment vector)
3. t <- 0 (Initialize timestep)
在论文中的 section2 的末尾所描述了更新规则,该规则使用梯度 g 来更新变量:
1. t <- t + 1
2. lr_t <- learning_rate * sqrt(1 - beta2^t) / (1 - beta1^t)
4. m_t <- beta1 * m_{t-1} + (1 - beta1) * g
5. v_t <- beta2 * v_{t-1} + (1 - beta2) * g * g
6. variable <- variable - lr_t * m_t / (sqrt(v_t) + epsilon)
其中 epsilon 的默认值 1e-8 可能对于大多数情况都不是一个合适的值。例如,当在 ImageNet 上训练一个 Inception network 时比较好的选择为 1.0 或者 0.1。
需要注意的是,在稠密数据中即便 g 为 0 时, m_t, v_t 以及 variable 都将会更新。而在稀疏数据中,m_t, v_t 以及 variable 不被更新且值为零。
█ 梯度计算与截断 (Gradient Computation and Clipping)
TensorFlow 提供了计算给定 tf 计算图的求导函数,并在图的基础上增加节点。优化器 (optimizer) 类可以自动的计算网络图的导数,但是优化器中的创建器 (creators ) 或者专业的人员可以通过本节所述的函数调用更底层的方法。
操作 | 描述 |
---|---|
| tf.gradients(ys, xs, gradys=None, name=’gradients’,
colocate_gradients_with_ops=False, gate_gradients=False,
aggregation_method=None) | 构建一个符号函数,计算 ys 关于 xs 中 x 的偏导的和,
返回 xs 中每个 x 对应的 sum(dy/dx) |
| tf.stop_gradient(input, name=None) | 停止计算梯度,
在 EM 算法、Boltzmann 机等可能会使用到 |
| tf.clip_by_value(t, clip_value_min, clip_value_max, name=None) | 基于定义的 min 与 max 对 tesor 数据进行截断操作,
目的是为了应对梯度爆发或者梯度消失的情况 |
| tf.clip_by_norm(t, clip_norm, axes=None, name=None) | 使用 L2 范式标准化 tensor 最大值为 clip_norm
返回 t clipnorm / l2norm(t) |
| tf.clip_by_average_norm(t, clip_norm, name=None) | 使用平均 L2 范式规范 tensor 数据 t,
并以 clip_norm 为最大值
返回 t clip_norm / l2norm_avg(t) |
| tf.clip_by_global_norm(t_list,
clip_norm, use_norm=None, name=None) | 返回 t_list[i] clip_norm / max(global_norm, clip_norm)
其中 global_norm = sqrt(sum([l2norm(t)*2 for t in t_list])) |
| tf.global_norm(t_list, name=None) | 返回 global_norm = sqrt(sum([l2norm(t)2 for t in t_list])) |
█ 退化学习率 (Decaying the learning rate)
操作 | 描述 |
---|---|
| tf.train.exponential_decay(learning_rate, global_step,
decay_steps, decay_rate, staircase=False, name=None) | 对学习率进行指数衰退 |
▷ tf.train.exponential_decay
1. #该函数返回以下结果
2. decayed\_learning\_rate = learning_rate *
3. decay_rate ^ (global_step / decay_steps)
4. ##例: 以0.96为基数,每100000 步进行一次学习率的衰退
5. global_step = tf.Variable(0, trainable=False)
6. starter\_learning\_rate = 0.1
7. learning_rate = tf.train.exponential_decay(starter\_learning\_rate, global_step,
8. 100000, 0.96, staircase=True)
9. \# Passing global_step to minimize() will increment it at each step.
10. learning_step = (
11. tf.train.GradientDescentOptimizer(learning_rate)
12. .minimize(...my loss..., global_step=global_step)
13. )
█ 移动平均 (Moving Averages)
一些训练优化算法,比如 GradientDescent 和 Momentum 在优化过程中便可以使用到移动平均方法。使用移动平均常常可以较明显地改善结果。
操作 | 描述 |
---|---|
class tf.train.ExponentialMovingAverage | 将指数衰退加入到移动平均中 |
tf.train.ExponentialMovingAverage.apply(var_list=None) | 对 var_list 变量保持移动平均 |
tf.train.ExponentialMovingAverage.average_name(var) | 返回 var 均值的变量名称 |
tf.train.ExponentialMovingAverage.average(var) | 返回 var 均值变量 |
tf.train.ExponentialMovingAverage.variables_to_restore(moving_avg_variables=None) | 返回用于保存的变量名称的映射 |
▷ tf.train.ExponentialMovingAverage
1. \# Example usage when creating a training model:
2. \# Create variables.
3. var0 = tf.Variable(...)
4. var1 = tf.Variable(...)
5. \# ... use the variables to build a training model...
6. ...
7. \# Create an op that applies the optimizer. This is what we usually
8. \# would use as a training op.
9. opt_op = opt.minimize(my_loss, \[var0, var1\])
11. \# Create an ExponentialMovingAverage object
12. ema = tf.train.ExponentialMovingAverage(decay=0.9999)
14. \# Create the shadow variables, and add ops to maintain moving averages
15. \# of var0 and var1.
16. maintain\_averages\_op = ema.apply(\[var0, var1\])
18. \# Create an op that will update the moving averages after each training
19. \# step. This is what we will use in place of the usual training op.
20. with tf.control_dependencies(\[opt_op\]):
21. training_op = tf.group(maintain\_averages\_op)
23. ...train the model by running training_op...
25. #Example of restoring the shadow variable values:
26. \# Create a Saver that loads variables from their saved shadow values.
27. shadow\_var0\_name = ema.average_name(var0)
28. shadow\_var1\_name = ema.average_name(var1)
29. saver = tf.train.Saver({shadow\_var0\_name: var0, shadow\_var1\_name: var1})
30. saver.restore(...checkpoint filename...)
31. \# var0 and var1 now hold the moving average values
▷ tf.train.ExponentialMovingAverage.variables_to_restore
1. variables\_to\_restore = ema.variables\_to\_restore()
2. saver = tf.train.Saver(variables\_to\_restore)
█ 协调器和队列运行器 (Coordinator and QueueRunner)
查看queue中,queue 相关的内容,了解 tensorflow 中队列的运行方式。
操作 | 描述 |
---|---|
class tf.train.Coordinator | 线程的协调器 |
tf.train.Coordinator.clear_stop() | 清除停止标记 |
tf.train.Coordinator.join(threads=None, stop_grace_period_secs=120) | 等待线程终止 |
threads: 一个 threading.Threads 的列表,启动的线程,将额外加入到 registered 的线程中 | |
tf.train.Coordinator.register_thread(thread) | Register 一个用于 join 的线程 |
tf.train.Coordinator.request_stop(ex=None) | 请求线程结束 |
tf.train.Coordinator.should_stop() | 检查是否被请求停止 |
tf.train.Coordinator.stop_on_exception() | 上下文管理器,当一个例外出现时请求停止 |
tf.train.Coordinator.wait_for_stop(timeout=None) | 等待 Coordinator 提示停止进程 |
class tf.train.QueueRunner | 持有一个队列的入列操作列表,用于线程中运行 |
queue: 一个队列
enqueue_ops: 用于线程中运行的入列操作列表 |
| tf.train.QueueRunner.create_threads(sess,
coord=None, daemon=False, start=False) | 创建运行入列操作的线程,返回一个线程列表 |
| tf.train.QueueRunner.from_proto(queue_runner_def) | 返回由 queue_runner_def 创建的 QueueRunner 对象 |
| tf.train.add_queue_runner(qr, collection=’queue_runners’) | 增加一个 QueueRunner 到 graph 的收集器 (collection ) 中 |
| tf.train.start_queue_runners(sess=None, coord=None, daemon=True, start=True, collection=’queue_runners’) | 启动所有 graph 收集到的队列运行器 (queue runners) |
▷ class tf.train.Coordinator
1. #Coordinator的使用,用于多线程的协调
2. try:
3. ...
4. coord = Coordinator()
5. \# Start a number of threads, passing the coordinator to each of them.
6. ...start thread 1...(coord, ...)
7. ...start thread N...(coord, ...)
8. \# Wait for all the threads to terminate, give them 10s grace period
9. coord.join(threads, stop\_grace\_period_secs=10)
10. except RuntimeException:
11. ...one of the threads took more than 10s to stop after request_stop()
12. ...was called.
13. except Exception:
14. ...exception that was passed to coord.request_stop()
▷ tf.train.Coordinator.stop_on_exception()
1. with coord.stop\_on\_exception():
2. \# Any exception raised in the body of the with
3. \# clause is reported to the coordinator before terminating
4. \# the execution of the body.
5. ...body...
6. #等价于
7. try:
8. ...body...
9. exception Exception as ex:
10. coord.request_stop(ex)
█ 布执行 (Distributed execution)
可以阅读TensorFlow 的分布式学习框架简介 查看更多 tensorflow 分布式细节。
操作 | 描述 |
---|---|
class tf.train.Server | 一个进程内的 tensorflow 服务,用于分布式训练 |
| tf.train.Server.init(serverorclusterdef,
jobname=None, task_index=None, protocol=None,
config=None, start=True) | 创建一个新的服务,其中 job_name, task_index,
和 protocol 为可选参数,
优先级高于 server_or_cluster_def 中相关信息
server_or_cluster_def : 为一个 tf.train.ServerDef
或 tf.train.ClusterDef 协议 (protocol) 的 buffer,
或者一个 tf.train.ClusterSpec 对象 |
| tf.train.Server.create_local_server(config=None, start=True) | 创建一个新的运行在本地主机的单进程集群 |
| tf.train.Server.target | 返回 tf.Session 所连接的目标服务器 |
| tf.train.Server.server_def | 返回该服务的 tf.train.ServerDef |
| tf.train.Server.start() | 开启服务 |
| tf.train.Server.join() | 阻塞直到服务已经关闭 |
| # | |
| —- | —- |
| class tf.train.Supervisor | 一个训练辅助器,用于 checkpoints 模型以及计算的 summaries。该监视器只是一个小的外壳 (wrapper), 用于 Coordinator, a Saver, 和 a SessionManager 周围 |
| tf.train.Supervisor.__init(graph=None, ready_op=0, is_chief=True, init_op=0, init_feed_dict=None, local_init_op=0, logdir=None,
summary_op=0, saver=0, global_step=0,
save_summaries_secs=120, save_model_secs=600,
recovery_wait_secs=30, stop_grace_secs=120,
checkpoint_basename=’model.ckpt’, session_manager=None, summary_writer=0, init_fn=None) | 创建一个监视器 Supervisor |
| tf.train.Supervisor.managed_session(master=”, config=None, start_standard_services=True, close_summary_writer=True) | 返回一个管路 session 的上下文管理器 |
| tf.train.Supervisor.prepare_or_wait_for_session(master=”, config=None, wait_for_checkpoint=False, max_wait_secs=7200, start_standard_services=True) | 确保 model 已经准备好 |
| tf.train.Supervisor.start_standard_services(sess) | 为 sess 启动一个标准的服务 |
| tf.train.Supervisor.start_queue_runners(sess, queue_runners=None) | 为 QueueRunners 启动一个线程,queue_runners 为一个 QueueRunners 列表 |
| tf.train.Supervisor.summary_computed(sess, summary, global_step=None) | 指示计算的 summary |
| tf.train.Supervisor.stop(threads=None, close_summary_writer=True) | 停止服务以及协调器 (coordinator), 并没有关闭 session |
| tf.train.Supervisor.request_stop(ex=None) | 参考 Coordinator.request_stop() |
| tf.train.Supervisor.should_stop() | 参考 Coordinator.should_stop() |
| tf.train.Supervisor.stop_on_exception() | 参考 Coordinator.stop_on_exception() |
| tf.train.Supervisor.Loop(timer_interval_secs, target, args=None, kwargs=None) | 开启一个循环器线程用于调用一个函数
每经过 timer_interval_secs 秒执行,target(args, *kwargs) |
| tf.train.Supervisor.coord | 返回监督器 (Supervisor) 使用的协调器(Coordinator ) |
| # | |
| —- | —- |
| class tf.train.SessionManager | 训练的辅助器,用于从 checkpoint 恢复数据以及创建一个 session |
| tf.train.SessionManager.__init(local_init_op=None, ready_op=None, graph=None, recovery_wait_secs=30) | 创建一个 SessionManager |
| tf.train.SessionManager.prepare_session(master, init_op=None, saver=None, checkpoint_dir=None, wait_for_checkpoint=False, max_wait_secs=7200, config=None, init_feed_dict=None, init_fn=None) | 创建一个 session,并确保 model 可以被使用 |
| tf.train.SessionManager.recover_session(master, saver=None, checkpoint_dir=None, wait_for_checkpoint=False, max_wait_secs=7200, config=None) | 创建一个 session,如果可以的话,使用恢复方法创建 |
| tf.train.SessionManager.wait_for_session(master, config=None, max_wait_secs=inf) | 创建一个 session,并等待 model 准备完成 |
| # | |
| —- | —- |
| class tf.train.ClusterSpec | 将一个集群表示为一系列 “tasks”,并整合至“jobs” 中 |
| tf.train.ClusterSpec.as_cluster_def() | 返回该 cluster 中一个 tf.train.ClusterDef 协议的 buffer |
| tf.train.ClusterSpec.as_dict() | 返回一个字典,由 job 名称对应于网络地址 |
| tf.train.ClusterSpec.job_tasks(job_name) | 返回一个给定的 job 对应的 task 列表 |
| tf.train.ClusterSpec.jobs | 返回该 cluster 的 job 名称列表 |
| tf.train.replica_device_setter(ps_tasks=0, ps_device=’/job:ps’, worker_device=’/job:worker’, merge_devices=True, cluster=None, ps_ops=None) | 返回一个设备函数 (device function),以在建立一个副本 graph 的时候使用,设备函数(device function) 用在 with tf.device(device_function)中 |
▷ tf.train.Server
1. server = tf.train.Server(...)
2. with tf.Session(server.target):
3. \# ...
▷ tf.train.Supervisor
相关参数:
ready_op : 一维 字符串 tensor。该 tensor 是用过监视器在 prepare_or_wait_for_session() 计算,检查 model 是否准备好可以使用。如果准备好,将返回一个空阵列,如果为 None,该 model 没有被检查。
is_chief : 如果为 True,创建一个主监视器用于负责初始化与模型的恢复,若为 False,则依赖主监视器。
init_op : 一个操作,用于模型不能恢复时的初始化操作。默认初始化所有操作
local_init_op : 可被所有监视器运行的初始化操作。
logdir : 设置 log 目录
summary_op : 一个操作 (Operation),返回 Summary 和事件 logs,需要设置 logdir
saver : 一个 Saver 对象
save_summaries_secs : 保存 summaries 的间隔秒数
save_model_secs : 保存 model 的间隔秒数
checkpoint_basename : checkpoint 保存的基本名称
- 使用在单进程中
1. with tf.Graph().as_default():
2. ...add operations to the graph...
3. \# Create a Supervisor that will checkpoint the model in '/tmp/mydir'.
4. sv = Supervisor(logdir='/tmp/mydir')
5. \# Get a TensorFlow session managed by the supervisor.
6. with sv.managed_session(FLAGS.master) as sess:
7. \# Use the session to train the graph.
8. while not sv.should_stop():
9. sess.run(<my\_train\_op>)
10. \# 在上下文管理器with sv.managed_session()内,所有在graph的变量都被初始化。
11. \# 或者说,一些服务器checkpoint相应模型并增加summaries至事件log中。
12. \# 如果有例外发生,should_stop()将返回True
使用在多副本运行情况中
要使用副本训练已经部署在集群上的相同程序,必须指定其中一个 task 为主要,该 task 处理 initialization, checkpoints, summaries, 和 recovery 相关事物。其他 task 依赖该 task。
1. \# Choose a task as the chief. This could be based on server\_def.task\_index,
2. \# or job\_def.name, or job\_def.tasks. It's entirely up to the end user.
3. \# But there can be only one \*chief\*.
4. is_chief = (server_def.task_index == 0)
5. server = tf.train.Server(server_def)
7. with tf.Graph().as_default():
8. ...add operations to the graph...
9. \# Create a Supervisor that uses log directory on a shared file system.
10. \# Indicate if you are the 'chief'
11. sv = Supervisor(logdir='/shared_directory/...', is_chief=is_chief)
12. \# Get a Session in a TensorFlow server on the cluster.
13. with sv.managed_session(server.target) as sess:
14. \# Use the session to train the graph.
15. while not sv.should_stop():
16. sess.run(<my\_train\_op>)
如果有 task 崩溃或重启,managed_session() 将检查是否 Model 被初始化。如果已经初始化,它只需要创建一个 session 并将其返回至正在训练的正常代码中。如果 model 需要被初始化,主 task 将对它进行重新初始化,而其他 task 将等待模型初始化完成。
注意:该程序方法一样适用于单进程的 work,该单进程标注自己为主要的便行
▷ supervisor 中 master 的字符串形式
无论运行在本机或者集群上,都可以使用以下值设定 master flag:
- 定义为 ” ,要求一个进程内且没有使用 RPC 的 session
- 定义为 ‘local’,要求一个使用基于 RPC 的主服务接口 (“Master interface” ) 的 session 来运行 tensorflow 程序。更多细节可以查看 tf.train.Server.create_local_server() 相关内容。
- 定义为 ‘grpc://hostname:port’,要求一个指定的 RPC 接口的 session,同时运行内部进程的 master 接入远程的 tensorflow workers。可用 server.target 返回该形式
▷ supervisor 高级用法
- 启动额外的服务
managed_session() 启动了 Checkpoint 和 Summary 服务。如果需要运行更多的服务,可以在 managed_session() 控制的模块中启动他们。
1. #例如: 开启一个线程用于打印loss. 设置每60秒该线程运行一次,我们使用sv.loop()
2. ...
3. sv = Supervisor(logdir='/tmp/mydir')
4. with sv.managed_session(FLAGS.master) as sess:
5. sv.loop(60, print_loss, (sess))
6. while not sv.should_stop():
7. sess.run(my\_train\_op)
启动更少的的服务
managed_session() 启动了 “summary” 和 “checkpoint” 线程,这些线程通过构建器或者监督器默认自动创建了 summary_op 和 saver 操作。如果想运行自己的 summary 和 checkpointing 方法,关闭这些服务,通过传递 None 值给 summary_op 和 saver 参数。
1. 在chief中每100个step,创建summaries
2. \# Create a Supervisor with no automatic summaries.
3. sv = Supervisor(logdir='/tmp/mydir', is_chief=is_chief, summary_op=None)
4. \# As summary\_op was None, managed\_session() does not start the
5. \# summary thread.
6. with sv.managed_session(FLAGS.master) as sess:
7. for step in xrange(1000000):
8. if sv.should_stop():
9. break
10. if is_chief and step % 100 == 0:
11. \# Create the summary every 100 chief steps.
12. sv.summary_computed(sess, sess.run(my\_summary\_op))
13. else:
14. \# Train normally
15. sess.run(my\_train\_op)
▷ tf.train.Supervisor.managed_session
1. def train():
2. sv = tf.train.Supervisor(...)
3. with sv.managed_session(<master>) as sess:
4. for step in xrange(..):
5. if sv.should_stop():
6. break
7. sess.run(<my training op>)
8. ...do other things needed at each training step...
▷ tf.train.SessionManager
1. with tf.Graph().as_default():
2. ...add operations to the graph...
3. \# Create a SessionManager that will checkpoint the model in '/tmp/mydir'.
4. sm = SessionManager()
5. sess = sm.prepare_session(master, init_op, saver, checkpoint_dir)
6. \# Use the session to train the graph.
7. while True:
8. sess.run(<my\_train\_op>)
9. #其中prepare_session()初始化和恢复一个模型参数。
11. #另一个进程将等待model准备完成,代码如下
12. with tf.Graph().as_default():
13. ...add operations to the graph...
14. \# Create a SessionManager that will wait for the model to become ready.
15. sm = SessionManager()
16. sess = sm.wait\_for\_session(master)
17. \# Use the session to train the graph.
18. while True:
19. sess.run(<my\_train\_op>)
20. #wait\_for\_session()等待一个model被其他进程初始化
▷ tf.train.ClusterSpec
一个 tf.train.ClusterSpec 表示一系列的进程,这些进程都参与分布式 tensorflow 的计算。每一个 tf.train.Server 都在一个独有的集群中构建。
创建一个具有两个 jobs 及其 5 个 tasks 的集群们需要定义从 job 名称列表到网络地址列表之间的映射。
1. cluster = tf.train.ClusterSpec({"worker": \["worker0.example.com:2222",
2. "worker1.example.com:2222",
3. "worker2.example.com:2222"\],
4. "ps": \["ps0.example.com:2222",
5. "ps1.example.com:2222"\]})
▷ tf.train.replica_device_setter
1. \# To build a cluster with two ps jobs on hosts ps0 and ps1, and 3 worker
2. \# jobs on hosts worker0, worker1 and worker2.
3. cluster_spec = {
4. "ps": \["ps0:2222", "ps1:2222"\],
5. "worker": \["worker0:2222", "worker1:2222", "worker2:2222"\]}
6. with tf.device(tf.replica\_device\_setter(cluster=cluster_spec)):
7. \# Build your graph
8. v1 = tf.Variable(...) \# assigned to /job:ps/task:0
9. v2 = tf.Variable(...) \# assigned to /job:ps/task:1
10. v3 = tf.Variable(...) \# assigned to /job:ps/task:0
11. \# Run compute
█ 汇总操作 (Summary Operations)
我们可以在一个 session 中获取 summary 操作的输出,并将其传输到 SummaryWriter 以添加至一个事件记录文件中。
操作 | 描述 |
---|---|
tf.scalar_summary(tags, values, collections=None, name=None) | 输出一个标量值的 summary 协议 buffer |
tag 的 shape 需要与 values 的相同,用来做 summaries 的 tags,为字符串 | |
tf.image_summary(tag, tensor, max_images=3, collections=None, name=None) | 输出一个图像 tensor 的 summary 协议 buffer |
tf.audio_summary(tag, tensor, sample_rate, max_outputs=3, collections=None, name=None) | 输出一个音频 tensor 的 summary 协议 buffer |
tf.histogram_summary(tag, values, collections=None, name=None) | 输出一个直方图的 summary 协议 buffer |
tf.nn.zero_fraction(value, name=None) | 返回 0 在 value 中的小数比例 |
tf.merge_summary(inputs, collections=None, name=None) | 合并 summary |
tf.merge_all_summaries(key=’summaries’) | 合并在默认 graph 中手机的 summaries |
将记录汇总写入文件中 (Adding Summaries to Event Files)
操作 | 描述 |
---|---|
class tf.train.SummaryWriter | 将 summary 协议 buffer 写入事件文件中 |
tf.train.SummaryWriter.init(logdir, graph=None, max_queue=10, flush_secs=120, graph_def=None) | 创建一个 SummaryWriter 实例以及新建一个事件文件 |
tf.train.SummaryWriter.add_summary(summary, global_step=None) | 将一个 summary 添加到事件文件中 |
tf.train.SummaryWriter.add_session_log(session_log, global_step=None) | 添加 SessionLog 到一个事件文件中 |
tf.train.SummaryWriter.add_event(event) | 添加一个事件到事件文件中 |
tf.train.SummaryWriter.add_graph(graph, global_step=None, graph_def=None) | 添加一个 Graph 到时间文件中 |
tf.train.SummaryWriter.add_run_metadata(run_metadata, tag, global_step=None) | 为一个单一的 session.run() 调用添加一个元数据信息 |
tf.train.SummaryWriter.flush() | 刷新时间文件到硬盘中 |
tf.train.SummaryWriter.close() | 将事件问价写入硬盘中并关闭该文件 |
tf.train.summary_iterator(path) | 一个用于从时间文件中读取时间协议 buffer 的迭代器 |
▷ tf.train.SummaryWriter
创建一个 SummaryWriter 和事件文件。如果我们传递一个 Graph 进入该构建器中,它将被添加到事件文件当中,这一点与使用 add_graph() 具有相同功能。
TensorBoard 将从事件文件中提取该 graph,并将其显示。所以我们能直观地看到我们建立的 graph。我们通常从我们启动的 session 中传递 graph:
1. ...create a graph...
2. \# Launch the graph in a session.
3. sess = tf.Session()
4. \# Create a summary writer, add the 'graph' to the event file.
5. writer = tf.train.SummaryWriter(<some-directory>, sess.graph)
▷ tf.train.summary_iterator
1. #打印时间文件中的内容
2. for e in tf.train.summary_iterator(path to events file):
3. print(e)
5. #打印指定的summary值
6. \# This example supposes that the events file contains summaries with a
7. \# summary value tag 'loss'. These could have been added by calling
8. \# \`add_summary()\`, passing the output of a scalar summary op created with
9. \# with: \`tf.scalar\_summary(\['loss'\], loss\_tensor)\`.
10. for e in tf.train.summary_iterator(path to events file):
11. for v in e.summary.value:
12. if v.tag == 'loss':
13. print(v.simple_value)
█ 训练的通用函数及其他 (Training utilities)
操作 | 描述 |
---|---|
tf.train.global_step(sess, global_step_tensor) | 一个用于获取全局 step 的小辅助器 |
tf.train.write_graph(graph_def, logdir, name, as_text=True) | 将一个 graph proto 写入一个文件中 |
# | |
:— | |
class tf.train.LooperThread | 可重复地执行代码的线程 |
tf.train.LooperThread.init(coord, timer_interval_secs, target=None, args=None, kwargs=None) | 创建一个 LooperThread |
tf.train.LooperThread.is_alive() | 返回是否该线程是活跃的 |
tf.train.LooperThread.join(timeout=None) | 等待线程结束 |
tf.train.LooperThread.loop(coord, timer_interval_secs, target, args=None, kwargs=None) | 启动一个 LooperThread,用于周期地调用某个函数 |
调用函数 target(args) | |
tf.py_func(func, inp, Tout, stateful=True, name=None) | 将 python 函数包装成 tf 中操作节点 |
▷ tf.train.global_step
1. \# Creates a variable to hold the global_step.
2. global\_step\_tensor = tf.Variable(10, trainable=False, name='global_step')
3. \# Creates a session.
4. sess = tf.Session()
5. \# Initializes the variable.
6. sess.run(global\_step\_tensor.initializer)
7. print('global_step: %s' % tf.train.global_step(sess, global\_step\_tensor))
9. global_step: 10
▷ tf.train.write_graph
1. v = tf.Variable(0, name='my_variable')
2. sess = tf.Session()
3. tf.train.write_graph(sess.graph_def, '/tmp/my-model', 'train.pbtxt')
▷ tf.py_func
1. #tf.py_func(func, inp, Tout, stateful=True, name=None)
2. #func:为一个python函数
3. #inp:为输入函数的参数,Tensor列表
4. #Tout: 指定func返回的输出的数据类型,是一个列表
5. def my_func(x):
6. \# x will be a numpy array with the contents of the placeholder below
7. return np.sinh(x)
8. inp = tf.placeholder(tf.float32, \[...\])
9. y = py_func(my_func, \[inp\], \[tf.float32\])
2.2 测试 (Testing)
TensorFlow 提供了一个方便的继承 unittest.TestCase 类的方法,该类增加有关 TensorFlow 测试的方法。如下例子:
1. import tensorflow as tf
3. class SquareTest(tf.test.TestCase):
5. def testSquare(self):
6. with self.test_session():
7. x = tf.square(\[2, 3\])
8. self.assertAllEqual(x.eval(), \[4, 9\])
10. if \_\_name\_\_ == '\_\_main\_\_':
11. tf.test.main()
█ 共用 (Utilities)
操作 | 描述 |
---|---|
tf.test.main() | 运行所有的单元测试 |
tf.test.assert_equal_graph_def(actual, expected) | 断言 两个 GraphDefs 是否几乎一样 |
tf.test.get_temp_dir() | 返回测试期间使用的临时目录 |
tf.test.is_built_with_cuda() | 返回是否 Tensorflow 支持 CUDA(GPU) 的 build |
█ 梯度检查 (Gradient checking)
可对比 compute_gradient 和 compute_gradient_error 函数的用法
操作 | 描述 |
---|---|
tf.test.compute_gradient(x, x_shape, y, y_shape, x_init_value=None, delta=0.001, init_targets=None) | 计算并返回理论的和数值的 Jacobian 矩阵 |
tf.test.compute_gradient_error(x, x_shape, y, y_shape, x_init_value=None, delta=0.001, init_targets=None) | 计算梯度的 error。在计算所得的与数值估计的 Jacobian 中 为 dy/dx 计算最大的 error |
相关链接:
[1] 安装 Tensorflow(Linux ubuntu) http://blog.csdn.net/lenbow/article/details/51203526
[2] ubuntu 下 CUDA 编译的 GCC 降级安装 http://blog.csdn.net/lenbow/article/details/51596706
[3] ubuntu 手动安装最新 Nvidia 显卡驱动 http://blog.csdn.net/lenbow/article/details/51683783
[4] Tensorflow 的 CUDA 升级,以及相关配置 http://blog.csdn.net/lenbow/article/details/52118116
[5] 基于 gensim 的 Doc2Vec 简析 http://blog.csdn.net/lenbow/article/details/52120230
[6] TensorFlow 的分布式学习框架简介 http://blog.csdn.net/lenbow/article/details/52130565
https://www.plob.org/article/13424.html