Commit 8ec145ee authored by Yuxin Wu

update docs and debug the size of StagingInput

parent 5c25afcb
@@ -135,6 +135,7 @@ class ILSVRC12Files(RNGDataFlow):
         self.full_dir = os.path.join(dir, name)
         self.name = name
         assert os.path.isdir(self.full_dir), self.full_dir
+        assert meta_dir is None or os.path.isdir(meta_dir), meta_dir
         if shuffle is None:
             shuffle = name == 'train'
         self.shuffle = shuffle
...
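The added assertion makes a bad `meta_dir` fail at construction time instead of later, when the metadata files are actually read. A hedged usage sketch follows; the paths and the import location are placeholders assumed for illustration, not part of the commit:

    # Hypothetical usage sketch (paths are placeholders; import path assumed
    # from tensorpack's usual layout).
    from tensorpack.dataflow.dataset import ILSVRC12Files

    # meta_dir is optional; when given, it must be an existing directory,
    # otherwise the new assertion fails immediately in the constructor.
    train = ILSVRC12Files('/data/ILSVRC12', 'train', meta_dir='/data/ILSVRC12/meta')

    # shuffle defaults to True only for the 'train' split (see the hunk above).
    val = ILSVRC12Files('/data/ILSVRC12', 'val', shuffle=False)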
@@ -165,10 +165,10 @@ class DistributedReplicatedBuilder(DataParallelBuilder, DistributedBuilderBase):
     .. code-block:: none

         # Start training like this:
-        (host1)$ train.py --job worker --task 0
-        (host1)$ CUDA_VISIBLE_DEVICES= train.py --job ps --task 0
-        (host2)$ train.py --job worker --task 1
-        (host2)$ CUDA_VISIBLE_DEVICES= train.py --job ps --task 1
+        (host1)$ ./train.py --job worker --task 0
+        (host1)$ CUDA_VISIBLE_DEVICES= ./train.py --job ps --task 0
+        (host2)$ ./train.py --job worker --task 1
+        (host2)$ CUDA_VISIBLE_DEVICES= ./train.py --job ps --task 1
     """

     def __init__(self, towers, server):
...
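The commands in this docstring start one worker and one parameter-server process per host, with the ps processes seeing no GPUs because `CUDA_VISIBLE_DEVICES` is emptied. As a rough illustration of what a `train.py` driven by `--job`/`--task` typically does with these flags, here is a minimal sketch using plain TensorFlow 1.x primitives; the host:port lists and flag parsing are placeholders and not taken from tensorpack:

    # Minimal sketch of turning --job/--task flags into a tf.train.Server
    # (hosts/ports are placeholders; tensorpack's actual wiring may differ).
    import argparse
    import tensorflow as tf

    parser = argparse.ArgumentParser()
    parser.add_argument('--job', choices=['worker', 'ps'])
    parser.add_argument('--task', type=int, default=0)
    args = parser.parse_args()

    cluster = tf.train.ClusterSpec({
        'ps': ['host1:2222', 'host2:2222'],
        'worker': ['host1:2223', 'host2:2223'],
    })
    server = tf.train.Server(cluster, job_name=args.job, task_index=args.task)

    if args.job == 'ps':
        server.join()   # parameter servers only serve variables
    else:
        pass            # a worker would build the graph and train,
                        # e.g. by handing `server` to the builder/trainer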
@@ -483,15 +483,14 @@ class StagingInput(FeedfreeInput):
         A callback registered by this input source, to make sure stage/unstage
         is run at each step.
         """
-        def __init__(self, stage_op_fn, unstage_op_fn, nr_stage):
+        def __init__(self, input, nr_stage):
             self.nr_stage = nr_stage
-            self.stage_op_fn = stage_op_fn
-            self.unstage_op_fn = unstage_op_fn
+            self._input = input
             self._initialized = False

         def _setup_graph(self):
-            self.stage_op = self.stage_op_fn()
-            unstage_op = self.unstage_op_fn()
+            self.stage_op = self._input._get_stage_op()
+            unstage_op = self._input._get_unstage_op()
             self.fetches = tf.train.SessionRunArgs(
                 fetches=[self.stage_op, unstage_op])
@@ -523,6 +522,7 @@ class StagingInput(FeedfreeInput):
         self._areas = []
         self._stage_ops = []
         self._unstage_ops = []
+        # self._size_ops = []

     def _setup(self, inputs):
         self._input.setup(inputs)
@@ -530,10 +530,8 @@ class StagingInput(FeedfreeInput):
     def _get_callbacks(self):
         cbs = self._input.get_callbacks()
-        # Pass a lambda to be called later, because stage ops have not been built
         cbs.append(
-            StagingInput.StagingCallback(
-                lambda: self._get_stage_op(), lambda: self._get_unstage_op(), self._nr_stage))
+            StagingInput.StagingCallback(self, self._nr_stage))
         return cbs

     def _size(self):
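Taken together, the three hunks above simplify `StagingCallback`: instead of receiving two lambdas that defer op construction, it now keeps a reference to the owning `StagingInput` and asks it for the stage/unstage ops inside `_setup_graph`, which runs only after the graph (and therefore the staging areas) has been built; `_get_callbacks` then only needs to pass `self` and `self._nr_stage`. The sketch below illustrates the same per-step pattern with a plain `tf.train.SessionRunHook`; the method names and the prefill step are illustrative, not copied from tensorpack:

    # Illustrative sketch only: hold a reference to the input object and build
    # the ops lazily once the graph exists, then run them every step.
    import tensorflow as tf

    class StagingHook(tf.train.SessionRunHook):
        def __init__(self, staging_input, nr_stage):
            self._input = staging_input   # object that owns the StagingAreas
            self._nr_stage = nr_stage     # how many batches to prefill
            self._initialized = False

        def begin(self):
            # Ops can be fetched directly here: the graph is already built.
            self._stage_op = self._input._get_stage_op()
            unstage_op = self._input._get_unstage_op()
            self._fetches = tf.train.SessionRunArgs(
                fetches=[self._stage_op, unstage_op])

        def after_create_session(self, session, coord):
            if not self._initialized:
                # Prefill the staging areas so get() never blocks on an empty area.
                for _ in range(self._nr_stage):
                    session.run(self._stage_op)
                self._initialized = True

        def before_run(self, run_context):
            # Each training step pushes one new batch and pops one.
            return self._fetches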
@@ -560,6 +558,7 @@ class StagingInput(FeedfreeInput):
             for vin, vout in zip(inputs, outputs):
                 vout.set_shape(vin.get_shape())
             self._unstage_ops.append(outputs)
+            # self._size_ops.append(stage.size())
             return outputs

     def _get_stage_op(self):
@@ -571,5 +570,17 @@ class StagingInput(FeedfreeInput):
         all_outputs = list(chain.from_iterable(self._unstage_ops))
         return tf.group(*all_outputs)

+    # for debugging only
+    def _create_ema_callback(self):
+        def create_ema_op():
+            with self.cached_name_scope():
+                avg_size = tf.truediv(tf.add_n(self._size_ops), len(self._size_ops), name='avg_stagingarea_size')
+                return add_moving_summary(avg_size, collection=None)[0].op
+        return RunOp(
+            create_ema_op,
+            run_before=False,
+            run_as_trigger=False,
+            run_step=True)


 StagingInputWrapper = StagingInput
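The commented-out `self._size_ops` bookkeeping and the new `_create_ema_callback` form an optional debugging aid: each StagingArea would report its current `size()`, the sizes are averaged into `avg_stagingarea_size`, and a `RunOp` callback maintains a moving-average summary of that value at every step, so one can check during training whether the areas really hold the expected number of prefetched batches. Below is a standalone sketch of the same measurement with plain TF 1.x ops; the toy areas and names are placeholders and independent of tensorpack:

    # Standalone sketch: measure average StagingArea occupancy (TF 1.x).
    import tensorflow as tf
    from tensorflow.python.ops.data_flow_ops import StagingArea

    # Two toy staging areas holding one scalar float each (stand-ins for the
    # per-GPU areas that StagingInput builds).
    areas = [StagingArea(dtypes=[tf.float32], shapes=[[]]) for _ in range(2)]
    x = tf.constant(1.0)
    stage_ops = [a.put([x]) for a in areas]
    unstage_ops = [a.get() for a in areas]

    # Average the integer sizes across areas, as the debug callback above does.
    size_ops = [tf.to_float(a.size()) for a in areas]
    avg_size = tf.truediv(tf.add_n(size_ops), float(len(size_ops)),
                          name='avg_stagingarea_size')
    tf.summary.scalar('avg_stagingarea_size', avg_size)

    with tf.Session() as sess:
        sess.run(stage_ops)           # prefill once
        print(sess.run(avg_size))     # -> 1.0
        sess.run(unstage_ops)
        print(sess.run(avg_size))     # -> 0.0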
@@ -215,7 +215,7 @@ def add_moving_summary(*args, **kwargs):
     ctx = get_current_tower_context()
     # allow ctx to be none
     if ctx is not None and not ctx.is_main_training_tower:
-        return
+        return []
     if not isinstance(args[0], list):
         v = args
...
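Returning `[]` instead of `None` on non-main towers gives `add_moving_summary` a consistent return type, so callers such as the debugging callback above can index or iterate the result without a special case. A small self-contained illustration; the stub function is a stand-in, not the real implementation:

    # Why a consistent list return type helps callers (stub, for illustration).
    import tensorflow as tf

    def add_moving_summary_stub(tensor, on_main_tower):
        if not on_main_tower:
            return []                     # new behaviour: empty list, not None
        ema_tensor = tf.identity(tensor)  # stand-in for the real EMA tensor
        return [ema_tensor]

    cost = tf.constant(0.5)
    for t in add_moving_summary_stub(cost, on_main_tower=False):
        print(t)                          # iterates zero times; no None check needed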
@@ -283,12 +283,7 @@ class HorovodTrainer(SingleCostTrainer):
     Note:
         1. If using all GPUs, you can always skip the `CUDA_VISIBLE_DEVICES` option.
-        2. About performance, horovod is expected to be slightly
-           slower than native tensorflow on multi-GPU training, but faster in distributed training.
-        3. Due to the use of MPI, training is less informative (no progress bar).
-           It's recommended to use other multi-GPU trainers for single-node
-           experiments, and scale to multi nodes by horovod.
+        2. Due to the use of MPI, training is less informative (no progress bar).
     """

     def __init__(self):
         hvd.init()
...
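The trimmed note keeps only the MPI caveat (processes are launched by `mpirun`, one per GPU, so there is no interactive progress bar). For orientation, here is a minimal horovod + TF 1.x skeleton using only public horovod APIs; it is an illustration of the moving parts, not tensorpack's `HorovodTrainer`:

    # Minimal horovod + TF 1.x skeleton (illustrative; model and loop are toy).
    import tensorflow as tf
    import horovod.tensorflow as hvd

    hvd.init()                                    # one process per GPU, launched by mpirun

    # Pin each process to its own GPU.
    config = tf.ConfigProto()
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    x = tf.random_normal([32, 10])
    loss = tf.reduce_mean(tf.layers.dense(x, 1) ** 2)

    opt = tf.train.GradientDescentOptimizer(0.01 * hvd.size())
    opt = hvd.DistributedOptimizer(opt)           # allreduce gradients over MPI
    train_op = opt.minimize(loss)

    hooks = [hvd.BroadcastGlobalVariablesHook(0)]  # sync initial weights from rank 0
    with tf.train.MonitoredTrainingSession(hooks=hooks, config=config) as sess:
        for _ in range(100):
            sess.run(train_op)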