Commit 490142d7 authored by Yuxin Wu's avatar Yuxin Wu

Docs about graph builder

parent a2f60395
...@@ -361,7 +361,6 @@ def autodoc_skip_member(app, what, name, obj, skip, options): ...@@ -361,7 +361,6 @@ def autodoc_skip_member(app, what, name, obj, skip, options):
'Triggerable', 'Triggerable',
'predictor_factory', 'predictor_factory',
'get_predictors', 'get_predictors',
'vs_name_for_predictor',
'RandomCropAroundBox', 'RandomCropAroundBox',
'GaussianDeform', 'GaussianDeform',
'dump_chkpt_vars', 'dump_chkpt_vars',
......
...@@ -7,9 +7,7 @@ import re ...@@ -7,9 +7,7 @@ import re
from six.moves import zip, range from six.moves import zip, range
from ..utils.argtools import memoized from ..utils.argtools import memoized
from ..tfutils.gradproc import FilterNoneGrad
from ..tfutils.common import get_global_step_var, get_op_tensor_name from ..tfutils.common import get_global_step_var, get_op_tensor_name
from ..tfutils.tower import get_current_tower_context
from .training import DataParallelBuilder from .training import DataParallelBuilder
...@@ -17,8 +15,28 @@ __all__ = ['DistributedReplicatedBuilder'] ...@@ -17,8 +15,28 @@ __all__ = ['DistributedReplicatedBuilder']
class DistributedReplicatedBuilder(DataParallelBuilder): class DistributedReplicatedBuilder(DataParallelBuilder):
"""
Graph builder for distributed replicated training.
Each worker process builds the same model on one or more GPUs.
Gradients across GPUs are averaged within the worker,
and get synchronously applied to the global copy of variables located on PS.
Then each worker copy the latest variables from PS back to local.
See https://www.tensorflow.org/performance/benchmarks for details.
Note:
Gradients are not averaged across workers, but applied to PS variables
directly (either with or without locking depending on the optimizer).
"""
def __init__(self, towers, server): def __init__(self, towers, server):
"""
Args:
towers (list[int]): list of GPU ids.
server (tf.train.Server): the server with ps and workers.
The job_name must be 'worker' because 'ps' job doesn't need to
build any graph.
"""
super(DistributedReplicatedBuilder, self).__init__(towers) super(DistributedReplicatedBuilder, self).__init__(towers)
self.server = server self.server = server
server_def = server.server_def server_def = server.server_def
...@@ -146,6 +164,20 @@ class DistributedReplicatedBuilder(DataParallelBuilder): ...@@ -146,6 +164,20 @@ class DistributedReplicatedBuilder(DataParallelBuilder):
return tf.group(*queue_ops, name=name) return tf.group(*queue_ops, name=name)
def build(self, input, get_cost_fn, get_opt_fn): def build(self, input, get_cost_fn, get_opt_fn):
"""
Args:
input (InputSource): the input. Should have been setup.
get_cost_fn ([tf.Tensor] -> tf.Tensor): callable which takes a list of input tensor
and returns a cost tensor
get_opt_fn (-> tf.train.Optimizer): callable which returns an optimizer
Returns:
tf.Operation: the training op
tf.Operation: the op which sync all the local variables from PS.
This op sholud be run before training.
tf.Operation: the op which sync all the local `MODEL_VARIABLES` from PS.
You can choose how often to run it by yourself.
"""
# do this before everything, because they my need global step # do this before everything, because they my need global step
with tf.device(self.param_server_device): with tf.device(self.param_server_device):
gs = get_global_step_var() gs = get_global_step_var()
...@@ -156,21 +188,11 @@ class DistributedReplicatedBuilder(DataParallelBuilder): ...@@ -156,21 +188,11 @@ class DistributedReplicatedBuilder(DataParallelBuilder):
# This makes sure that learning_rate is a global variable (what we expect) # This makes sure that learning_rate is a global variable (what we expect)
get_opt_fn() get_opt_fn()
def get_grads(): get_grad_fn, _ = DataParallelBuilder._make_fn(input, get_cost_fn, get_opt_fn)
ctx = get_current_tower_context()
cost = get_cost_fn(*input.get_input_tensors())
varlist = ctx.filter_vars_by_vs_name(tf.trainable_variables())
opt = get_opt_fn()
grads = opt.compute_gradients(
cost, var_list=varlist,
gate_gradients=False, colocate_gradients_with_ops=True)
grads = FilterNoneGrad().process(grads)
return grads
# Ngpu * Nvar * 2 # Ngpu * Nvar * 2
grad_list = DataParallelBuilder.build_on_towers( grad_list = DataParallelBuilder.build_on_towers(
self.towers, get_grads, self.towers, get_grad_fn,
devices=self.raw_devices, devices=self.raw_devices,
use_vs=[True] * len(self.towers)) # open vs at each tower use_vs=[True] * len(self.towers)) # open vs at each tower
DataParallelBuilder._check_grad_list(grad_list) DataParallelBuilder._check_grad_list(grad_list)
...@@ -180,7 +202,7 @@ class DistributedReplicatedBuilder(DataParallelBuilder): ...@@ -180,7 +202,7 @@ class DistributedReplicatedBuilder(DataParallelBuilder):
ps_var_grads = DistributedReplicatedBuilder._apply_shadow_vars(avg_grads) ps_var_grads = DistributedReplicatedBuilder._apply_shadow_vars(avg_grads)
var_update_ops = self._apply_gradients_and_copy( var_update_ops = self._apply_gradients_and_copy(
get_opt_fn(), grad_list, ps_var_grads) get_opt_fn(), grad_list, ps_var_grads)
self._shadow_vars = [v for (_, v) in ps_var_grads] self._shadow_vars = [v for (__, v) in ps_var_grads]
self._shadow_model_vars = DistributedReplicatedBuilder._shadow_model_variables(self._shadow_vars) self._shadow_model_vars = DistributedReplicatedBuilder._shadow_model_variables(self._shadow_vars)
# TODO add options to synchronize less # TODO add options to synchronize less
......
...@@ -37,10 +37,10 @@ class SimpleBuilder(GraphBuilder): ...@@ -37,10 +37,10 @@ class SimpleBuilder(GraphBuilder):
def build(self, input, get_cost_fn, get_opt_fn): def build(self, input, get_cost_fn, get_opt_fn):
""" """
Args: Args:
input (InputSource): should have been setup already input (InputSource): the input. Should have been setup.
get_cost_fn ([tf.Tensor] -> tf.Tensor): a callable, get_cost_fn ([tf.Tensor] -> tf.Tensor): callable which takes a list of input tensor
taking several tensors as input and returns a cost tensor. and returns a cost tensor
get_opt_fn (None -> tf.train.Optimizer): a callable that returns an optimizer get_opt_fn (-> tf.train.Optimizer): callable which returns an optimizer
Returns: Returns:
tf.Operation: the training op tf.Operation: the training op
...@@ -62,7 +62,7 @@ class DataParallelBuilder(GraphBuilder): ...@@ -62,7 +62,7 @@ class DataParallelBuilder(GraphBuilder):
def __init__(self, towers): def __init__(self, towers):
""" """
Args: Args:
towers(list[int]): list of GPU relative ids. towers(list[int]): list of GPU ids.
""" """
if len(towers) > 1: if len(towers) > 1:
logger.info("Training a model of {} towers".format(len(towers))) logger.info("Training a model of {} towers".format(len(towers)))
...@@ -88,11 +88,12 @@ class DataParallelBuilder(GraphBuilder): ...@@ -88,11 +88,12 @@ class DataParallelBuilder(GraphBuilder):
def build_on_towers( def build_on_towers(
towers, func, devices=None, use_vs=None): towers, func, devices=None, use_vs=None):
""" """
Run `func` on all towers. Run `func` on all GPUs (towers) and return the results.
Args: Args:
towers (list[int]): a list of GPU id.
func: a lambda to be called inside each tower func: a lambda to be called inside each tower
devices: a list of devices to be used. By default will use GPUs in ``towers``. devices: a list of devices to be used. By default will use '/gpu:{tower}'
use_vs (list[bool]): list of use_vs to passed to TowerContext use_vs (list[bool]): list of use_vs to passed to TowerContext
Returns: Returns:
...@@ -115,10 +116,7 @@ class DataParallelBuilder(GraphBuilder): ...@@ -115,10 +116,7 @@ class DataParallelBuilder(GraphBuilder):
is_training=True, is_training=True,
index=idx, index=idx,
use_vs=usevs): use_vs=usevs):
if idx == t: logger.info("Building graph for training tower {} on device {}...".format(idx, device))
logger.info("Building graph for training tower {}...".format(idx))
else:
logger.info("Building graph for training tower {} on device {}...".format(idx, device))
# When use_vs is True, use LOCAL_VARIABLES, # When use_vs is True, use LOCAL_VARIABLES,
# so these duplicated variables won't be saved by default. # so these duplicated variables won't be saved by default.
...@@ -131,11 +129,46 @@ class DataParallelBuilder(GraphBuilder): ...@@ -131,11 +129,46 @@ class DataParallelBuilder(GraphBuilder):
restore_collection(backup) restore_collection(backup)
return ret return ret
@staticmethod
def _make_fn(input, get_cost_fn, get_opt_fn):
# internal use only
get_opt_fn = memoized(get_opt_fn)
def get_grad_fn():
ctx = get_current_tower_context()
cost = get_cost_fn(*input.get_input_tensors())
varlist = ctx.filter_vars_by_vs_name(tf.trainable_variables())
opt = get_opt_fn()
grads = opt.compute_gradients(
cost, var_list=varlist,
gate_gradients=False, colocate_gradients_with_ops=True)
grads = FilterNoneGrad().process(grads)
return grads
return get_grad_fn, get_opt_fn
class SyncMultiGPUParameterServerBuilder(DataParallelBuilder): class SyncMultiGPUParameterServerBuilder(DataParallelBuilder):
def __init__(self, towers, ps_device): """
Graph builder for data-parallel training in 'ParameterServer' mode.
It builds one tower on each GPU with
shared variable scope. It synchronoizes the gradients computed
from each tower, averages them and applies to the shared variables.
See https://www.tensorflow.org/performance/benchmarks for details.
"""
def __init__(self, towers, ps_device=None):
"""
Args:
towers(list[int]): list of GPU id
ps_device (str): either 'gpu' or 'cpu', where variables are stored.
Setting to 'cpu' might help when #gpu>=4
"""
super(SyncMultiGPUParameterServerBuilder, self).__init__(towers) super(SyncMultiGPUParameterServerBuilder, self).__init__(towers)
# TODO auto choose ps_device if ps_device is None:
ps_device = 'cpu' if len(towers) >= 4 else 'gpu'
assert ps_device in ['cpu', 'gpu']
self.ps_device = ps_device self.ps_device = ps_device
@staticmethod @staticmethod
...@@ -158,6 +191,16 @@ class SyncMultiGPUParameterServerBuilder(DataParallelBuilder): ...@@ -158,6 +191,16 @@ class SyncMultiGPUParameterServerBuilder(DataParallelBuilder):
return new_tower_grads return new_tower_grads
def build(self, input, get_cost_fn, get_opt_fn): def build(self, input, get_cost_fn, get_opt_fn):
"""
Args:
input (InputSource):
get_cost_fn ([tf.Tensor] -> tf.Tensor): callable which takes a list of input tensor
and returns a cost tensor
get_opt_fn (-> tf.train.Optimizer): callable which returns an optimizer
Returns:
tf.Operation: the training op
"""
raw_devices = ['/gpu:{}'.format(k) for k in self.towers] raw_devices = ['/gpu:{}'.format(k) for k in self.towers]
if self.ps_device == 'gpu': if self.ps_device == 'gpu':
devices = [LeastLoadedDeviceSetter(d, raw_devices) for d in raw_devices] devices = [LeastLoadedDeviceSetter(d, raw_devices) for d in raw_devices]
...@@ -165,22 +208,9 @@ class SyncMultiGPUParameterServerBuilder(DataParallelBuilder): ...@@ -165,22 +208,9 @@ class SyncMultiGPUParameterServerBuilder(DataParallelBuilder):
devices = [tf.train.replica_device_setter( devices = [tf.train.replica_device_setter(
worker_device=d, ps_device='/cpu:0', ps_tasks=1) for d in raw_devices] worker_device=d, ps_device='/cpu:0', ps_tasks=1) for d in raw_devices]
# TODO XXX share this part of code get_grad_fn, get_opt_fn = DataParallelBuilder._make_fn(input, get_cost_fn, get_opt_fn)
get_opt_fn = memoized(get_opt_fn)
def get_grads(): grad_list = DataParallelBuilder.build_on_towers(self.towers, get_grad_fn, devices)
ctx = get_current_tower_context()
cost = get_cost_fn(*input.get_input_tensors())
varlist = ctx.filter_vars_by_vs_name(tf.trainable_variables())
opt = get_opt_fn()
grads = opt.compute_gradients(
cost, var_list=varlist,
gate_gradients=False, colocate_gradients_with_ops=True)
grads = FilterNoneGrad().process(grads)
return grads
grad_list = DataParallelBuilder.build_on_towers(self.towers, get_grads, devices)
DataParallelBuilder._check_grad_list(grad_list) DataParallelBuilder._check_grad_list(grad_list)
# debug tower performance (without update): # debug tower performance (without update):
...@@ -201,6 +231,14 @@ class SyncMultiGPUParameterServerBuilder(DataParallelBuilder): ...@@ -201,6 +231,14 @@ class SyncMultiGPUParameterServerBuilder(DataParallelBuilder):
class SyncMultiGPUReplicatedBuilder(DataParallelBuilder): class SyncMultiGPUReplicatedBuilder(DataParallelBuilder):
"""
Graph builder for data-parallel training in "replicated" mode,
where each GPU contains a replicate of the whole model.
It will build one tower on each GPU under its own variable scope.
Each gradient update is averaged across or GPUs through NCCL.
See https://www.tensorflow.org/performance/benchmarks for details.
"""
@staticmethod @staticmethod
def _allreduce_grads(tower_grads): def _allreduce_grads(tower_grads):
from tensorflow.contrib import nccl from tensorflow.contrib import nccl
...@@ -224,25 +262,27 @@ class SyncMultiGPUReplicatedBuilder(DataParallelBuilder): ...@@ -224,25 +262,27 @@ class SyncMultiGPUReplicatedBuilder(DataParallelBuilder):
return new_tower_grads return new_tower_grads
def build(self, input, get_cost_fn, get_opt_fn): def build(self, input, get_cost_fn, get_opt_fn):
raw_devices = ['/gpu:{}'.format(k) for k in self.towers] """
Args:
input (InputSource): the input. Should have been setup.
get_cost_fn ([tf.Tensor] -> tf.Tensor): callable which takes a list of input tensor
and returns a cost tensor
get_opt_fn (-> tf.train.Optimizer): callable which returns an optimizer
get_opt_fn = memoized(get_opt_fn) Returns:
tf.Operation: the training op.
tf.Operation: the op which sync variables from GPU 0 to other GPUs.
It has to be run before the training has started.
And you can optionally run it later to sync non-trainable variables.
"""
raw_devices = ['/gpu:{}'.format(k) for k in self.towers]
def get_grads(): get_grad_fn, get_opt_fn = DataParallelBuilder._make_fn(input, get_cost_fn, get_opt_fn)
ctx = get_current_tower_context()
cost = get_cost_fn(*input.get_input_tensors())
varlist = ctx.filter_vars_by_vs_name(tf.trainable_variables())
opt = get_opt_fn()
grads = opt.compute_gradients(
cost, var_list=varlist,
gate_gradients=False, colocate_gradients_with_ops=True)
grads = FilterNoneGrad().process(grads)
return grads
grad_list = DataParallelBuilder.build_on_towers( grad_list = DataParallelBuilder.build_on_towers(
self.towers, self.towers,
get_grads, # use no variable scope for the first tower get_grad_fn,
# use no variable scope for the first tower
use_vs=[False] + [True] * (len(self.towers) - 1)) use_vs=[False] + [True] * (len(self.towers) - 1))
grads = SyncMultiGPUReplicatedBuilder._allreduce_grads(grad_list) grads = SyncMultiGPUReplicatedBuilder._allreduce_grads(grad_list)
...@@ -292,11 +332,33 @@ class SyncMultiGPUReplicatedBuilder(DataParallelBuilder): ...@@ -292,11 +332,33 @@ class SyncMultiGPUReplicatedBuilder(DataParallelBuilder):
class AsyncMultiGPUBuilder(DataParallelBuilder): class AsyncMultiGPUBuilder(DataParallelBuilder):
"""
Graph builder for data-parallel training with async update.
It builds one tower on each GPU with shared variable scope.
Every tower computes the gradients and independently applies them to the
variables, without synchronizing and averaging across towers.
"""
def __init__(self, towers, scale_gradient=True): def __init__(self, towers, scale_gradient=True):
"""
Args:
towers(list[int]): list of GPU ids.
scale_gradient (bool): if True, will scale each gradient by ``1.0/nr_gpu``.
"""
super(AsyncMultiGPUBuilder, self).__init__(towers) super(AsyncMultiGPUBuilder, self).__init__(towers)
self._scale_gradient = scale_gradient self._scale_gradient = scale_gradient
def build(self, input, get_cost_fn, get_opt_fn): def build(self, input, get_cost_fn, get_opt_fn):
"""
Args:
input (InputSource): the input. Should have been setup.
get_cost_fn ([tf.Tensor] -> tf.Tensor): callable which takes a list of input tensor
and returns a cost tensor
get_opt_fn (-> tf.train.Optimizer): callable which returns an optimizer
Returns:
tf.Operation: the training op
"""
ps_device = 'cpu' if len(self.towers) >= 4 else 'gpu' ps_device = 'cpu' if len(self.towers) >= 4 else 'gpu'
if ps_device == 'gpu': if ps_device == 'gpu':
...@@ -306,21 +368,9 @@ class AsyncMultiGPUBuilder(DataParallelBuilder): ...@@ -306,21 +368,9 @@ class AsyncMultiGPUBuilder(DataParallelBuilder):
devices = [tf.train.replica_device_setter( devices = [tf.train.replica_device_setter(
worker_device=d, ps_device='/cpu:0', ps_tasks=1) for d in raw_devices] worker_device=d, ps_device='/cpu:0', ps_tasks=1) for d in raw_devices]
get_opt_fn = memoized(get_opt_fn) get_grad_fn, get_opt_fn = DataParallelBuilder._make_fn(input, get_cost_fn, get_opt_fn)
def get_grads():
ctx = get_current_tower_context()
cost = get_cost_fn(*input.get_input_tensors())
varlist = ctx.filter_vars_by_vs_name(tf.trainable_variables())
opt = get_opt_fn()
grads = opt.compute_gradients(
cost, var_list=varlist,
gate_gradients=False, colocate_gradients_with_ops=True)
grads = FilterNoneGrad().process(grads)
return grads
grad_list = DataParallelBuilder.build_on_towers(self.towers, get_grads, devices) grad_list = DataParallelBuilder.build_on_towers(self.towers, get_grad_fn, devices)
DataParallelBuilder._check_grad_list(grad_list) DataParallelBuilder._check_grad_list(grad_list)
if self._scale_gradient and len(self.towers) > 1: if self._scale_gradient and len(self.towers) > 1:
......
...@@ -253,14 +253,6 @@ class Trainer(object): ...@@ -253,14 +253,6 @@ class Trainer(object):
self._callbacks.after_train() self._callbacks.after_train()
self.hooked_sess.close() self.hooked_sess.close()
# Predictor related methods. They actually should not be part of a trainer:
@property
def vs_name_for_predictor(self):
# The variable scope name a predictor should be built in.
# Expected to be changed. Don't use it.
# TODO graphbuilder knows it
return ""
def get_predictor(self, input_names, output_names, tower=0): def get_predictor(self, input_names, output_names, tower=0):
""" """
Returns a callable predictor built under ``is_training=False`` tower context. Returns a callable predictor built under ``is_training=False`` tower context.
......
...@@ -20,13 +20,7 @@ __all__ = ['DistributedTrainerReplicated'] ...@@ -20,13 +20,7 @@ __all__ = ['DistributedTrainerReplicated']
class DistributedTrainerReplicated(Trainer): class DistributedTrainerReplicated(Trainer):
""" """
Distributed replicated training. Build the graph with :class:`DistributedReplicatedBuilder` and train it.
Each worker process builds the same model on one or more GPUs.
Gradients across GPUs are averaged within the worker,
and get synchronously applied to the global copy of variables located on PS.
Then each worker copy the latest variables from PS back to local.
See https://www.tensorflow.org/performance/benchmarks for details.
Note: Note:
Gradients are not averaged across workers, but applied to PS variables Gradients are not averaged across workers, but applied to PS variables
...@@ -154,7 +148,3 @@ class DistributedTrainerReplicated(Trainer): ...@@ -154,7 +148,3 @@ class DistributedTrainerReplicated(Trainer):
return _create_session() return _create_session()
self.config.session_creator = _Creator() self.config.session_creator = _Creator()
@property
def vs_name_for_predictor(self):
return "tower0"
...@@ -47,11 +47,7 @@ def apply_prefetch_policy(config, gpu_prefetch=True): ...@@ -47,11 +47,7 @@ def apply_prefetch_policy(config, gpu_prefetch=True):
class SyncMultiGPUTrainerParameterServer(Trainer): class SyncMultiGPUTrainerParameterServer(Trainer):
""" """
A data-parallel multi-GPU trainer. It builds one tower on each GPU with Build graph with :class:`SyncMultiGPUParameterServerBuilder` and train it.
shared variable scope. It synchronoizes the gradients computed
from each tower, averages them and applies to the shared variables.
See https://www.tensorflow.org/performance/benchmarks for details.
""" """
def __init__(self, config, ps_device='gpu', gpu_prefetch=True): def __init__(self, config, ps_device='gpu', gpu_prefetch=True):
...@@ -93,11 +89,7 @@ def SyncMultiGPUTrainer(config): ...@@ -93,11 +89,7 @@ def SyncMultiGPUTrainer(config):
class SyncMultiGPUTrainerReplicated(Trainer): class SyncMultiGPUTrainerReplicated(Trainer):
""" """
Data-parallel multi-GPU trainer where each GPU contains a replicate of the whole model. Build graph with :class:`SyncMultiGPUReplicatedBuilder` and train it.
It will build one tower on each GPU under its own variable scope.
Each gradient update is averaged across or GPUs through NCCL.
See https://www.tensorflow.org/performance/benchmarks for details.
""" """
def __init__(self, config, gpu_prefetch=True): def __init__(self, config, gpu_prefetch=True):
""" """
...@@ -126,11 +118,8 @@ class SyncMultiGPUTrainerReplicated(Trainer): ...@@ -126,11 +118,8 @@ class SyncMultiGPUTrainerReplicated(Trainer):
class AsyncMultiGPUTrainer(Trainer): class AsyncMultiGPUTrainer(Trainer):
""" """
A data-parallel multi-GPU trainer. It builds one tower on each GPU with shared variable scope. Build graph with :class:`AsyncMultiGPUBuilder` and train it.
Every tower computes the gradients and independently applies them to the
variables, without synchronizing and averaging across towers.
""" """
def __init__(self, config, scale_gradient=True): def __init__(self, config, scale_gradient=True):
""" """
Args: Args:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment