Commit 08821b55 authored by Yuxin Wu's avatar Yuxin Wu

refactor queueinputtrainer a bit

parent 0e7f338c
...@@ -21,7 +21,7 @@ import os ...@@ -21,7 +21,7 @@ import os
sys.path.insert(0, os.path.abspath('../')) sys.path.insert(0, os.path.abspath('../'))
import mock import mock
MOCK_MODULES = ['numpy', 'scipy', 'tensorflow'] MOCK_MODULES = ['numpy', 'scipy', 'tensorflow', 'scipy.misc', 'h5py', 'nltk']
for mod_name in MOCK_MODULES: for mod_name in MOCK_MODULES:
sys.modules[mod_name] = mock.Mock() sys.modules[mod_name] = mock.Mock()
......
## ResNet ## ResNet
Implements the paper "Deep Residual Learning for Image Recognition", [http://arxiv.org/abs/1512.03385](http://arxiv.org/abs/1512.03385) Implement the paper "Deep Residual Learning for Image Recognition", [http://arxiv.org/abs/1512.03385](http://arxiv.org/abs/1512.03385)
with the variants proposed in "Identity Mappings in Deep Residual Networks", [https://arxiv.org/abs/1603.05027](https://arxiv.org/abs/1603.05027). with the variants proposed in "Identity Mappings in Deep Residual Networks", [https://arxiv.org/abs/1603.05027](https://arxiv.org/abs/1603.05027).
The train error shown here is a moving average of the error rate of each batch in training. The train error shown here is a moving average of the error rate of each batch in training.
......
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# File: mnist_convnet.py # File: mnist-convnet.py
# Author: Yuxin Wu <ppwwyyxx@gmail.com> # Author: Yuxin Wu <ppwwyyxx@gmail.com>
import tensorflow as tf import tensorflow as tf
...@@ -121,5 +121,6 @@ if __name__ == '__main__': ...@@ -121,5 +121,6 @@ if __name__ == '__main__':
config = get_config() config = get_config()
if args.load: if args.load:
config.session_init = SaverRestore(args.load) config.session_init = SaverRestore(args.load)
tp.SimpleTrainer(config).train() #tp.SimpleTrainer(config).train()
tp.QueueInputTrainer(config).train()
...@@ -6,25 +6,8 @@ Define a `DataFlow` instance to feed data. ...@@ -6,25 +6,8 @@ Define a `DataFlow` instance to feed data.
See [Dataflow documentation](https://github.com/ppwwyyxx/tensorpack/tree/master/tensorpack/dataflow) See [Dataflow documentation](https://github.com/ppwwyyxx/tensorpack/tree/master/tensorpack/dataflow)
### How to define a model ### How to define a model
Take a look at the `get_model` function in [mnist example](https://github.com/ppwwyyxx/tensorpack/blob/master/example_mnist.py) first.
To define a model, write a `get_model` function which accepts two arguments: Take a look at [mnist example](https://github.com/ppwwyyxx/tensorpack/blob/master/example_mnist.py) first.
+ inputs: a list of variables used as input in training. inputs could be batched or not batched (see
[training](#how-to-perform-training))
+ is_training: the graph for training and inference could be different (e.g. dropout, augmentation),
`get_model` function should use this variable to know is it doing training or inference.
The function should define a graph based on input variables.
It could use any pre-defined routines in [tensorpack/models](https://github.com/ppwwyyxx/tensorpack/tree/master/tensorpack/models),
or use tensorflow symbolic functions.
It may also define other helper variables to monitor the training,
(e.g. accuracy), and add tensorboard summaries you need. (See [howto summary](#use-tensorboard-summary))
Also, it's helpful to give names to some important variables used in inference. (See
[inference](#how-to-perform-inference)).
The function should at last return the cost to minimize.
### How to perform training ### How to perform training
...@@ -35,6 +18,3 @@ The function should at last return the cost to minimize. ...@@ -35,6 +18,3 @@ The function should at last return the cost to minimize.
### How to add new models ### How to add new models
### Use tensorboard summary ### Use tensorboard summary
<!--
- what will be automatically summaried
-->
...@@ -2,12 +2,12 @@ ...@@ -2,12 +2,12 @@
# File: __init__.py # File: __init__.py
# Author: Yuxin Wu <ppwwyyxx@gmail.com> # Author: Yuxin Wu <ppwwyyxx@gmail.com>
import models from . import models
import train from . import train
import utils from . import utils
import tfutils from . import tfutils
import callbacks from . import callbacks
import dataflow from . import dataflow
from .train import * from .train import *
from .models import * from .models import *
......
...@@ -3,11 +3,14 @@ ...@@ -3,11 +3,14 @@
# Author: Yuxin Wu <ppwwyyxx@gmail.com> # Author: Yuxin Wu <ppwwyyxx@gmail.com>
import sys, os import sys, os
import multiprocessing
from scipy.misc import imsave from scipy.misc import imsave
from ..utils.fs import mkdir_p from ..utils.fs import mkdir_p
# TODO name_func to write label? __all__ = ['dump_dataset_images', 'dataflow_to_process_queue']
# TODO pass a name_func to write label as filename?
def dump_dataset_images(ds, dirname, max_count=None, index=0): def dump_dataset_images(ds, dirname, max_count=None, index=0):
""" Dump images from a `DataFlow` to a directory. """ Dump images from a `DataFlow` to a directory.
...@@ -25,3 +28,34 @@ def dump_dataset_images(ds, dirname, max_count=None, index=0): ...@@ -25,3 +28,34 @@ def dump_dataset_images(ds, dirname, max_count=None, index=0):
return return
img = dp[index] img = dp[index]
imsave(os.path.join(dirname, "{}.jpg".format(i)), img) imsave(os.path.join(dirname, "{}.jpg".format(i)), img)
def dataflow_to_process_queue(ds, size, nr_consumer):
"""
Convert a `DataFlow` to a multiprocessing.Queue.
:param ds: a `DataFlow`
:param size: size of the queue
:param nr_consumer: number of consumer of the queue.
will add this many of `DIE` sentinel to the end of the queue.
:returns: (queue, process). The process will take data from `ds` to fill
the queue once you start it.
"""
q = multiprocessing.Queue(size)
class EnqueProc(multiprocessing.Process):
def __init__(self, ds, q, nr_consumer):
super(EnqueProc, self).__init__()
self.ds = ds
self.q = q
def run(self):
try:
for idx, dp in enumerate(self.ds.get_data()):
self.q.put((idx, dp))
finally:
for _ in range(nr_consumer):
self.q.put((DIE, None))
proc = EnqueProc(ds, q, nr_consumer)
return q, proc
...@@ -39,16 +39,6 @@ class ModelDesc(object): ...@@ -39,16 +39,6 @@ class ModelDesc(object):
def _get_input_vars(self): def _get_input_vars(self):
pass pass
# TODO move this to QueueInputTrainer
def get_input_queue(self, input_vars):
"""
return the queue for input. the dequeued elements will be fed to self.get_cost
if queue is None, datapoints from dataflow will be fed to the graph directly.
when running with multiGPU, queue cannot be None
"""
assert input_vars is not None
return tf.FIFOQueue(100, [x.dtype for x in input_vars], name='input_queue')
def get_cost(self, input_vars, is_training): def get_cost(self, input_vars, is_training):
""" """
:param input_vars: a list of input variable in the graph :param input_vars: a list of input variable in the graph
......
...@@ -3,12 +3,10 @@ ...@@ -3,12 +3,10 @@
# Author: Yuxin Wu <ppwwyyxx@gmail.com> # Author: Yuxin Wu <ppwwyyxx@gmail.com>
import tensorflow as tf import tensorflow as tf
from itertools import count
import argparse
from collections import namedtuple
import numpy as np import numpy as np
from collections import namedtuple
from tqdm import tqdm from tqdm import tqdm
from six.moves import zip from six.moves import zip, range
import multiprocessing import multiprocessing
from .utils.concurrency import ensure_proc_terminate, OrderedResultGatherProc, DIE from .utils.concurrency import ensure_proc_terminate, OrderedResultGatherProc, DIE
...@@ -17,6 +15,7 @@ from .tfutils import * ...@@ -17,6 +15,7 @@ from .tfutils import *
from .utils import logger from .utils import logger
from .tfutils.modelutils import describe_model from .tfutils.modelutils import describe_model
from .dataflow import DataFlow, BatchData from .dataflow import DataFlow, BatchData
from .dataflow.dftools import dataflow_to_process_queue
__all__ = ['PredictConfig', 'DatasetPredictor', 'get_predict_func'] __all__ = ['PredictConfig', 'DatasetPredictor', 'get_predict_func']
...@@ -102,7 +101,7 @@ class PredictWorker(multiprocessing.Process): ...@@ -102,7 +101,7 @@ class PredictWorker(multiprocessing.Process):
""" A worker process to run predictor on one GPU """ """ A worker process to run predictor on one GPU """
def __init__(self, idx, gpuid, inqueue, outqueue, config): def __init__(self, idx, gpuid, inqueue, outqueue, config):
""" """
:param idx: index of the worker :param idx: index of the worker. the 0th worker will print log.
:param gpuid: id of the GPU to be used :param gpuid: id of the GPU to be used
:param inqueue: input queue to get data point :param inqueue: input queue to get data point
:param outqueue: output queue put result :param outqueue: output queue put result
...@@ -118,7 +117,7 @@ class PredictWorker(multiprocessing.Process): ...@@ -118,7 +117,7 @@ class PredictWorker(multiprocessing.Process):
def run(self): def run(self):
os.environ['CUDA_VISIBLE_DEVICES'] = self.gpuid os.environ['CUDA_VISIBLE_DEVICES'] = self.gpuid
G = tf.Graph() # build a graph for each process, because they don't need to share anything G = tf.Graph() # build a graph for each process, because they don't need to share anything
with G.as_default(), tf.device('/gpu:{}'.format(self.idx)): with G.as_default(), tf.device('/gpu:0'):
self.func = get_predict_func(self.config) self.func = get_predict_func(self.config)
if self.idx == 0: if self.idx == 0:
describe_model() describe_model()
...@@ -131,33 +130,6 @@ class PredictWorker(multiprocessing.Process): ...@@ -131,33 +130,6 @@ class PredictWorker(multiprocessing.Process):
res = PredictResult(dp, self.func(dp)) res = PredictResult(dp, self.func(dp))
self.outqueue.put((tid, res)) self.outqueue.put((tid, res))
def DFtoQueue(ds, size, nr_consumer):
"""
Build a queue that produce data from `DataFlow`, and a process
that fills the queue.
:param ds: a `DataFlow`
:param size: size of the queue
:param nr_consumer: number of consumer of the queue.
will add this many of `DIE` sentinel to the end of the queue.
:returns: (queue, process)
"""
q = multiprocessing.Queue(size)
class EnqueProc(multiprocessing.Process):
def __init__(self, ds, q, nr_consumer):
super(EnqueProc, self).__init__()
self.ds = ds
self.q = q
def run(self):
for idx, dp in enumerate(self.ds.get_data()):
self.q.put((idx, dp))
print "Enqueue ends"
for _ in range(nr_consumer):
self.q.put((DIE, None))
proc = EnqueProc(ds, q, nr_consumer)
return q, proc
class DatasetPredictor(object): class DatasetPredictor(object):
""" """
Run the predict_config on a given `DataFlow`. Run the predict_config on a given `DataFlow`.
...@@ -171,12 +143,12 @@ class DatasetPredictor(object): ...@@ -171,12 +143,12 @@ class DatasetPredictor(object):
self.ds = dataset self.ds = dataset
self.nr_gpu = config.nr_gpu self.nr_gpu = config.nr_gpu
if self.nr_gpu > 1: if self.nr_gpu > 1:
self.inqueue, self.inqueue_proc = DFtoQueue(self.ds, 10, self.nr_gpu) self.inqueue, self.inqueue_proc = dataflow_to_process_queue(self.ds, 10, self.nr_gpu)
self.outqueue = multiprocessing.Queue() self.outqueue = multiprocessing.Queue()
try: try:
gpus = os.environ['CUDA_VISIBLE_DEVICES'].split(',') gpus = os.environ['CUDA_VISIBLE_DEVICES'].split(',')
except KeyError: except KeyError:
gpus = range(self.nr_gpu) gpus = list(range(self.nr_gpu))
self.workers = [PredictWorker(i, gpus[i], self.inqueue, self.outqueue, config) self.workers = [PredictWorker(i, gpus[i], self.inqueue, self.outqueue, config)
for i in range(self.nr_gpu)] for i in range(self.nr_gpu)]
self.result_queue = OrderedResultGatherProc(self.outqueue) self.result_queue = OrderedResultGatherProc(self.outqueue)
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
from ..utils.naming import * from ..utils.naming import *
import tensorflow as tf import tensorflow as tf
def get_default_sess_config(mem_fraction=0.99): def get_default_sess_config(mem_fraction=0.9):
""" """
Return a better session config to use as default. Return a better session config to use as default.
Tensorflow default session config consume too much resources. Tensorflow default session config consume too much resources.
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
import six import six
import tensorflow as tf import tensorflow as tf
import re
from ..utils import * from ..utils import *
from . import get_global_step_var from . import get_global_step_var
...@@ -69,23 +70,18 @@ def add_param_summary(summary_lists): ...@@ -69,23 +70,18 @@ def add_param_summary(summary_lists):
for act in actions: for act in actions:
perform(p, act) perform(p, act)
# TODO get rid of the cost_var thing... def summary_moving_average():
def summary_moving_average(cost_var):
""" Create a MovingAverage op and summary for all variables in """ Create a MovingAverage op and summary for all variables in
MOVING_SUMMARY_VARS_KEY, as well as `cost_var`. MOVING_SUMMARY_VARS_KEY.
:returns: a op to maintain these average. :returns: a op to maintain these average.
""" """
global_step_var = get_global_step_var() global_step_var = get_global_step_var()
averager = tf.train.ExponentialMovingAverage( averager = tf.train.ExponentialMovingAverage(
0.99, num_updates=global_step_var, name='moving_averages') 0.99, num_updates=global_step_var, name='moving_averages')
vars_to_summary = [cost_var] + \ vars_to_summary = tf.get_collection(MOVING_SUMMARY_VARS_KEY)
tf.get_collection(MOVING_SUMMARY_VARS_KEY)
avg_maintain_op = averager.apply(vars_to_summary) avg_maintain_op = averager.apply(vars_to_summary)
for idx, c in enumerate(vars_to_summary): for idx, c in enumerate(vars_to_summary):
name = c.op.name name = re.sub('tower[0-9]+/', '', c.op.name)
if idx == 0:
name = 'train_cost'
tf.scalar_summary(name, averager.average(c)) tf.scalar_summary(name, averager.average(c))
return avg_maintain_op return avg_maintain_op
...@@ -63,7 +63,7 @@ class Trainer(object): ...@@ -63,7 +63,7 @@ class Trainer(object):
summary = tf.Summary.FromString(summary_str) summary = tf.Summary.FromString(summary_str)
for val in summary.value: for val in summary.value:
if val.WhichOneof('value') == 'simple_value': if val.WhichOneof('value') == 'simple_value':
val.tag = re.sub('tower[0-9]*/', '', val.tag) # TODO move to subclasses val.tag = re.sub('tower[0-9]+/', '', val.tag) # TODO move to subclasses
self.stat_holder.add_stat(val.tag, val.simple_value) self.stat_holder.add_stat(val.tag, val.simple_value)
self.summary_writer.add_summary(summary, self.global_step) self.summary_writer.add_summary(summary, self.global_step)
......
...@@ -81,14 +81,27 @@ class EnqueueThread(threading.Thread): ...@@ -81,14 +81,27 @@ class EnqueueThread(threading.Thread):
class QueueInputTrainer(Trainer): class QueueInputTrainer(Trainer):
""" """
Trainer which builds a queue for input. Trainer which builds a FIFO queue for input.
Support multi GPU. Support multi GPU.
""" """
def __init__(self, config, input_queue=None):
"""
:param config: a `TrainConfig` instance
:param input_queue: a `tf.QueueBase` instance to be used to buffer datapoints.
Defaults to a FIFO queue of size 100.
"""
super(QueueInputTrainer, self).__init__(config)
self.input_vars = self.model.get_input_vars()
if input_queue is None:
self.input_queue = tf.FIFOQueue(
100, [x.dtype for x in self.input_vars], name='input_queue')
else:
self.input_queue = input_queue
@staticmethod @staticmethod
def _average_grads(tower_grads): def _average_grads(tower_grads):
ret = [] ret = []
with tf.device('/gpu:0'):
for grad_and_vars in zip(*tower_grads): for grad_and_vars in zip(*tower_grads):
v = grad_and_vars[0][1] v = grad_and_vars[0][1]
try: try:
...@@ -99,55 +112,60 @@ class QueueInputTrainer(Trainer): ...@@ -99,55 +112,60 @@ class QueueInputTrainer(Trainer):
ret.append((grad, v)) ret.append((grad, v))
return ret return ret
def train(self): def _get_model_inputs(self):
model = self.model """ Dequeue a datapoint from input_queue and return"""
input_vars = model.get_input_vars() ret = self.input_queue.dequeue()
input_queue = model.get_input_queue(input_vars) if isinstance(ret, tf.Tensor): # only one input
enqueue_op = input_queue.enqueue(input_vars) ret = [ret]
assert len(ret) == len(self.input_vars)
def get_model_inputs(): for qv, v in zip(ret, self.input_vars):
model_inputs = input_queue.dequeue()
if isinstance(model_inputs, tf.Tensor): # only one input
model_inputs = [model_inputs]
for qv, v in zip(model_inputs, input_vars):
qv.set_shape(v.get_shape()) qv.set_shape(v.get_shape())
return model_inputs return ret
def _single_tower_grad_cost(self):
""" Get grad and cost for single-tower case"""
model_inputs = self._get_model_inputs()
cost_var = self.model.get_cost(model_inputs, is_training=True)
grads = self.config.optimizer.compute_gradients(cost_var)
return (grads, cost_var)
# get gradients to update: def _multi_tower_grad_cost(self):
if self.config.nr_tower > 1:
logger.info("Training a model of {} tower".format(self.config.nr_tower)) logger.info("Training a model of {} tower".format(self.config.nr_tower))
# to avoid repeated summary from each device # to avoid repeated summary from each device
coll_keys = [tf.GraphKeys.SUMMARIES, MOVING_SUMMARY_VARS_KEY] collect_dedup = [tf.GraphKeys.SUMMARIES, MOVING_SUMMARY_VARS_KEY]
kept_summaries = {} kept_summaries = {}
grad_list = [] grad_list = []
for i in range(self.config.nr_tower): for i in range(self.config.nr_tower):
with tf.device('/gpu:{}'.format(i)), \ with tf.device('/gpu:{}'.format(i)), \
tf.name_scope('tower{}'.format(i)) as scope: tf.name_scope('tower{}'.format(i)) as scope:
model_inputs = get_model_inputs() model_inputs = self._get_model_inputs() # each tower dequeue from input queue
cost_var = model.get_cost(model_inputs, is_training=True) cost_var = self.model.get_cost(model_inputs, is_training=True) # build tower
if i == 0:
cost_var_t0 = cost_var # gate_gradienst=0 seems to be faster?
grad_list.append( grad_list.append(
self.config.optimizer.compute_gradients(cost_var, self.config.optimizer.compute_gradients(cost_var, gate_gradients=0))
gate_gradients=0))
if i == 0: if i == 0:
cost_var_t0 = cost_var
tf.get_variable_scope().reuse_variables() tf.get_variable_scope().reuse_variables()
for k in coll_keys: for k in collect_dedup:
kept_summaries[k] = copy.copy(tf.get_collection(k)) kept_summaries[k] = copy.copy(tf.get_collection(k))
logger.info("Graph built for tower {}.".format(i)) logger.info("Graph built for tower {}.".format(i))
for k in coll_keys: for k in collect_dedup:
del tf.get_collection_ref(k)[:] del tf.get_collection_ref(k)[:]
tf.get_collection_ref(k).extend(kept_summaries[k]) tf.get_collection_ref(k).extend(kept_summaries[k])
grads = QueueInputTrainer._average_grads(grad_list) grads = QueueInputTrainer._average_grads(grad_list)
cost_var = cost_var_t0 return (grads, cost_var_t0)
else:
model_inputs = get_model_inputs() def train(self):
cost_var = model.get_cost(model_inputs, is_training=True) enqueue_op = self.input_queue.enqueue(self.input_vars)
grads = self.config.optimizer.compute_gradients(cost_var)
avg_maintain_op = summary_moving_average(cost_var) # TODO(multigpu) average the cost from each device? grads, cost_var = self._single_tower_grad_cost() \
if self.config.nr_tower == 0 else self._multi_tower_grad_cost()
tf.add_to_collection(MOVING_SUMMARY_VARS_KEY, cost_var)
avg_maintain_op = summary_moving_average()
grads = self.process_grads(grads) grads = self.process_grads(grads)
...@@ -157,7 +175,7 @@ class QueueInputTrainer(Trainer): ...@@ -157,7 +175,7 @@ class QueueInputTrainer(Trainer):
self.init_session_and_coord() self.init_session_and_coord()
# create a thread that keeps filling the queue # create a thread that keeps filling the queue
self.input_th = EnqueueThread(self, input_queue, enqueue_op, input_vars) self.input_th = EnqueueThread(self, self.input_queue, enqueue_op, self.input_vars)
self.main_loop() self.main_loop()
def _start_all_threads(self): def _start_all_threads(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment