Commit ad8aa0a5 authored by Yuxin Wu's avatar Yuxin Wu

Add BytePS trainer; add docs

parent 5013ea51
......@@ -143,7 +143,7 @@ But how do you make sure you'll not run into one of the unsupported situations l
In the design, `torch.utils.data.Dataset` is simply a Python container/iterator, similar to DataFlow.
However it has made some **bad assumptions**:
it assumes your dataset has a `__len__` and supports `__getitem__`,
it assumes your dataset supports `__getitem__`,
which does not work when you have a dynamic/unreliable data source,
or when you need to filter your data on the fly.
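For example, a dynamic or filtered data source is naturally expressed as a pure iterator. A minimal sketch (the `read_from_stream()` source below is hypothetical):

```python
from tensorpack.dataflow import DataFlow

class FilteredStream(DataFlow):
    """A dynamic source with on-the-fly filtering: no meaningful __len__ or __getitem__."""
    def __iter__(self):
        # read_from_stream() is a hypothetical, possibly unreliable data source
        for img, label in read_from_stream():
            if label is not None:   # drop bad samples as they arrive
                yield [img, label]
```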
......@@ -152,7 +152,7 @@ or when you need to filter your data on the fly.
1. It assumes you always do batch training, with a constant batch size, and
   that the batch grouping can be purely determined by indices.
All of these are not necessarily true.
None of these are necessarily true.
2. Its multiprocessing implementation is efficient on `torch.Tensor`,
but inefficient for generic data type or numpy arrays.
......@@ -160,7 +160,22 @@ or when you need to filter your data on the fly.
On the other hand, DataFlow:
1. Is a pure iterator, not necessarily has a length. This is more generic.
1. Is a pure iterator: it does not necessarily have a length or support indexing. This is more generic.
2. Parallelization and batching are disentangled concepts.
You do not need to use batches, and can implement different batching logic easily.
3. Is optimized for generic data type and numpy arrays.
```eval_rst
.. note:: **Why is an iterator more general than** ``__getitem__`` **?**

   DataFlow's iterator interface can perfectly simulate the behavior of the
   ``__getitem__`` interface like this:

   .. code-block:: python

      df = SomeIndexGenerator()
      # A dataflow which produces indices, like [0], [1], [2], ...
      # The indices can be either sequential, or more fancy, akin to `torch.utils.data.Sampler`.
      df = MapData(df, lambda idx: dataset[idx[0]])
      # Map the indices to datapoints by ``__getitem__``.
```
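To make the second point concrete, here is a minimal sketch (assuming a hypothetical generator function `my_generator`): batching and parallelism are independent, optional wrappers around a plain iterator:

```python
from tensorpack.dataflow import DataFromGenerator, BatchData, PrefetchDataZMQ

df = DataFromGenerator(my_generator)    # any Python iterator works; no __len__ required
df = BatchData(df, 32, remainder=True)  # batching is just another DataFlow wrapper
df = PrefetchDataZMQ(df, nr_proc=4)     # parallelism is a separate, optional step
```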
......@@ -423,7 +423,6 @@ class ImageNetModel(ModelDesc):
add_moving_summary(tf.reduce_mean(wrong, name='train-error-top5'))
return loss
def create_predict_config(self, session_init):
"""
Returns:
......
......@@ -29,7 +29,7 @@ __all__ = ['NoOpTrainer', 'SimpleTrainer',
'AsyncMultiGPUTrainer',
'DistributedTrainerParameterServer',
'DistributedTrainerReplicated',
'HorovodTrainer']
'HorovodTrainer', 'BytePSTrainer']
def _int_to_range(x):
......@@ -378,11 +378,10 @@ class HorovodTrainer(SingleCostTrainer):
logger.warn("Horovod and pyarrow may conflict due to pyarrow bugs. "
"Uninstall pyarrow and use msgpack instead.")
# lazy import
import horovod.tensorflow as _hvd
import horovod.tensorflow as hvd
import horovod
global hvd
hvd = _hvd
hvd_version = tuple(map(int, horovod.__version__.split('.')))
self.hvd = hvd
hvd.init()
self.is_chief = hvd.rank() == 0
......@@ -395,17 +394,17 @@ class HorovodTrainer(SingleCostTrainer):
super(HorovodTrainer, self).__init__()
def allreduce(self, grads):
if hvd.size() == 1:
if self.hvd.size() == 1:
return grads
# copied from https://github.com/uber/horovod/blob/master/horovod/tensorflow/__init__.py
averaged_gradients = []
with tf.name_scope("HVDAllReduce"):
with tf.name_scope("AllReduce"):
for grad, var in grads:
if grad is not None:
if self._compression is not None and self._has_compression:
avg_grad = hvd.allreduce(grad, average=self._average, compression=self._compression)
avg_grad = self.hvd.allreduce(grad, average=self._average, compression=self._compression)
else:
avg_grad = hvd.allreduce(grad, average=self._average)
avg_grad = self.hvd.allreduce(grad, average=self._average)
averaged_gradients.append((avg_grad, var))
else:
averaged_gradients.append((None, var))
......@@ -420,7 +419,7 @@ class HorovodTrainer(SingleCostTrainer):
self.train_op = opt.apply_gradients(grads, name='train_op')
def broadcast(self):
logger.info("Running horovod broadcast ...")
logger.info("Running broadcast ...")
# the op will be created later in initialize()
self.trainer._broadcast_op.run()
......@@ -433,18 +432,17 @@ class HorovodTrainer(SingleCostTrainer):
# broadcast_op should be the last setup_graph: it needs to be created
# "right before" the graph is finalized,
# because it needs to capture all the variables (which may be created by callbacks).
with tf.name_scope('horovod_broadcast'):
self._broadcast_op = hvd.broadcast_global_variables(0)
self._broadcast_op = self.hvd.broadcast_global_variables(0)
# it's important that our NewSessionCreator does not finalize the graph
if not isinstance(session_creator, NewSessionCreator):
raise ValueError(
"session_creator has to be `NewSessionCreator` for horovod training! ")
"session_creator has to be `NewSessionCreator` for horovod/byteps training! ")
# NOTE It will fail if GPU was already detected before initializing the session
# https://github.com/tensorflow/tensorflow/issues/8136
session_creator.config.gpu_options.visible_device_list = str(self._local_rank)
try:
session_creator.config.inter_op_parallelism_threads = mp.cpu_count() // hvd.local_size()
session_creator.config.inter_op_parallelism_threads = mp.cpu_count() // self.hvd.local_size()
except AttributeError: # old horovod does not have local_size
pass
super(HorovodTrainer, self).initialize(session_creator, session_init)
......@@ -461,5 +459,30 @@ class HorovodTrainer(SingleCostTrainer):
self.sess.run(self._broadcast_op)
# for lazy import
hvd = None
class BytePSTrainer(HorovodTrainer):
"""
BytePS trainer. Supports both multi-GPU and distributed training.
To use it, switch the trainer, and refer to the BytePS documentation on how to
launch the server/scheduler/workers.
"""
def __init__(self, average=True):
"""
Args:
average (bool): whether to average or sum the gradients across processes.
"""
import byteps.tensorflow as bps
self.hvd = bps # BytePS has the same interface as Horovod
self.hvd.allreduce = bps.push_pull # https://github.com/bytedance/byteps/issues/8
# TODO bootstrap env vars
bps.init()
self.is_chief = bps.rank() == 0
self._local_rank = bps.local_rank()
self._rank = bps.rank()
self._average = average
self._compression = None
self._has_compression = False
logger.info("[BytePSTrainer] local rank={}".format(self._local_rank))
SingleCostTrainer.__init__(self)
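For reference, a minimal usage sketch (not part of this diff; it assumes an ordinary tensorpack `TrainConfig` named `config`): the trainer is swapped in the training script, while the BytePS scheduler/server/workers are launched separately as described in the BytePS documentation.

```python
from tensorpack import launch_train_with_config
from tensorpack.train import BytePSTrainer

# `config` is a regular tensorpack TrainConfig defined elsewhere (hypothetical here).
launch_train_with_config(config, BytePSTrainer(average=True))
```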