Commit 694e404b authored by Yuxin Wu

[WIP] move away all graph building logic

parent ad5cb725
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: utils.py
# File: _utils.py
# Author: Yuxin Wu <ppwwyyxxc@gmail.com>
import copy
from six.moves import zip
from contextlib import contextmanager
import operator
import tensorflow as tf
from ..tfutils.common import get_op_tensor_name
from ..utils import logger
@@ -57,3 +62,63 @@ def get_sublist_by_names(lst, names):
raise
ret.append(lst[idx])
return ret
@contextmanager
def override_to_local_variable(enable=True):
if enable:
with tf.variable_scope(
tf.get_variable_scope(),
custom_getter=OverrideToLocalVariable()):
yield
else:
yield
class OverrideToLocalVariable(object):
"""
Ensures the created variable
is in the LOCAL_VARIABLES and not the GLOBAL_VARIABLES collection.
"""
def __call__(self, getter, name, *args, **kwargs):
if 'collections' in kwargs:
collections = kwargs['collections']
if not collections:
collections = set([tf.GraphKeys.GLOBAL_VARIABLES])
else:
collections = set(collections.copy())
collections.remove(tf.GraphKeys.GLOBAL_VARIABLES)
collections.add(tf.GraphKeys.LOCAL_VARIABLES)
kwargs['collections'] = list(collections)
return getter(name, *args, **kwargs)
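# --- Editor's note: illustrative usage sketch, not part of this commit ---
# The custom getter above routes every variable created under the scope into
# tf.GraphKeys.LOCAL_VARIABLES instead of GLOBAL_VARIABLES, so per-tower
# copies are not picked up by savers by default. The variable name 'W' below
# is hypothetical:
#
#   with override_to_local_variable(enable=True):
#       W = tf.get_variable('W', shape=[3, 3])
#   assert W in tf.local_variables()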
# Copied from https://github.com/tensorflow/benchmarks/blob/master/scripts/tf_cnn_benchmarks/variable_mgr.py
class LeastLoadedDeviceSetter(object):
""" Helper class to assign variables on the least loaded ps-device."""
def __init__(self, worker_device, ps_devices):
"""
Args:
worker_device: the device to use for compute ops.
ps_devices: a list of devices to use for Variable ops.
"""
self.ps_devices = ps_devices
self.worker_device = worker_device
self.ps_sizes = [0] * len(self.ps_devices)
def __call__(self, op):
def sanitize_name(name): # tensorflow/tensorflow#11484
return tf.DeviceSpec.from_string(name).to_string()
if op.device:
return op.device
if op.type not in ['Variable', 'VariableV2']:
return sanitize_name(self.worker_device)
device_index, _ = min(enumerate(
self.ps_sizes), key=operator.itemgetter(1))
device_name = self.ps_devices[device_index]
var_size = op.outputs[0].get_shape().num_elements()
self.ps_sizes[device_index] += var_size
return sanitize_name(device_name)
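# --- Editor's note: illustrative usage sketch, not part of this commit ---
# The setter is used as a device function for tf.device(): Variable ops are
# placed on whichever ps device currently holds the fewest parameter elements,
# while all other ops stay on the worker device. Device strings below are
# hypothetical:
#
#   raw_devices = ['/gpu:0', '/gpu:1']
#   setter = LeastLoadedDeviceSetter('/gpu:0', raw_devices)
#   with tf.device(setter):
#       pass  # build one tower; its variables get spread across raw_devices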
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: distributed.py
import tensorflow as tf
import re
from six.moves import zip, range
from ..utils import logger
from ..utils.argtools import memoized
from ..tfutils.gradproc import FilterNoneGrad
from ..tfutils.common import get_global_step_var, get_op_tensor_name
from ..tfutils.tower import get_current_tower_context
from .training import DataParallelBuilder
__all__ = ['DistributedReplicatedBuilder']
class DistributedReplicatedBuilder(DataParallelBuilder):
def __init__(self, towers, server):
super(DistributedReplicatedBuilder, self).__init__(towers)
self.server = server
server_def = server.server_def
self.cluster = tf.train.ClusterSpec(server_def.cluster)
self.job_name = server_def.job_name
self.task_index = server_def.task_index
# TODO XXX ps doesn't need to build!
assert self.job_name in ['ps', 'worker'], self.job_name
assert tf.test.is_gpu_available()
logger.info("Distributed training on cluster:\n" + str(server_def.cluster))
logger.info("My role in the cluster: job={}, task={}".format(self.job_name, self.task_index))
self.is_chief = (self.task_index == 0 and self.job_name == 'worker')
worker_prefix = '/job:worker/task:%s' % self.task_index
self.param_server_device = tf.train.replica_device_setter(
worker_device=worker_prefix + '/cpu:0', cluster=self.cluster)
self.num_ps = self.cluster.num_tasks('ps')
self.num_worker = self.cluster.num_tasks('worker')
self.nr_gpu = len(self.towers)
self.cpu_device = '%s/cpu:0' % worker_prefix
self.raw_devices = ['%s/%s:%i' % (worker_prefix, 'gpu', i) for i in range(self.nr_gpu)]
# Device for queues for managing synchronization between servers
self.sync_queue_devices = ['/job:ps/task:%s/cpu:0' % i for i in range(self.num_ps)]
self.sync_queue_counter = 0
@staticmethod
def _average_grads(tower_grads, devices):
"""
Average grads from towers.
The device where the average happens is chosen with round-robin.
Args:
tower_grads: Ngpu x Nvar x 2
Returns:
Nvar x 2
"""
nr_device = len(devices)
if nr_device == 1:
return tower_grads[0]
new_tower_grads = []
with tf.name_scope('AvgGrad'):
for i, grad_and_vars in enumerate(zip(*tower_grads)):
# Ngpu * 2
with tf.device(devices[i % nr_device]):
v = grad_and_vars[0][1]
# average gradient
all_grads = [g for (g, _) in grad_and_vars]
grad = tf.multiply(
tf.add_n(all_grads), 1.0 / nr_device)
new_tower_grads.append((grad, v))
return new_tower_grads
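# Editor's note (worked example, not part of this commit): with 2 towers and
# 2 variables,
#   tower_grads = [[(g00, v0), (g01, v1)],   # tower 0
#                  [(g10, v0), (g11, v1)]]   # tower 1
# the result is [(avg(g00, g10), v0), (avg(g01, g11), v1)], where the i-th
# average is computed on devices[i % nr_device] (round-robin).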
@staticmethod
def _apply_shadow_vars(avg_grads):
"""
Create shadow variables on PS, and replace variables in avg_grads
by these shadow variables.
Args:
avg_grads: list of (grad, var) tuples
"""
ps_var_grads = []
for grad, var in avg_grads:
assert var.name.startswith('tower'), var.name
my_name = '/'.join(var.name.split('/')[1:])
my_name = get_op_tensor_name(my_name)[0]
new_v = tf.get_variable(my_name, dtype=var.dtype.base_dtype,
initializer=var.initial_value,
trainable=True)
# (g, v) to be applied, where v is global (ps vars)
ps_var_grads.append((grad, new_v))
return ps_var_grads
@staticmethod
def _shadow_model_variables(shadow_vars):
"""
Create shadow vars for model_variables as well, and add to the list of ``shadow_vars``.
Returns:
list of (shadow_model_var, local_model_var) used for syncing.
"""
curr_shadow_vars = set([v.name for v in shadow_vars])
model_vars = tf.model_variables()
shadow_model_vars = []
for v in model_vars:
assert v.name.startswith('tower'), "Found some MODEL_VARIABLES created outside of the model!"
stripped_name = get_op_tensor_name(re.sub('tower[0-9]+/', '', v.name))[0]
if stripped_name in curr_shadow_vars:
continue
new_v = tf.get_variable(stripped_name, dtype=v.dtype.base_dtype,
initializer=v.initial_value,
trainable=False)
curr_shadow_vars.add(stripped_name) # avoid duplicated shadow_model_vars
shadow_vars.append(new_v)
shadow_model_vars.append((new_v, v)) # only need to sync model_var from one tower
return shadow_model_vars
def _add_sync_queues_and_barrier(self, name, dependencies):
"""Adds ops to enqueue on all worker queues.
Args:
name: prefix for the shared_name of the queue ops.
dependencies: ops to add as control dependencies before enqueueing.
Returns:
an op that should be used as control dependency before starting next step.
"""
self.sync_queue_counter += 1
with tf.device(self.sync_queue_devices[self.sync_queue_counter % len(self.sync_queue_devices)]):
sync_queues = [
tf.FIFOQueue(self.num_worker, [tf.bool], shapes=[[]],
shared_name='%s%s' % (name, i))
for i in range(self.num_worker)]
queue_ops = []
# For each other worker, add an entry in a queue, signaling that it can finish this step.
token = tf.constant(False)
with tf.control_dependencies(dependencies):
for i, q in enumerate(sync_queues):
if i != self.task_index:
queue_ops.append(q.enqueue(token))
# Drain tokens off queue for this worker, one for each other worker.
queue_ops.append(
sync_queues[self.task_index].dequeue_many(len(sync_queues) - 1))
return tf.group(*queue_ops, name=name)
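# Editor's note (how the barrier works, not part of this commit): each call
# creates one FIFOQueue per worker on a ps device. After `dependencies` have
# run, this worker enqueues a token into every *other* worker's queue and then
# dequeues (num_worker - 1) tokens from its own queue, so the returned op can
# only complete once all workers have reached the same barrier.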
def build(self, input, get_cost_fn, get_opt_fn):
with tf.device(self.param_server_device):
gs = get_global_step_var()
assert gs.device, gs.device
# do this before inputsource.setup because input_source may need the global step
get_opt_fn = memoized(get_opt_fn)
# Build the optimizer first, before entering any tower.
# This makes sure that learning_rate is a global variable (what we expect)
get_opt_fn()
def get_grads():
ctx = get_current_tower_context()
cost = get_cost_fn(*input.get_input_tensors())
varlist = ctx.filter_vars_by_vs_name(tf.trainable_variables())
opt = get_opt_fn()
grads = opt.compute_gradients(
cost, var_list=varlist,
gate_gradients=False, colocate_gradients_with_ops=True)
grads = FilterNoneGrad().process(grads)
return grads
# Ngpu * Nvar * 2
grad_list = self.build_on_multi_tower(
get_grads,
devices=self.raw_devices,
use_vs=[True] * len(self.towers)) # open vs at each tower
DataParallelBuilder._check_grad_list(grad_list)
avg_grads = DistributedReplicatedBuilder._average_grads(grad_list, self.raw_devices)
with tf.device(self.param_server_device):
ps_var_grads = DistributedReplicatedBuilder._apply_shadow_vars(avg_grads)
var_update_ops = self._apply_gradients_and_copy(
get_opt_fn(), grad_list, ps_var_grads)
self._shadow_vars = [v for (_, v) in ps_var_grads]
self._shadow_model_vars = DistributedReplicatedBuilder._shadow_model_variables(self._shadow_vars)
# TODO add options to synchronize less
main_fetch = tf.group(*var_update_ops, name='main_fetches')
train_op = self._add_sync_queues_and_barrier(
'post_copy_barrier', [main_fetch])
# initial local_vars syncing
initial_sync_op = self._get_initial_sync_op()
if len(self._shadow_model_vars) and self.is_chief:
model_sync_op = self._get_sync_model_vars_op()
else:
model_sync_op = None
return train_op, initial_sync_op, model_sync_op
def _apply_gradients_and_copy(self, opt, raw_grad_list, ps_var_grads):
"""
Apply averaged gradients to ps vars, and then copy the updated
variables back to each tower.
Args:
raw_grad_list: Ngpu x Nvar x 2 gradient list from all towers
ps_var_grads: Nvar x 2 (grad, ps_var)
Returns:
list of copy ops
"""
# TODO do this for variables together?
var_update_ops = []
for vid, (g, v) in enumerate(ps_var_grads):
# TODO do we put momentum variables into local or global?
apply_gradient_op = opt.apply_gradients([(g, v)])
barrier = self._add_sync_queues_and_barrier(
'param_update_barrier_{}'.format(vid), [apply_gradient_op])
with tf.control_dependencies([barrier]), \
tf.device(self.cpu_device):
updated_value = v.read_value()
for towerid in range(self.nr_gpu):
var_update_ops.append(
raw_grad_list[towerid][vid][1].assign(updated_value))
return var_update_ops
def _get_initial_sync_op(self):
"""
Get the op to copy-initialize all local variables from PS.
"""
def strip_port(s):
if s.endswith(':0'):
return s[:-2]
return s
local_vars = tf.local_variables()
local_var_by_name = dict([(strip_port(v.name), v) for v in local_vars])
ops = []
nr_shadow_vars = len(self._shadow_vars)
for v in self._shadow_vars:
vname = strip_port(v.name)
for i in range(self.nr_gpu):
name = 'tower%s/%s' % (i, vname)
assert name in local_var_by_name, \
"Shadow variable {} doesn't match a corresponding local variable!".format(v.name)
copy_to = local_var_by_name[name]
# logger.info("{} -> {}".format(v.name, copy_to.name))
ops.append(copy_to.assign(v.read_value()))
return tf.group(*ops, name='sync_{}_variables_from_ps'.format(nr_shadow_vars))
def _get_sync_model_vars_op(self):
"""
Get the op to sync local model_variables to PS.
"""
ops = []
for (shadow_v, local_v) in self._shadow_model_vars:
ops.append(shadow_v.assign(local_v.read_value()))
assert len(ops)
return tf.group(*ops, name='sync_{}_model_variables_to_ps'.format(len(ops)))
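# --- Editor's note: illustrative usage sketch, not part of this commit ---
# A worker process would drive the builder roughly as follows; `towers`,
# `server`, `my_input`, `get_cost_fn` and `get_opt_fn` are hypothetical:
#
#   builder = DistributedReplicatedBuilder(towers, server)
#   train_op, initial_sync_op, model_sync_op = builder.build(
#       my_input, get_cost_fn, get_opt_fn)
#   # run initial_sync_op once before training, model_sync_op periodically on
#   # the chief, and train_op every step (see the trainer changes below).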
@@ -5,9 +5,22 @@
from abc import ABCMeta, abstractmethod
import tensorflow as tf
import six
from six.moves import zip, range
from ..utils import logger
from ..utils.argtools import memoized
from ..tfutils.gradproc import FilterNoneGrad
from ..tfutils.tower import TowerContext
from ..tfutils.tower import TowerContext, get_current_tower_context
from ..tfutils.common import get_tf_version_number
from ..tfutils.collection import backup_collection, restore_collection
from ..tfutils.gradproc import ScaleGradient
from ..utils.naming import TOWER_FREEZE_KEYS
from ._utils import LeastLoadedDeviceSetter, override_to_local_variable
__all__ = ['GraphBuilder', 'SimpleBuilder',
'SyncMultiGPUParameterServerBuilder', 'DataParallelBuilder',
'SyncMultiGPUReplicatedBuilder', 'AsyncMultiGPUBuilder']
@six.add_metaclass(ABCMeta)
@@ -17,7 +30,7 @@ class GraphBuilder(object):
pass
class SimpleGraphBuilder(GraphBuilder):
class SimpleBuilder(GraphBuilder):
"""
Build the graph for single-cost single-optimizer single-tower training.
"""
@@ -43,3 +56,284 @@ class SimpleGraphBuilder(GraphBuilder):
grads = FilterNoneGrad().process(grads)
train_op = opt.apply_gradients(grads, name='min_op')
return train_op
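# Editor's note (illustrative, not part of this commit): the trainer-side call
# shown later in this diff (simple.py) is simply:
#
#   self.train_op = SimpleBuilder().build(
#       self._input_source, get_cost, self.model.get_optimizer)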
class DataParallelBuilder(GraphBuilder):
def __init__(self, towers):
"""
Args:
towers(list[int]): list of GPU relative ids.
"""
if len(towers) > 1:
logger.info("Training a model of {} towers".format(len(towers)))
DataParallelBuilder._check_tf_version()
self.towers = towers
@staticmethod
def _check_tf_version():
assert get_tf_version_number() >= 1.1, \
"TF version {} is too old to run multi GPU training!".format(tf.VERSION)
@staticmethod
def _check_grad_list(grad_list):
"""
Args:
grad_list: list of list of tuples, shape is Ngpu x Nvar x 2
"""
nvars = [len(k) for k in grad_list]
assert len(set(nvars)) == 1, "Number of gradients from each tower is different! " + str(nvars)
def build_on_multi_tower(
self, func, devices=None, use_vs=None):
"""
Args:
func: a lambda to be called inside each tower
devices: a list of devices to be used. By default will use GPUs in ``towers``.
use_vs (list[bool]): list of use_vs values to pass to TowerContext
Returns:
List of outputs of ``func``, evaluated on each tower.
"""
ret = []
if devices is not None:
assert len(devices) == len(self.towers)
if use_vs is not None:
assert len(use_vs) == len(self.towers)
tower_names = ['tower{}'.format(idx) for idx in range(len(self.towers))]
for idx, t in enumerate(self.towers):
device = devices[idx] if devices is not None else '/gpu:{}'.format(t)
usevs = use_vs[idx] if use_vs is not None else False
with tf.device(device), TowerContext(
tower_names[idx],
is_training=True,
index=idx,
use_vs=usevs):
if idx == t:
logger.info("Building graph for training tower {}...".format(idx))
else:
logger.info("Building graph for training tower {} on device {}...".format(idx, device))
# When use_vs is True, use LOCAL_VARIABLES,
# so these duplicated variables won't be saved by default.
with override_to_local_variable(enable=usevs):
ret.append(func())
if idx == 0:
# avoid duplicated summary & update_ops from each device
backup = backup_collection(TOWER_FREEZE_KEYS)
restore_collection(backup)
return ret
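# Editor's note (not part of this commit): subclasses call build_on_multi_tower
# with a closure that computes per-tower gradients, e.g.
#
#   grad_list = self.build_on_multi_tower(get_grads, devices)  # Ngpu x Nvar x 2
#   DataParallelBuilder._check_grad_list(grad_list)
#
# Summaries and UPDATE_OPS are only kept from tower 0; those collections are
# backed up after tower 0 and restored afterwards so later towers cannot
# duplicate them.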
class SyncMultiGPUParameterServerBuilder(DataParallelBuilder):
def __init__(self, towers, ps_device):
super(SyncMultiGPUParameterServerBuilder, self).__init__(towers)
# TODO auto choose ps_device
self.ps_device = ps_device
@staticmethod
def _average_grads(tower_grads):
# tower_grads: Ngpu x Nvar x 2
nr_tower = len(tower_grads)
if nr_tower == 1:
return tower_grads[0]
new_tower_grads = []
with tf.name_scope('AvgGrad'):
for grad_and_vars in zip(*tower_grads):
# Ngpu * 2
v = grad_and_vars[0][1]
all_grads = [g for (g, _) in grad_and_vars]
with tf.device(v.device): # colocate summed grad with var
grad = tf.multiply(
tf.add_n(all_grads), 1.0 / nr_tower)
new_tower_grads.append((grad, v))
return new_tower_grads
def build(self, input, get_cost_fn, get_opt_fn):
raw_devices = ['/gpu:{}'.format(k) for k in self.towers]
if self.ps_device == 'gpu':
devices = [LeastLoadedDeviceSetter(d, raw_devices) for d in raw_devices]
else:
devices = [tf.train.replica_device_setter(
worker_device=d, ps_device='/cpu:0', ps_tasks=1) for d in raw_devices]
# TODO XXX share this part of code
get_opt_fn = memoized(get_opt_fn)
def get_grads():
ctx = get_current_tower_context()
cost = get_cost_fn(*input.get_input_tensors())
varlist = ctx.filter_vars_by_vs_name(tf.trainable_variables())
opt = get_opt_fn()
grads = opt.compute_gradients(
cost, var_list=varlist,
gate_gradients=False, colocate_gradients_with_ops=True)
grads = FilterNoneGrad().process(grads)
return grads
grad_list = self.build_on_multi_tower(get_grads, devices)
DataParallelBuilder._check_grad_list(grad_list)
# debug tower performance (without update):
# ops = [k[0] for k in grad_list[1]] + [k[0] for k in grad_list[0]]
# self.train_op = tf.group(*ops)
# return
grads = SyncMultiGPUParameterServerBuilder._average_grads(grad_list)
# grads = grad_list[0]
opt = get_opt_fn()
if self.ps_device == 'cpu':
with tf.device('/cpu:0'):
train_op = opt.apply_gradients(grads, name='train_op')
else:
train_op = opt.apply_gradients(grads, name='train_op')
return train_op
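# Editor's note (illustrative, not part of this commit): the trainer-side call
# shown later in this diff (multigpu.py) is:
#
#   self.train_op = SyncMultiGPUParameterServerBuilder(
#       self.config.tower, self._ps_device).build(
#       self._input_source, get_cost, self.model.get_optimizer)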
class SyncMultiGPUReplicatedBuilder(DataParallelBuilder):
@staticmethod
def _allreduce_grads(tower_grads):
from tensorflow.contrib import nccl
nr_tower = len(tower_grads)
if nr_tower == 1:
return [[x] for x in tower_grads[0]]
new_tower_grads = []
with tf.name_scope('AvgGrad'):
for grad_and_vars in zip(*tower_grads):
v = grad_and_vars[0][1]
grads = [g for g, _ in grad_and_vars]
summed = nccl.all_sum(grads)
grads_for_a_var = []
for (_, v), g in zip(grad_and_vars, summed):
with tf.device(g.device):
g = tf.multiply(g, 1.0 / nr_tower)
grads_for_a_var.append((g, v))
new_tower_grads.append(grads_for_a_var)
# NVar * NGPU * 2
return new_tower_grads
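# Editor's note (worked example, not part of this commit): with 2 towers and
# variables v0, v1, the input grad_list is Ngpu x Nvar x 2 while the output is
# Nvar x Ngpu x 2:
#   [[(s0 / 2, v0), (s0 / 2, v0)],   # one all-reduced copy of v0's grad per GPU
#    [(s1 / 2, v1), (s1 / 2, v1)]]
# where s_i = nccl.all_sum of the per-tower gradients of v_i. Each GPU then
# applies its own copy in build() below.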
def build(self, input, get_cost_fn, get_opt_fn):
raw_devices = ['/gpu:{}'.format(k) for k in self.towers]
get_opt_fn = memoized(get_opt_fn)
def get_grads():
ctx = get_current_tower_context()
cost = get_cost_fn(*input.get_input_tensors())
varlist = ctx.filter_vars_by_vs_name(tf.trainable_variables())
opt = get_opt_fn()
grads = opt.compute_gradients(
cost, var_list=varlist,
gate_gradients=False, colocate_gradients_with_ops=True)
grads = FilterNoneGrad().process(grads)
return grads
grad_list = self.build_on_multi_tower(
get_grads, # use no variable scope for the first tower
use_vs=[False] + [True] * (len(self.towers) - 1))
grads = SyncMultiGPUReplicatedBuilder._allreduce_grads(grad_list)
train_ops = []
opt = get_opt_fn()
for idx in range(len(self.towers)):
with tf.device(raw_devices[idx]):
grad_and_vars = [x[idx] for x in grads]
# apply_gradients may create variables. Make them LOCAL_VARIABLES
with override_to_local_variable(enable=idx > 0):
train_ops.append(opt.apply_gradients(
grad_and_vars, name='apply_grad_{}'.format(idx)))
train_op = tf.group(*train_ops, name='train_op')
post_init_op = SyncMultiGPUReplicatedBuilder.get_post_init_ops()
return train_op, post_init_op
# Adopt from https://github.com/tensorflow/benchmarks/blob/master/scripts/tf_cnn_benchmarks/variable_mgr.py
@staticmethod
def get_post_init_ops():
"""
Copy values of variables on GPU 0 to other GPUs.
"""
# literally all variables, because it's better to sync optimizer-internal variables as well
all_vars = tf.global_variables() + tf.local_variables()
var_by_name = dict([(v.name, v) for v in all_vars])
post_init_ops = []
for v in all_vars:
if not v.name.startswith('tower'):
continue
if v.name.startswith('tower0'):
logger.warn("[SyncMultiGPUReplicatedBuilder] variable "
"{} has prefix 'tower0', this is unexpected.".format(v.name))
continue # TODO some vars (EMA) may still startswith tower0
# in this trainer, the master name doesn't have the towerx/ prefix
split_name = v.name.split('/')
prefix = split_name[0]
realname = '/'.join(split_name[1:])
if prefix in realname:
logger.error("[SyncMultiGPUReplicatedBuilder] variable "
"{} has its prefix {} appears multiple times in its name!".format(v.name, prefix))
copy_from = var_by_name.get(realname)
assert copy_from is not None, var_by_name.keys()
post_init_ops.append(v.assign(copy_from.read_value()))
logger.info(
"'sync_variables_from_main_tower' includes {} operations.".format(len(post_init_ops)))
return tf.group(*post_init_ops, name='sync_variables_from_main_tower')
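# Editor's note (illustrative, not part of this commit): the trainer wraps
# post_init_op in a RunOp callback (see multigpu.py later in this diff) so
# every tower is re-synced from tower 0 before training and after each epoch:
#
#   cb = RunOp(lambda: post_init_op,
#              run_before=True, run_as_trigger=True, verbose=True)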
class AsyncMultiGPUBuilder(DataParallelBuilder):
def __init__(self, towers, scale_gradient=True):
super(AsyncMultiGPUBuilder, self).__init__(towers)
self._scale_gradient = scale_gradient
def build(self, input, get_cost_fn, get_opt_fn):
raw_devices = ['/gpu:{}'.format(k) for k in self.towers]
ps_device = 'cpu' if len(self.towers) >= 4 else 'gpu'
if ps_device == 'gpu':
devices = [LeastLoadedDeviceSetter(d, raw_devices) for d in raw_devices]
else:
devices = [tf.train.replica_device_setter(
worker_device=d, ps_device='/cpu:0', ps_tasks=1) for d in raw_devices]
get_opt_fn = memoized(get_opt_fn)
def get_grads():
ctx = get_current_tower_context()
cost = get_cost_fn(*input.get_input_tensors())
varlist = ctx.filter_vars_by_vs_name(tf.trainable_variables())
opt = get_opt_fn()
grads = opt.compute_gradients(
cost, var_list=varlist,
gate_gradients=False, colocate_gradients_with_ops=True)
grads = FilterNoneGrad().process(grads)
return grads
grad_list = self.build_on_multi_tower(get_grads, devices)
DataParallelBuilder._check_grad_list(grad_list)
if self._scale_gradient and len(self.towers) > 1:
# pretend to average the grads, in order to make async and
# sync have consistent effective learning rate
gradproc = ScaleGradient(('.*', 1.0 / len(self.towers)), verbose=False)
grad_list = [gradproc.process(gv) for gv in grad_list]
# Ngpu x Nvar x 2
train_ops = []
opt = get_opt_fn()
with tf.name_scope('async_apply_gradients'):
for i, grad_and_vars in enumerate(zip(*grad_list)):
# Ngpu x 2
v = grad_and_vars[0][1]
with tf.device(v.device):
# will call apply_gradients (therefore gradproc) multiple times
train_ops.append(opt.apply_gradients(
grad_and_vars, name='apply_grad_{}'.format(i)))
return tf.group(*train_ops, name='train_op')
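# Editor's note (illustrative, not part of this commit): the trainer-side call
# shown later in this diff (multigpu.py) is:
#
#   self.train_op = AsyncMultiGPUBuilder(
#       self.config.tower, self._scale_gradient).build(
#       self._input_source, get_cost, self.model.get_optimizer)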
@@ -3,23 +3,22 @@
# File: distributed.py
import tensorflow as tf
import re
import os
from six.moves import range
from ..utils import logger
from ..callbacks import RunOp
from ..tfutils.sesscreate import NewSessionCreator
from ..tfutils.common import get_global_step_var, get_op_tensor_name
from ..tfutils.common import get_global_step_var
from .multigpu import MultiGPUTrainerBase
from ..graph_builder.distributed import DistributedReplicatedBuilder
from .utility import override_to_local_variable
from .base import Trainer
__all__ = ['DistributedTrainerReplicated']
class DistributedTrainerReplicated(MultiGPUTrainerBase):
class DistributedTrainerReplicated(Trainer):
"""
Distributed replicated training.
Each worker process builds the same model on one or more GPUs.
@@ -62,147 +61,27 @@ class DistributedTrainerReplicated(MultiGPUTrainerBase):
server(tf.train.Server): the server object with ps and workers
"""
assert config.data is not None and config.model is not None
self.server = server
server_def = server.server_def
self.cluster = tf.train.ClusterSpec(server_def.cluster)
self.job_name = server_def.job_name
self.task_index = server_def.task_index
assert self.job_name in ['ps', 'worker'], self.job_name
assert tf.test.is_gpu_available()
logger.info("Distributed training on cluster:\n" + str(server_def.cluster))
logger.info("My role in the cluster: job={}, task={}".format(self.job_name, self.task_index))
self._builder = DistributedReplicatedBuilder(self.config.tower, server)
self._input_source = config.data
self.is_chief = (self.task_index == 0 and self.job_name == 'worker')
worker_prefix = '/job:worker/task:%s' % self.task_index
self.param_server_device = tf.train.replica_device_setter(
worker_device=worker_prefix + '/cpu:0', cluster=self.cluster)
self.num_ps = self.cluster.num_tasks('ps')
self.num_worker = self.cluster.num_tasks('worker')
self.is_chief = self._builder.is_chief
self.nr_gpu = config.nr_tower
self.cpu_device = '%s/cpu:0' % worker_prefix
self.raw_devices = ['%s/%s:%i' % (worker_prefix, 'gpu', i) for i in range(self.nr_gpu)]
# Device for queues for managing synchronization between servers
self.sync_queue_devices = ['/job:ps/task:%s/cpu:0' % i for i in range(self.num_ps)]
self.sync_queue_counter = 0
super(DistributedTrainerReplicated, self).__init__(config)
@staticmethod
def _average_grads(tower_grads, devices):
"""
Average grads from towers.
The device where the average happens is chosen with round-robin.
Args:
tower_grads: Ngpu x Nvar x 2
Returns:
Nvar x 2
"""
nr_device = len(devices)
if nr_device == 1:
return tower_grads[0]
new_tower_grads = []
with tf.name_scope('AvgGrad'):
for i, grad_and_vars in enumerate(zip(*tower_grads)):
# Ngpu * 2
with tf.device(devices[i % nr_device]):
v = grad_and_vars[0][1]
# average gradient
all_grads = [g for (g, _) in grad_and_vars]
grad = tf.multiply(
tf.add_n(all_grads), 1.0 / nr_device)
new_tower_grads.append((grad, v))
return new_tower_grads
@staticmethod
def _apply_shadow_vars(avg_grads):
"""
Create shadow variables on PS, and replace variables in avg_grads
by these shadow variables.
Args:
avg_grads: list of (grad, var) tuples
"""
ps_var_grads = []
for grad, var in avg_grads:
assert var.name.startswith('tower'), var.name
my_name = '/'.join(var.name.split('/')[1:])
my_name = get_op_tensor_name(my_name)[0]
new_v = tf.get_variable(my_name, dtype=var.dtype.base_dtype,
initializer=var.initial_value,
trainable=True)
# (g, v) to be applied, where v is global (ps vars)
ps_var_grads.append((grad, new_v))
return ps_var_grads
@staticmethod
def _shadow_model_variables(shadow_vars):
"""
Create shadow vars for model_variables as well, and add to the list of ``shadow_vars``.
Returns:
list of (shadow_model_var, local_model_var) used for syncing.
"""
curr_shadow_vars = set([v.name for v in shadow_vars])
model_vars = tf.model_variables()
shadow_model_vars = []
for v in model_vars:
assert v.name.startswith('tower'), "Found some MODEL_VARIABLES created outside of the model!"
stripped_name = get_op_tensor_name(re.sub('tower[0-9]+/', '', v.name))[0]
if stripped_name in curr_shadow_vars:
continue
new_v = tf.get_variable(stripped_name, dtype=v.dtype.base_dtype,
initializer=v.initial_value,
trainable=False)
curr_shadow_vars.add(stripped_name) # avoid duplicated shadow_model_vars
shadow_vars.append(new_v)
shadow_model_vars.append((new_v, v)) # only need to sync model_var from one tower
return shadow_model_vars
def _apply_gradients_and_copy(self, raw_grad_list, ps_var_grads):
"""
Apply averaged gradients to ps vars, and then copy the updated
variables back to each tower.
Args:
raw_grad_list: Ngpu x Nvar x 2 gradient list from all towers
ps_var_grads: Nvar x 2 (grad, ps_var)
Returns:
list of copy ops
"""
# TODO do this for variables together?
opt = self.model.get_optimizer()
var_update_ops = []
for vid, (g, v) in enumerate(ps_var_grads):
# TODO do we put momentum variables into local or global?
apply_gradient_op = opt.apply_gradients([(g, v)])
barrier = self._add_sync_queues_and_barrier(
'param_update_barrier_{}'.format(vid), [apply_gradient_op])
with tf.control_dependencies([barrier]), \
tf.device(self.cpu_device):
updated_value = v.read_value()
for towerid in range(self.nr_gpu):
var_update_ops.append(
raw_grad_list[towerid][vid][1].assign(updated_value))
return var_update_ops
def _setup(self):
if self.job_name == 'ps':
logger.info("Running ps {}".format(self.task_index))
if self._builder.job_name == 'ps':
logger.info("Running ps {}".format(self._builder.task_index))
logger.info("Kill me with 'kill {}'".format(os.getpid()))
self.server.join() # this will never return tensorflow#4713
return
with tf.device(self.param_server_device):
with tf.device(self._builder.param_server_device):
gs = get_global_step_var()
assert gs.device, gs.device
# do this before inputsource.setup because input_source may need the global step
# always do this before inputsource.setup because input_source may need the global step
with override_to_local_variable():
# input source may create variable (queue size summary)
@@ -212,40 +91,22 @@ class DistributedTrainerReplicated(MultiGPUTrainerBase):
cbs = self._input_source.setup(self.model.get_inputs_desc())
self.config.callbacks.extend(cbs)
# Build the optimizer first, before entering any tower.
# This makes sure that learning_rate is a global variable (what we expect)
self.model.get_optimizer()
def get_cost(*inputs):
self.model.build_graph(inputs)
return self.model.get_cost()
# Ngpu * Nvar * 2
grad_list = MultiGPUTrainerBase.build_on_multi_tower(
self.config.tower,
lambda: MultiGPUTrainerBase._build_graph_get_grads(
self.model, self._input_source),
devices=self.raw_devices,
use_vs=[True] * self.config.nr_tower) # open vs at each tower
MultiGPUTrainerBase._check_grad_list(grad_list)
avg_grads = DistributedTrainerReplicated._average_grads(grad_list, self.raw_devices)
with tf.device(self.param_server_device):
ps_var_grads = DistributedTrainerReplicated._apply_shadow_vars(avg_grads)
var_update_ops = self._apply_gradients_and_copy(grad_list, ps_var_grads)
self._shadow_vars = [v for (_, v) in ps_var_grads]
self._shadow_model_vars = DistributedTrainerReplicated._shadow_model_variables(self._shadow_vars)
# TODO add options to synchronize less
main_fetch = tf.group(*var_update_ops, name='main_fetches')
self.train_op = self._add_sync_queues_and_barrier(
'post_copy_barrier', [main_fetch])
self.train_op, initial_sync_op, model_sync_op = self._builder.build(
self._input_source, get_cost, self.model.get_optimizer)
# initial local_vars syncing
cb = RunOp(self._get_initial_sync_op,
cb = RunOp(lambda: initial_sync_op,
run_before=True, run_as_trigger=False, verbose=True)
cb.chief_only = False
self.register_callback(cb)
# model_variables syncing
if len(self._shadow_model_vars) and self.is_chief:
cb = RunOp(self._get_sync_model_vars_op,
if model_sync_op:
cb = RunOp(lambda: model_sync_op,
run_before=False, run_as_trigger=True, verbose=True)
logger.warn("For efficiency, local MODEL_VARIABLES are only synced to PS once "
"every epoch. Be careful if you save the model more frequently than this.")
@@ -285,69 +146,6 @@ class DistributedTrainerReplicated(MultiGPUTrainerBase):
self.config.session_creator = _Creator()
def _add_sync_queues_and_barrier(self, name, dependencies):
"""Adds ops to enqueue on all worker queues.
Args:
name: prefix for the shared_name of the queue ops.
dependencies: ops to add as control dependencies before enqueueing.
Returns:
an op that should be used as control dependency before starting next step.
"""
self.sync_queue_counter += 1
with tf.device(self.sync_queue_devices[self.sync_queue_counter % len(self.sync_queue_devices)]):
sync_queues = [
tf.FIFOQueue(self.num_worker, [tf.bool], shapes=[[]],
shared_name='%s%s' % (name, i))
for i in range(self.num_worker)]
queue_ops = []
# For each other worker, add an entry in a queue, signaling that it can finish this step.
token = tf.constant(False)
with tf.control_dependencies(dependencies):
for i, q in enumerate(sync_queues):
if i != self.task_index:
queue_ops.append(q.enqueue(token))
# Drain tokens off queue for this worker, one for each other worker.
queue_ops.append(
sync_queues[self.task_index].dequeue_many(len(sync_queues) - 1))
return tf.group(*queue_ops, name=name)
def _get_initial_sync_op(self):
"""
Get the op to copy-initialize all local variables from PS.
"""
def strip_port(s):
if s.endswith(':0'):
return s[:-2]
return s
local_vars = tf.local_variables()
local_var_by_name = dict([(strip_port(v.name), v) for v in local_vars])
ops = []
nr_shadow_vars = len(self._shadow_vars)
for v in self._shadow_vars:
vname = strip_port(v.name)
for i in range(self.nr_gpu):
name = 'tower%s/%s' % (i, vname)
assert name in local_var_by_name, \
"Shadow variable {} doesn't match a corresponding local variable!".format(v.name)
copy_to = local_var_by_name[name]
# logger.info("{} -> {}".format(v.name, copy_to.name))
ops.append(copy_to.assign(v.read_value()))
return tf.group(*ops, name='sync_{}_variables_from_ps'.format(nr_shadow_vars))
def _get_sync_model_vars_op(self):
"""
Get the op to sync local model_variables to PS.
"""
ops = []
for (shadow_v, local_v) in self._shadow_model_vars:
ops.append(shadow_v.assign(local_v.read_value()))
assert len(ops)
return tf.group(*ops, name='sync_{}_model_variables_to_ps'.format(len(ops)))
@property
def vs_name_for_predictor(self):
return "tower0"
@@ -4,32 +4,22 @@
# Author: Yuxin Wu <ppwwyyxxc@gmail.com>
import tensorflow as tf
from six.moves import zip, range
from ..utils import logger
from ..utils.naming import TOWER_FREEZE_KEYS
from ..tfutils.common import get_tf_version_number
from ..tfutils.tower import TowerContext
from ..tfutils.collection import backup_collection, restore_collection
from ..tfutils.gradproc import ScaleGradient
from ..callbacks.graph import RunOp
from ..graph_builder.input_source import QueueInput, StagingInputWrapper, DummyConstantInput
from ..graph_builder.training import (
SyncMultiGPUParameterServerBuilder,
SyncMultiGPUReplicatedBuilder,
AsyncMultiGPUBuilder)
from .base import Trainer
from .utility import LeastLoadedDeviceSetter, override_to_local_variable
__all__ = ['MultiGPUTrainerBase',
'SyncMultiGPUTrainerReplicated',
__all__ = ['SyncMultiGPUTrainerReplicated',
'SyncMultiGPUTrainerParameterServer',
'AsyncMultiGPUTrainer',
'SyncMultiGPUTrainer']
def _check_tf_version():
assert get_tf_version_number() >= 1.1, \
"TF version {} is too old to run multi GPU training!".format(tf.VERSION)
def apply_prefetch_policy(config, gpu_prefetch=True):
assert (config.data is not None or config.dataflow is not None) and config.model is not None
if config.data is None and config.dataflow is not None:
@@ -45,76 +35,7 @@ def apply_prefetch_policy(config, gpu_prefetch=True):
config.data = StagingInputWrapper(config.data, devices)
class MultiGPUTrainerBase(Trainer):
""" Base class for multi-gpu training"""
@staticmethod
def build_on_multi_tower(
towers, func,
devices=None,
use_vs=None):
"""
Args:
towers: list of gpu relative ids
func: a lambda to be called inside each tower
devices: a list of devices to be used. By default will use GPUs in ``towers``.
use_vs (list[bool]): list of use_vs values to pass to TowerContext
Returns:
List of outputs of ``func``, evaluated on each tower.
"""
if len(towers) > 1:
logger.info("Training a model of {} towers".format(len(towers)))
_check_tf_version()
ret = []
if devices is not None:
assert len(devices) == len(towers)
if use_vs is not None:
assert len(use_vs) == len(towers)
tower_names = ['tower{}'.format(idx) for idx in range(len(towers))]
keys_to_freeze = TOWER_FREEZE_KEYS[:]
for idx, t in enumerate(towers):
device = devices[idx] if devices is not None else '/gpu:{}'.format(t)
usevs = use_vs[idx] if use_vs is not None else False
with tf.device(device), TowerContext(
tower_names[idx],
is_training=True,
index=idx,
use_vs=usevs):
if idx == t:
logger.info("Building graph for training tower {}...".format(idx))
else:
logger.info("Building graph for training tower {} on device {}...".format(idx, device))
# When use_vs is True, use LOCAL_VARIABLES,
# so these duplicated variables won't be saved by default.
with override_to_local_variable(enable=usevs):
ret.append(func())
if idx == 0:
# avoid duplicated summary & update_ops from each device
backup = backup_collection(keys_to_freeze)
restore_collection(backup)
return ret
@staticmethod
def _check_grad_list(grad_list):
"""
Args:
grad_list: list of list of tuples, shape is Ngpu x Nvar x 2
"""
nvars = [len(k) for k in grad_list]
assert len(set(nvars)) == 1, "Number of gradients from each tower is different! " + str(nvars)
@staticmethod
def _build_graph_get_grads(model, input):
model.build_graph(input)
return model.get_cost_and_grad()[1]
class SyncMultiGPUTrainerParameterServer(MultiGPUTrainerBase):
class SyncMultiGPUTrainerParameterServer(Trainer):
"""
A data-parallel multi-GPU trainer. It builds one tower on each GPU with
shared variable scope. It synchronizes the gradients computed
@@ -137,74 +58,18 @@ class SyncMultiGPUTrainerParameterServer(MultiGPUTrainerBase):
self._ps_device = ps_device
super(SyncMultiGPUTrainerParameterServer, self).__init__(config)
@staticmethod
def _average_grads(tower_grads):
# tower_grads: Ngpu x Nvar x 2
nr_tower = len(tower_grads)
if nr_tower == 1:
return tower_grads[0]
new_tower_grads = []
with tf.name_scope('AvgGrad'):
for grad_and_vars in zip(*tower_grads):
# Ngpu * 2
v = grad_and_vars[0][1]
all_grads = [g for (g, _) in grad_and_vars]
with tf.device(v.device): # colocate summed grad with var
grad = tf.multiply(
tf.add_n(all_grads), 1.0 / nr_tower)
new_tower_grads.append((grad, v))
return new_tower_grads
@staticmethod
def setup_graph(model, input, ps_device, towers):
"""
Args:
model (ModelDesc):
input (InputSource):
ps_device (str):
towers (list[int]):
def _setup(self):
callbacks = self._input_source.setup(self.model.get_inputs_desc())
Returns:
tf.Operation: the training op
def get_cost(*inputs):
self.model.build_graph(inputs)
return self.model.get_cost()
[Callback]: the callbacks to be added
"""
callbacks = input.setup(model.get_inputs_desc())
raw_devices = ['/gpu:{}'.format(k) for k in towers]
if ps_device == 'gpu':
devices = [LeastLoadedDeviceSetter(d, raw_devices) for d in raw_devices]
else:
devices = [tf.train.replica_device_setter(
worker_device=d, ps_device='/cpu:0', ps_tasks=1) for d in raw_devices]
grad_list = MultiGPUTrainerBase.build_on_multi_tower(
towers,
lambda: MultiGPUTrainerBase._build_graph_get_grads(model, input),
devices)
MultiGPUTrainerBase._check_grad_list(grad_list)
# debug tower performance (without update):
# ops = [k[0] for k in grad_list[1]] + [k[0] for k in grad_list[0]]
# self.train_op = tf.group(*ops)
# return
grads = SyncMultiGPUTrainerParameterServer._average_grads(grad_list)
# grads = grad_list[0]
opt = model.get_optimizer()
if ps_device == 'cpu':
with tf.device('/cpu:0'):
train_op = opt.apply_gradients(grads, name='train_op')
else:
train_op = opt.apply_gradients(grads, name='train_op')
return train_op, callbacks
self.train_op = SyncMultiGPUParameterServerBuilder(
self.config.tower, self._ps_device).build(
self._input_source, get_cost, self.model.get_optimizer)
def _setup(self):
self.train_op, cbs = SyncMultiGPUTrainerParameterServer.setup_graph(
self.model, self._input_source, self._ps_device, self.config.tower)
self.config.callbacks.extend(cbs)
self.config.callbacks.extend(callbacks)
def SyncMultiGPUTrainer(config):
@@ -216,7 +81,7 @@ def SyncMultiGPUTrainer(config):
return SyncMultiGPUTrainerParameterServer(config, ps_device='gpu')
class SyncMultiGPUTrainerReplicated(MultiGPUTrainerBase):
class SyncMultiGPUTrainerReplicated(Trainer):
"""
Data-parallel multi-GPU trainer where each GPU contains a replica of the whole model.
It will build one tower on each GPU under its own variable scope.
@@ -233,105 +98,23 @@ class SyncMultiGPUTrainerReplicated(MultiGPUTrainerBase):
self._input_source = config.data
super(SyncMultiGPUTrainerReplicated, self).__init__(config)
@staticmethod
def _allreduce_grads(tower_grads):
from tensorflow.contrib import nccl
nr_tower = len(tower_grads)
if nr_tower == 1:
return [[x] for x in tower_grads[0]]
new_tower_grads = []
with tf.name_scope('AvgGrad'):
for grad_and_vars in zip(*tower_grads):
v = grad_and_vars[0][1]
grads = [g for g, _ in grad_and_vars]
summed = nccl.all_sum(grads)
grads_for_a_var = []
for (_, v), g in zip(grad_and_vars, summed):
with tf.device(g.device):
g = tf.multiply(g, 1.0 / nr_tower)
grads_for_a_var.append((g, v))
new_tower_grads.append(grads_for_a_var)
# NVar * NGPU * 2
return new_tower_grads
@staticmethod
def setup_graph(model, input, tower):
"""
Args:
model (ModelDesc):
input (InputSource):
tower (list[int]):
def _setup(self):
callbacks = self._input_source.setup(self.model.get_inputs_desc())
Returns:
tf.Operation: the training op
def get_cost(*inputs):
self.model.build_graph(inputs)
return self.model.get_cost()
self.train_op, post_init_op = SyncMultiGPUReplicatedBuilder(self.config.tower).build(
self._input_source, get_cost, self.model.get_optimizer)
[Callback]: the callbacks to be added
"""
callbacks = input.setup(model.get_inputs_desc())
raw_devices = ['/gpu:{}'.format(k) for k in tower]
grad_list = MultiGPUTrainerBase.build_on_multi_tower(
tower,
lambda: MultiGPUTrainerBase._build_graph_get_grads(model, input),
# use no variable scope for the first tower
use_vs=[False] + [True] * (len(tower) - 1))
grads = SyncMultiGPUTrainerReplicated._allreduce_grads(grad_list)
train_ops = []
opt = model.get_optimizer()
for idx in range(len(tower)):
with tf.device(raw_devices[idx]):
grad_and_vars = [x[idx] for x in grads]
# apply_gradients may create variables. Make them LOCAL_VARIABLES
with override_to_local_variable(enable=idx > 0):
train_ops.append(opt.apply_gradients(
grad_and_vars, name='apply_grad_{}'.format(idx)))
train_op = tf.group(*train_ops, name='train_op')
cb = RunOp(
SyncMultiGPUTrainerReplicated.get_post_init_ops,
lambda: post_init_op,
run_before=True, run_as_trigger=True, verbose=True)
return train_op, callbacks + [cb]
self.config.callbacks.extend(callbacks + [cb])
def _setup(self):
self.train_op, cbs = SyncMultiGPUTrainerReplicated.setup_graph(
self.model, self._input_source, self.config.tower)
self.config.callbacks.extend(cbs)
# Adopt from https://github.com/tensorflow/benchmarks/blob/master/scripts/tf_cnn_benchmarks/variable_mgr.py
@staticmethod
def get_post_init_ops():
"""
Copy values of variables on GPU 0 to other GPUs.
"""
# literally all variables, because it's better to sync optimizer-internal variables as well
all_vars = tf.global_variables() + tf.local_variables()
var_by_name = dict([(v.name, v) for v in all_vars])
post_init_ops = []
for v in all_vars:
if not v.name.startswith('tower'):
continue
if v.name.startswith('tower0'):
logger.warn("[SyncMultiGPUTrainerReplicated] variable "
"{} has prefix 'tower0', this is unexpected.".format(v.name))
continue # TODO some vars (EMA) may still startswith tower0
# in this trainer, the master name doesn't have the towerx/ prefix
split_name = v.name.split('/')
prefix = split_name[0]
realname = '/'.join(split_name[1:])
if prefix in realname:
logger.error("[SyncMultiGPUTrainerReplicated] variable "
"{} has its prefix {} appears multiple times in its name!".format(v.name, prefix))
copy_from = var_by_name.get(realname)
assert copy_from is not None, var_by_name.keys()
post_init_ops.append(v.assign(copy_from.read_value()))
logger.info(
"'sync_variables_from_main_tower' includes {} operations.".format(len(post_init_ops)))
return tf.group(*post_init_ops, name='sync_variables_from_main_tower')
class AsyncMultiGPUTrainer(MultiGPUTrainerBase):
class AsyncMultiGPUTrainer(Trainer):
"""
A data-parallel multi-GPU trainer. It builds one tower on each GPU with shared variable scope.
Every tower computes the gradients and independently applies them to the
@@ -349,55 +132,15 @@ class AsyncMultiGPUTrainer(MultiGPUTrainerBase):
self._scale_gradient = scale_gradient
super(AsyncMultiGPUTrainer, self).__init__(config)
@staticmethod
def setup_graph(model, input, scale_gradient, tower):
"""
Args:
model (ModelDesc):
input (InputSource):
scale_gradient (bool):
tower (list[int]):
def _setup(self):
callbacks = self._input_source.setup(self.model.get_inputs_desc())
Returns:
tf.Operation: the training op
def get_cost(*inputs):
self.model.build_graph(inputs)
return self.model.get_cost()
[Callback]: the callbacks to be added
"""
callbacks = input.setup(model.get_inputs_desc())
ps_device = 'cpu' if len(tower) >= 4 else 'gpu'
if ps_device == 'gpu':
raw_devices = ['/gpu:{}'.format(k) for k in tower]
devices = [LeastLoadedDeviceSetter(d, raw_devices) for d in raw_devices]
else:
devices = [tf.train.replica_device_setter(
worker_device=d, ps_device='/cpu:0', ps_tasks=1) for d in raw_devices]
grad_list = MultiGPUTrainerBase.build_on_multi_tower(
tower,
lambda: MultiGPUTrainerBase._build_graph_get_grads(model, input), devices)
MultiGPUTrainerBase._check_grad_list(grad_list)
if scale_gradient and len(tower) > 1:
# pretend to average the grads, in order to make async and
# sync have consistent effective learning rate
gradproc = ScaleGradient(('.*', 1.0 / len(tower)), verbose=False)
grad_list = [gradproc.process(gv) for gv in grad_list]
# Ngpu x Nvar x 2
train_ops = []
opt = model.get_optimizer()
with tf.name_scope('async_apply_gradients'):
for i, grad_and_vars in enumerate(zip(*grad_list)):
# Ngpu x 2
v = grad_and_vars[0][1]
with tf.device(v.device):
# will call apply_gradients (therefore gradproc) multiple times
train_ops.append(opt.apply_gradients(
grad_and_vars, name='apply_grad_{}'.format(i)))
return tf.group(*train_ops, name='train_op'), callbacks
self.train_op = AsyncMultiGPUBuilder(
self.config.tower, self._scale_gradient).build(
self._input_source, get_cost, self.model.get_optimizer)
def _setup(self):
self.train_op, cbs = AsyncMultiGPUTrainer.setup_graph(
self.model, self._input_source, self._scale_gradient, self.config.tower)
self.config.callbacks.extend(cbs)
self.config.callbacks.extend(callbacks)
@@ -7,9 +7,9 @@ from .base import Trainer
from ..utils import logger
from ..graph_builder.input_source import FeedInput, QueueInput
from ..graph_builder.training import SimpleGraphBuilder
from ..graph_builder.training import SimpleBuilder
__all__ = ['SimpleTrainer']
__all__ = ['SimpleTrainer', 'QueueInputTrainer']
class SimpleTrainer(Trainer):
@@ -46,7 +46,7 @@ class SimpleTrainer(Trainer):
self.model.build_graph(inputs)
return self.model.get_cost()
self.train_op = SimpleGraphBuilder().build(self._input_source, get_cost, self.model.get_optimizer)
self.train_op = SimpleBuilder().build(self._input_source, get_cost, self.model.get_optimizer)
self.config.callbacks.extend(cbs)