Commit 83e4e187 authored by Yuxin Wu

replicated trainer aggregation mode heuristics

parent 0d7e71df
@@ -13,7 +13,7 @@ It's Yet Another TF wrapper, but different in:
 1. Focus on __training speed__.
    + Speed comes for free with tensorpack -- it uses TensorFlow in the __efficient way__ with no extra overhead.
-     On different CNNs, it runs [1.2~4x faster](https://github.com/tensorpack/benchmarks/tree/master/other-wrappers) than the equivalent Keras code.
+     On different CNNs, it runs [1.2~5x faster](https://github.com/tensorpack/benchmarks/tree/master/other-wrappers) than the equivalent Keras code.
    + Data-parallel multi-GPU training is off-the-shelf to use. It scales as well as Google's [official benchmark](https://www.tensorflow.org/performance/benchmarks).
...
@@ -223,13 +223,15 @@ class SyncMultiGPUReplicatedBuilder(DataParallelBuilder):
             if self._mode == 'nccl':
                 all_grads = allreduce_grads(all_grads, average=self._average)  # #gpu x #param x 2
             else:
-                # all_grads = allreduce_grads_hierarchical(all_grads, raw_devices, average=self._average)
                 packer = GradientPacker(len(raw_devices))
-                packer.compute_strategy(all_grads[0])
-                packed_grads = packer.pack_all(all_grads, raw_devices)
-                packed_grads_aggr = allreduce_grads_hierarchical(packed_grads, raw_devices, average=self._average)
-                all_grads = packer.unpack_all(packed_grads_aggr, raw_devices)
+                succ = packer.compute_strategy(all_grads[0])
+                if succ:
+                    packed_grads = packer.pack_all(all_grads, raw_devices)
+                    packed_grads_aggr = allreduce_grads_hierarchical(
+                        packed_grads, raw_devices, average=self._average)
+                    all_grads = packer.unpack_all(packed_grads_aggr, raw_devices)
+                else:
+                    all_grads = allreduce_grads_hierarchical(all_grads, raw_devices, average=self._average)
             self.grads = merge_grad_list(all_grads, all_vars)
         elif self._mode == 'cpu':
...
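The new branch falls back to a plain hierarchical all-reduce whenever `compute_strategy` reports that packing is not worthwhile. For readers unfamiliar with the packing path, below is a minimal, framework-free sketch of the pack, aggregate, unpack idea: plain NumPy stands in for per-GPU tensors, the `pack`/`unpack` helpers are illustrative and not tensorpack's `GradientPacker` API, and a simple sum stands in for the real all-reduce.

```python
# Minimal sketch of pack -> aggregate -> unpack (assumption: NumPy arrays stand
# in for per-GPU gradient tensors; a plain sum replaces the real all-reduce).
import numpy as np

def pack(grads):
    """Flatten a list of gradients into one contiguous buffer, remembering shapes."""
    shapes = [g.shape for g in grads]
    return np.concatenate([g.ravel() for g in grads]), shapes

def unpack(flat, shapes):
    """Split an aggregated buffer back into tensors of the original shapes."""
    out, offset = [], 0
    for s in shapes:
        n = int(np.prod(s))
        out.append(flat[offset:offset + n].reshape(s))
        offset += n
    return out

# Two simulated towers, each holding the same set of gradient tensors.
per_gpu_grads = [[np.full((2, 3), k), np.full((4,), k)] for k in (1.0, 2.0)]
flats, shapes = zip(*(pack(g) for g in per_gpu_grads))
aggregated = unpack(np.sum(flats, axis=0), shapes[0])       # one reduction instead of one per tensor
print([g.shape for g in aggregated], aggregated[0][0, 0])   # [(2, 3), (4,)] 3.0
```

Concatenating the gradients into one buffer means the cross-device reduction runs once over a large tensor instead of once per parameter, which is what makes the hierarchical path above worthwhile when there are enough gradient elements.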
@@ -312,16 +312,24 @@ class GradientPacker(object):
     @call_only_once
     def compute_strategy(self, grads):
+        """
+        Returns:
+            bool - False if grads cannot be packed due to various reasons.
+        """
         for g in grads:
             assert g.shape.is_fully_defined(), "Shape of {} is {}!".format(g.name, g.shape)
         self._shapes = [g.shape for g in grads]
         self._sizes = [g.shape.num_elements() for g in grads]
         self._total_size = sum(self._sizes)
-        assert self._total_size > self._num_split
+        if self._total_size / self._num_split < 1024:
+            logger.info("Skip GradientPacker due to too few gradients.")
+            return False
         # should have the same dtype
         dtypes = set([g.dtype for g in grads])
-        assert len(dtypes) == 1, dtypes
+        if len(dtypes) != 1:
+            logger.info("Skip GradientPacker due to inconsistent gradient types.")
+            return False
         split_size = self._total_size // self._num_split
         split_size_last = self._total_size - split_size * (self._num_split - 1)
@@ -329,6 +337,7 @@ class GradientPacker(object):
         logger.info(
             "Will pack {} gradients of total number={} into {} splits.".format(
                 len(self._sizes), self._total_size, self._num_split))
+        return True

     def pack(self, grads):
         """
...
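`compute_strategy` now returns False, instead of asserting, when the average split would hold fewer than 1024 elements or when the gradient dtypes are inconsistent. As a quick sanity check of the `split_size` arithmetic it logs above, here is a small worked example with made-up numbers: every split gets `total_size // num_split` elements and the last split absorbs the remainder.

```python
# Worked example of the split computation (numbers are illustrative).
total_size, num_split = 1_000_003, 8
# Average split is ~125_000 elements, well above the 1024-element threshold,
# so packing would proceed rather than return False.
split_size = total_size // num_split                         # 125_000
split_size_last = total_size - split_size * (num_split - 1)  # 125_003, remainder goes to the last split
sizes = [split_size] * (num_split - 1) + [split_size_last]
assert sum(sizes) == total_size
print(sizes)  # [125000, 125000, 125000, 125000, 125000, 125000, 125000, 125003]
```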
@@ -146,17 +146,19 @@ class SyncMultiGPUTrainerReplicated(SingleCostTrainer):
             gpus (int or [int]): list of GPU ids.
             average (bool): whether to average or sum gradients.
             mode (str or None): Gradient aggregation mode.
-                These methods may have slight differences in speed.
-                Supported values: ['nccl', 'cpu']. Default to pick
-                automatically by heuristics.
+                Supported values: ['nccl', 'hierarchical', 'cpu'].
+                Default to pick automatically by heuristics.
+                These modes may have slight (within 5%) differences in speed.
         """
         self.devices = gpus
         if use_nccl is not None:
-            mode = 'nccl' if use_nccl else 'cpu'
+            mode = 'nccl' if use_nccl else None
             logger.warn("use_nccl option was deprecated! Use the `mode` option instead!")
         if mode is None:
-            mode = 'nccl'
+            mode = 'hierarchical' if len(gpus) >= 8 else 'nccl'
         mode = mode.lower()

         self._builder = SyncMultiGPUReplicatedBuilder(gpus, average, mode)
         super(SyncMultiGPUTrainerReplicated, self).__init__()
...
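With the heuristic above, leaving `mode=None` selects 'hierarchical' on machines with 8 or more GPUs and 'nccl' otherwise. A hedged usage sketch follows; the top-level import and exact constructor signature are assumptions and may differ slightly between tensorpack versions.

```python
# Illustrative usage of the new `mode` heuristic (assumption: the trainer is
# importable from the tensorpack top-level package; keyword names may vary).
from tensorpack import SyncMultiGPUTrainerReplicated

# mode left as None: the trainer picks 'hierarchical' when len(gpus) >= 8, else 'nccl'.
trainer = SyncMultiGPUTrainerReplicated(gpus=list(range(8)))

# Or force a specific aggregation mode explicitly:
trainer_nccl = SyncMultiGPUTrainerReplicated(gpus=[0, 1], mode='nccl')
```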