Commit c7b5a85f authored by Yuxin Wu

Let queue_size summary be part of QueueInput callback

parent a6aa8bce
@@ -56,7 +56,7 @@ class RunOp(Callback):
     def _before_run(self, _):
         if self.run_step:
             self._print()
-            return self._fetch    # faster than return [self._op]
+            return self._fetch
 
     def _print(self):
         if self.verbose:
...
@@ -16,7 +16,7 @@ from six.moves import range
 from ..utils import logger
 from ..utils.utils import get_tqdm_kwargs
 from ..utils.develop import deprecated
-from ..dataflow import DataFlow
+from ..dataflow.base import DataFlow, DataFlowTerminated
 from ..graph_builder.input_source_base import InputSource
 from ..graph_builder.input_source import (
@@ -79,13 +79,13 @@ class InferenceRunnerBase(Callback):
         tower_id = self.trainer.config.predict_tower[0]
         device = '/gpu:{}'.format(tower_id) if tower_id >= 0 else '/cpu:0'
-        cbs = self._input_source.setup(self.trainer.model.get_inputs_desc())
+        self._input_callbacks = self._input_source.setup(self.trainer.model.get_inputs_desc())
         with tf.variable_scope(tf.get_variable_scope(), reuse=True):
             self._tower_handle = self.trainer.predictor_factory.build(
                 self._tower_name, device, self._input_source)
 
         self._hooks = [self._build_hook(inf) for inf in self.infs]
-        self._hooks.extend([CallbackToHook(cb) for cb in cbs])
+        self._hooks.extend([CallbackToHook(cb) for cb in self._input_callbacks])
 
         for inf in self.infs:
             inf.setup_graph(self.trainer)
@@ -108,8 +108,8 @@ class InferenceRunnerBase(Callback):
         try:
             for _ in tqdm.trange(self._size, **get_tqdm_kwargs()):
                 self._hooked_sess.run(fetches=[])
-        except StopIteration:
-            raise RuntimeError(
+        except (StopIteration, DataFlowTerminated):
+            logger.exception(
                 "[InferenceRunner] input stopped before reaching its size()! " + msg)
         for inf in self.infs:
             inf.trigger_epoch()
...
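Note on the InferenceRunner hunk above: an input that stops before size() iterations used to abort with a RuntimeError; it is now only logged (including the new DataFlowTerminated case), and the inferencers still get their trigger_epoch(). A minimal sketch of the kind of dataflow that hits this path (class name and numbers are made up; size()/get_data() are the DataFlow interface used by this version):

from tensorpack.dataflow.base import DataFlow

class OverestimatingFlow(DataFlow):
    """A toy DataFlow whose size() overestimates what get_data() yields,
    so an InferenceRunner driving it hits StopIteration before size() steps."""
    def size(self):
        return 100            # claimed number of datapoints
    def get_data(self):
        for i in range(80):   # actually yields fewer
            yield [i]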
@@ -18,9 +18,9 @@ __all__ = ['StepTensorPrinter', 'ProgressBar']
 class StepTensorPrinter(Callback):
-    """ It prints the value of some tensors in each step.
-    It's just a demo of how trigger_step works but you should in general use
-    :func:`symbolic_functions.print_stat` or :func:`tf.Print` instead. """
+    """ Prints the value of some tensors in each step.
+    It's an example of how ``before_run/after_run`` works.
+    """
 
     def __init__(self, names):
         """
@@ -31,7 +31,7 @@ class StepTensorPrinter(Callback):
         logger.warn("Using print_stat or tf.Print in the graph is much faster than StepTensorPrinter!")
         self._names = names
 
-    def _before_train(self):
+    def _setup_graph(self):
         self._fetches = get_op_or_tensor_by_name(self._names)
 
     def _before_run(self, _):
...
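As the retained warning above says, printing inside the graph is much cheaper than fetching tensors through this callback every step. A hedged sketch of the two in-graph alternatives it names, using the TF 1.x API (print_stat is tensorpack's helper in tfutils.symbolic_functions):

import tensorflow as tf
from tensorpack.tfutils.symbolic_functions import print_stat

x = tf.random_normal([4], name='x')
x = tf.Print(x, [x], message='x: ')      # passes x through, logs the listed tensors when run
x = print_stat(x, message='x stats')     # similar idea, also reports summary statistics of the tensor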
@@ -20,6 +20,7 @@ from ..tfutils.tower import get_current_tower_context
 from ..utils import logger
 from ..utils.concurrency import ShareSessionThread
 from ..callbacks.base import Callback
+from ..callbacks.graph import RunOp
 
 __all__ = ['PlaceholderInput', 'FeedInput', 'DataParallelFeedInput',
            'FeedfreeInput',
@@ -178,9 +179,6 @@ class EnqueueThread(ShareSessionThread):
         self.op = self.queue.enqueue(self.placehdrs)
         self.close_op = self.queue.close(cancel_pending_enqueues=True)
-        self.size_op = self.queue.size()
-        add_moving_summary(tf.cast(
-            self.size_op, tf.float32, name='queue_size'))
 
     def run(self):
         with self.default_sess():
@@ -235,11 +233,22 @@ class QueueInput(FeedfreeInput):
                 name='input_queue')
         self.thread = EnqueueThread(self.queue, self.ds, self._input_placehdrs)
 
+    def _create_ema_callback(self):
+        with self.cached_name_scope():
+            # in TF there is no API to get queue capacity, so we can only summary the size
+            size = tf.cast(self.queue.size(), tf.float32, name='queue_size')
+            size_ema_op = add_moving_summary(size, collection=None)[0].op
+        return RunOp(
+            lambda: size_ema_op,
+            run_before=False,
+            run_as_trigger=False,
+            run_step=True)
+
     def _get_callbacks(self):
         from ..callbacks.concurrency import StartProcOrThread
         cb = StartProcOrThread(self.thread)
         cb.chief_only = False
-        return [cb]
+        return [cb, self._create_ema_callback()]
 
     def _get_input_tensors(self):
         with tf.device('/cpu:0'), self.cached_name_scope():
...
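The two input_source.py hunks above move the queue_size summary out of EnqueueThread: QueueInput now builds it lazily and hands it back as an extra RunOp callback with run_step=True, so the EMA op is fetched together with every training step instead of being registered globally. A rough usage sketch under this commit's module layout (the dataflow, shapes and batch size below are placeholders):

from tensorpack.dataflow import BatchData, FakeData
from tensorpack.graph_builder.input_source import QueueInput

df = BatchData(FakeData([[28, 28, 3]], 1000), 32)   # any DataFlow works here
input_source = QueueInput(df)
# The callbacks returned by this input source now include, besides the
# StartProcOrThread that launches the enqueue thread, the RunOp callback
# maintaining the 'queue_size' EMA once per step. Trainers and
# InferenceRunner obtain them via setup(), as in the hunk further up:
#   cbs = input_source.setup(model.get_inputs_desc())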
@@ -141,7 +141,8 @@ class MapGradient(GradientProcessor):
 _summaried_gradient = set()
 
-# TODO let the maintain op depend on grad directly ?
+# TODO has dependency problems: sess.run may not depend on grad
+# maybe group maintain op and grad ?
 class SummaryGradient(MapGradient):
     """
     Summary histogram and RMS for each gradient variable.
...
@@ -165,15 +165,20 @@ def add_moving_summary(*args, **kwargs):
     Args:
         args: tensors to summary
         decay (float): the decay rate. Defaults to 0.95.
-        collection (str): the name of the collection to add EMA-maintaining ops.
+        collection (str or None): the name of the collection to add EMA-maintaining ops.
             The default will work together with the default
             :class:`MovingAverageSummary` callback.
+
+    Returns:
+        [tf.Tensor]: list of tensors returned by assign_moving_average,
+            which can be used to maintain the EMA.
     """
     decay = kwargs.pop('decay', 0.95)
     coll = kwargs.pop('collection', MOVING_SUMMARY_OPS_KEY)
     assert len(kwargs) == 0, "Unknown arguments: " + str(kwargs)
     ctx = get_current_tower_context()
+    # allow ctx to be none
     if ctx is not None and not ctx.is_main_training_tower:
         return
@@ -188,21 +193,26 @@ def add_moving_summary(*args, **kwargs):
     G = tf.get_default_graph()
     # TODO variable not saved under distributed
+    ema_ops = []
     for c in v:
         name = re.sub('tower[0-9]+/', '', c.op.name)
         with G.colocate_with(c), tf.name_scope(None):
+            # assign_moving_average creates variables with op names, therefore clear ns first.
             with _enter_vs_reuse_ns('EMA') as vs:
-                # will actually create ns EMA_1, EMA_2, etc. tensorflow#6007
                 ema_var = tf.get_variable(name, shape=c.shape, dtype=c.dtype,
                                           initializer=tf.constant_initializer(), trainable=False)
                 ns = vs.original_name_scope
-            with tf.name_scope(ns):    # reuse VS&NS so that no EMA_1 will appear
+            with tf.name_scope(ns):    # reuse VS&NS so that EMA_1 won't appear
                 ema_op = moving_averages.assign_moving_average(
                     ema_var, c, decay,
                     zero_debias=True, name=name + '_EMA_apply')
-            tf.summary.scalar(name + '-summary', ema_op)
-            tf.add_to_collection(coll, ema_op)
+            tf.summary.scalar(name + '-summary', ema_op)    # write the EMA value as a summary
+            ema_ops.append(ema_op)
+    if coll is not None:
+        for op in ema_ops:
             # TODO a new collection to summary every step?
+            tf.add_to_collection(coll, op)
+    return ema_ops
 
 try:
...
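The summary.py change above is what makes the QueueInput callback possible: add_moving_summary now returns the EMA-maintaining ops and accepts collection=None, leaving it to the caller to run them. A small sketch of the two calling modes (the two scalars are stand-ins):

import tensorflow as tf
from tensorpack.tfutils.summary import add_moving_summary

loss = tf.reduce_mean(tf.random_normal([32]), name='loss')
acc = tf.reduce_mean(tf.random_uniform([32]), name='accuracy')

# Default: the EMA op is added to MOVING_SUMMARY_OPS_KEY and run by the
# MovingAverageSummary callback; the ops are now returned as well.
ops = add_moving_summary(loss)

# collection=None: nothing is registered, the caller owns the op and decides
# when to run it (e.g. by wrapping it in a RunOp callback, as QueueInput does).
ema_op = add_moving_summary(acc, collection=None)[0].op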
@@ -34,7 +34,7 @@ class StopTraining(BaseException):
 class MaintainStepCounter(Callback):
     """
     It maintains the global step in the graph, making sure it's increased by one.
-    This callback is always enabled by the trainer, and you wouldn't need to use it.
+    This callback is always enabled by the trainer, you don't need to worry about it.
     """
     def _setup_graph(self):
         # ensure it exists
@@ -64,10 +64,10 @@ class Trainer(object):
     Attributes:
         config (TrainConfig): the config used in this trainer.
-        model (ModelDesc):
+        model (ModelDesc): alias for ``config.model``.
         sess (tf.Session): the current session in use.
         hooked_sess (tf.MonitoredSession): the session with hooks.
-        monitors (Monitors): the monitors. Callbacks can use it for logging.
+        monitors (Monitors): the monitors. Other callbacks can use it for logging.
         local_step (int): the number of (tensorpack) steps that have finished in the current epoch.
     """
 
     # step attr only available after before_train?
...
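On the reworded Trainer attributes above: the usual way for a callback to log through monitors is sketched below. This is illustrative only; put_scalar and _trigger_epoch are the method names I'd expect in this tensorpack version, so treat them as assumptions.

from tensorpack.callbacks.base import Callback

class LogSomething(Callback):
    """Illustrative only: pushes a python scalar into the monitors each epoch."""
    def _trigger_epoch(self):
        # Monitors dispatches the value to every registered monitor
        # (stat file, TensorBoard writer, ...).
        self.trainer.monitors.put_scalar('my_metric', 42.0)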