Commit 0459677e authored by Yuxin Wu

Add basic profiling callback (#309)

parent ef1e9611
...@@ -16,7 +16,7 @@ class Callback(object):
     Attributes:
         epoch_num(int): the number of the current epoch.
-        global_step(int): the number of global steps that have finished.
+        global_step(int): the number of global steps that have finished or are currently running.
         local_step(int): the local steps within the current epoch.
         trainer(Trainer): the trainer.
         graph(tf.Graph): the graph.
...
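With this change, ``global_step`` already counts the step that is currently being executed, not only finished steps. As a quick illustration of how these attributes are typically read, here is a minimal sketch of a custom callback; the ``StepPrinter`` class is hypothetical and only assumes the ``Callback`` base class documented above:

from tensorpack.callbacks import Callback

class StepPrinter(Callback):
    # Hypothetical example, not part of this commit.
    def _trigger_epoch(self):
        # global_step includes the step currently running (see the docstring above);
        # local_step counts steps within the current epoch and resets every epoch.
        print("epoch {}: global_step={}, local_step={}".format(
            self.epoch_num, self.global_step, self.local_step))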
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: prof.py
# Author: Yuxin Wu <ppwwyyxxc@gmail.com>

import os
import numpy as np
import multiprocessing as mp
import time
from six.moves import map

import tensorflow as tf
from tensorflow.python.client import timeline

from .base import Callback
from ..utils import logger
from ..utils.concurrency import ensure_proc_terminate, subproc_call

__all__ = ['GPUUtilizationTracker', 'GraphProfiler']


class GPUUtilizationTracker(Callback):
    """ Summarize the average GPU utilization within an epoch"""

    def __init__(self, devices=None):
        """
        Args:
            devices (list[int]): physical GPU ids. If None, will use CUDA_VISIBLE_DEVICES
        """
        if devices is None:
            env = os.environ.get('CUDA_VISIBLE_DEVICES')
            assert env is not None, "[GPUUtilizationTracker] Both devices and CUDA_VISIBLE_DEVICES are None!"
            self._devices = env.split(',')
        else:
            self._devices = list(map(str, devices))
        assert len(self._devices), "[GPUUtilizationTracker] No GPU device given!"

        self._command = "nvidia-smi --query-gpu=utilization.gpu --format=csv,noheader,nounits -i " + \
            ','.join(self._devices)
        _, ret = subproc_call(self._command)
        assert ret == 0, "Cannot fetch GPU utilization!"

    def _before_train(self):
        self._evt = mp.Event()
        self._stop_evt = mp.Event()
        self._queue = mp.Queue()
        self._proc = mp.Process(target=self.worker, args=(
            self._evt, self._queue, self._stop_evt))
        ensure_proc_terminate(self._proc)
        self._proc.start()

    def _before_epoch(self):
        self._evt.set()

    def _after_epoch(self):
        while self._evt.is_set():   # unlikely
            pass
        self._evt.set()
        stats = self._queue.get()
        for idx, dev in enumerate(self._devices):
            self.trainer.monitors.put_scalar('GPU{}-Util'.format(dev), stats[idx])

    def _after_train(self):
        self._stop_evt.set()
        self._evt.set()
        self._proc.join()

    def worker(self, evt, rst_queue, stop_evt):
        while True:
            evt.wait()  # start epoch
            evt.clear()
            if stop_evt.is_set():   # or on exit
                return

            stats = np.zeros((len(self._devices),), dtype='f4')
            cnt = 0
            while True:
                time.sleep(1)
                output, retv = subproc_call(self._command)
                assert retv == 0, "Cannot fetch GPU Utilization!"
                data = list(map(float, output.strip().split(b'\n')))
                stats += data
                cnt += 1

                if evt.is_set():    # stop epoch
                    if stop_evt.is_set():   # or on exit
                        return
                    evt.clear()
                    rst_queue.put(stats / cnt)
                    break
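For context, a minimal usage sketch for the tracker above; the ``TrainConfig`` wiring mentioned in the comment is assumed from the surrounding tensorpack API and is not part of this diff:

from tensorpack.callbacks import GPUUtilizationTracker

# Poll nvidia-smi once per second in a background process and report the
# per-epoch average utilization of GPUs 0 and 1 as the scalars
# 'GPU0-Util' and 'GPU1-Util'. With devices=None, CUDA_VISIBLE_DEVICES is parsed instead.
tracker = GPUUtilizationTracker(devices=[0, 1])
# Typically appended to the trainer's callback list, e.g. TrainConfig(..., callbacks=[tracker]).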
# TODO add more features from tfprof
# https://github.com/tensorflow/tensorflow/blob/master/tensorflow/tools/tfprof/g3doc/python_api.md
class GraphProfiler(Callback):
    """
    Enable profiling by installing session hooks,
    and write tracing files to ``logger.LOG_DIR``.
    The tracing files can be loaded from ``chrome://tracing``.

    Note that the profiling is enabled for every step.
    You probably want to schedule it less frequently by
    :class:`PeriodicRunHooks`.
    """

    def __init__(self, show_memory=False):
        """
        Args:
            show_memory(bool): show tensor allocation in the tracing.
        """
        self._dir = logger.LOG_DIR
        self._show_memory = bool(show_memory)
        assert os.path.isdir(self._dir)

    def _before_run(self, _):
        opt = tf.RunOptions()
        opt.trace_level = tf.RunOptions.FULL_TRACE
        return tf.train.SessionRunArgs(fetches=None, options=opt)

    def _after_run(self, _, run_values):
        meta = run_values.run_metadata
        self._write_chrome_trace(meta)

    def _write_chrome_trace(self, metadata):
        tl = timeline.Timeline(step_stats=metadata.step_stats)
        fname = os.path.join(
            self._dir, 'chrome-trace-{}.json'.format(self.global_step))
        with open(fname, 'w') as f:
            f.write(tl.generate_chrome_trace_format(
                show_dataflow=True, show_memory=self._show_memory))
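A minimal sketch of enabling the profiler; it assumes ``logger.set_logger_dir`` has been called first, since the assert in ``__init__`` requires ``logger.LOG_DIR`` to exist, and the directory path is purely illustrative:

from tensorpack.utils import logger
from tensorpack.callbacks import GraphProfiler

logger.set_logger_dir('train_log/profile-demo')  # sets up LOG_DIR before the callback is built
profiler = GraphProfiler(show_memory=False)
# Every profiled step writes <LOG_DIR>/chrome-trace-<global_step>.json,
# which can be opened in a browser via chrome://tracing.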
...@@ -3,16 +3,11 @@
 # Author: Yuxin Wu <ppwwyyxxc@gmail.com>

 import os
-import numpy as np
-import multiprocessing as mp
-import time
-from six.moves import map

 from .base import Callback
 from ..utils import logger
-from ..utils.concurrency import ensure_proc_terminate, subproc_call

-__all__ = ['SendStat', 'GPUUtilizationTracker']
+__all__ = ['SendStat']


 class SendStat(Callback):
...@@ -30,74 +25,3 @@ class SendStat(Callback):
         ret = os.system(cmd)
         if ret != 0:
             logger.error("Command {} failed with ret={}!".format(cmd, ret))
[The remainder of this hunk deletes the ``GPUUtilizationTracker`` class from this file; it is verbatim identical to the version added in ``prof.py`` above.]
...@@ -5,18 +5,18 @@
 from .base import ProxyCallback, Callback

-__all__ = ['PeriodicTrigger']
+__all__ = ['PeriodicTrigger', 'PeriodicRunHooks']


 class PeriodicTrigger(ProxyCallback):
     """
-    Schedule to trigger a callback every k steps or every k epochs by its ``trigger()`` method.
+    Schedule to trigger a callback every k global steps or every k epochs by its ``trigger()`` method.
     """

     def __init__(self, triggerable, every_k_steps=None, every_k_epochs=None):
         """
         Args:
             triggerable (Callback): a Callback instance with a _trigger method to be called.
-            every_k_steps (int): trigger when ``local_step % k == 0``. Set to
+            every_k_steps (int): trigger when ``global_step % k == 0``. Set to
                 None to disable.
             every_k_epochs (int): trigger when ``epoch_num % k == 0``. Set to
                 None to disable.
...@@ -27,15 +27,13 @@ class PeriodicTrigger(ProxyCallback):
         super(PeriodicTrigger, self).__init__(triggerable)
         assert (every_k_epochs is not None) or (every_k_steps is not None), \
             "every_k_steps and every_k_epochs cannot be both None!"
-        self._step_k = every_k_steps
-        self._epoch_k = every_k_epochs
+        self._step_k = int(every_k_steps) if every_k_steps is not None else None
+        self._epoch_k = int(every_k_epochs) if every_k_epochs is not None else None

     def _trigger_step(self):
         if self._step_k is None:
             return
-        # trigger_step is triggered after run_step, so
-        # local_step + 1 is the number of step that have finished
-        if (self.local_step + 1) % self._step_k == 0:
+        if self.global_step % self._step_k == 0:
             self.cb.trigger()

     def _trigger_epoch(self):
...@@ -46,3 +44,26 @@ class PeriodicTrigger(ProxyCallback):
     def __str__(self):
         return "PeriodicTrigger-" + str(self.cb)
class PeriodicRunHooks(ProxyCallback):
    """
    Schedule the ``{before,after}_run`` methods of a callback every k global steps.
    All other methods are untouched.
    """

    def __init__(self, callback, every_k_steps):
        """
        Args:
            callback (Callback): the callback whose run hooks will be scheduled.
            every_k_steps(int): call the hooks only when ``global_step % k == 0``.
        """
        self._every_k_steps = int(every_k_steps)
        super(PeriodicRunHooks, self).__init__(callback)

    def _before_run(self, ctx):
        if self.global_step % self._every_k_steps == 0:
            return self.cb._before_run(ctx)

    def _after_run(self, ctx, rv):
        if self.global_step % self._every_k_steps == 0:
            self.cb._after_run(ctx, rv)
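Putting the new pieces together: a sketch of throttling ``GraphProfiler`` with ``PeriodicRunHooks``, as its docstring suggests, plus a ``PeriodicTrigger`` example. ``MyTriggerableCallback`` is a hypothetical callback with a ``_trigger`` method, and the surrounding trainer/callback wiring is assumed rather than shown:

from tensorpack.callbacks import GraphProfiler, PeriodicRunHooks, PeriodicTrigger

# Profile (FULL_TRACE) only on every 100th global step; other steps run without
# tracing overhead. GraphProfiler still requires logger.LOG_DIR to be set up first.
profiler = PeriodicRunHooks(GraphProfiler(), every_k_steps=100)

# Call trigger() of a callback every 500 global steps (before this commit the
# count was based on local_step within an epoch).
periodic = PeriodicTrigger(MyTriggerableCallback(), every_k_steps=500)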
...@@ -43,7 +43,7 @@ class Trainer(object):
         epoch_num (int): the number of epochs that have finished.
         local_step (int): the number of steps that have finished in the current epoch.
-        global_step (int): the number of steps that have finished.
+        global_step (int): the number of steps that have finished or are currently running.
     """

     # step attr only available after before_train?
...