Commit feaae168 authored by Yuxin Wu

logging for concurrency utilities

parent c68686e6
@@ -66,6 +66,18 @@ dataset instead of a down-sampled version here.
The average resolution is about 400x350 <sup>[[1]]</sup>.
The original images (JPEG compressed) are 140G in total.
+We start from a simple DataFlow:
+```python
+from tensorpack import *
+ds = dataset.ILSVRC12('/path/to/ILSVRC12', 'train', shuffle=True)
+ds = BatchData(ds, 256, use_list=True)
+TestDataSpeed(ds).start_test()
+```
+Here the first `ds` simply reads original images from the filesystem, and the second `ds` batches them,
+so that we can test the speed of this DataFlow in units of batches per second. By default, `BatchData`
+concatenates the data into an ndarray, but since the images are originally of different shapes, we use
+`use_list=True` so that it produces lists instead.
[1]: #ref
......
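Why `use_list=True` matters here: ILSVRC12 images come in different shapes, so they cannot be stacked into a single batch ndarray. A minimal NumPy sketch (with hypothetical image shapes) of the two batching modes:

```python
import numpy as np

# Two images of different shapes, as ILSVRC12 produces them:
imgs = [np.zeros((400, 350, 3), dtype=np.uint8),
        np.zeros((375, 500, 3), dtype=np.uint8)]

# use_list=False would need to stack them into one (N, h, w, 3) ndarray,
# which requires identical shapes:
try:
    batch = np.stack(imgs)
except ValueError:
    print("cannot stack images of different shapes")

# use_list=True keeps the batch component as a plain list:
# no copy and no shape constraint.
batch = imgs
```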
@@ -16,7 +16,7 @@ probably because of async issues.
The pre-trained models are all trained with 4 GPUs for about 2 days.
But note that multi-GPU doesn't give you an obvious speedup here,
-because the bottleneck is not computation but data. On machines without huge memory, you may also need to
+because the bottleneck in this implementation is not computation but data. On machines without huge memory, you may also need to
enable tcmalloc to keep training throughput more stable.
Occasionally, processes may not get terminated completely; it is therefore suggested to use `systemd-run` to run any
......
@@ -44,8 +44,12 @@ class StartProcOrThread(Callback):
            if isinstance(k, mp.Process):
                logger.info("Stopping {} ...".format(k.name))
                k.terminate()
-                k.join()
+                k.join(5.0)
+                if k.is_alive():
+                    logger.error("Cannot join process {}.".format(k.name))
            elif isinstance(k, StoppableThread):
                logger.info("Stopping {} ...".format(k.name))
                k.stop()
-                k.join()
+                k.join(5.0)
+                if k.is_alive():
+                    logger.error("Cannot join thread {}.".format(k.name))
@@ -56,7 +56,7 @@ class BatchData(ProxyDataFlow):
    of the original datapoints.
    """
-    def __init__(self, ds, batch_size, remainder=False, allow_list=False):
+    def __init__(self, ds, batch_size, remainder=False, use_list=False):
        """
        Args:
            ds (DataFlow): Its components must be either scalars or :class:`np.ndarray`.
@@ -65,7 +65,7 @@ class BatchData(ProxyDataFlow):
            remainder (bool): whether to return the remaining data smaller than a batch_size.
                If set True, it will possibly generate a data point of a smaller batch size.
                Otherwise, all generated data are guaranteed to have the same size.
-            allow_list (bool): if True, it will run faster by producing a list
+            use_list (bool): if True, it will run faster by producing a list
                of datapoints instead of an ndarray of datapoints, avoiding an
                extra copy.
        """
@@ -77,7 +77,7 @@ class BatchData(ProxyDataFlow):
            pass
        self.batch_size = batch_size
        self.remainder = remainder
-        self.allow_list = allow_list
+        self.use_list = use_list

    def size(self):
        ds_size = self.ds.size()
@@ -96,17 +96,17 @@ class BatchData(ProxyDataFlow):
        for data in self.ds.get_data():
            holder.append(data)
            if len(holder) == self.batch_size:
-                yield BatchData._aggregate_batch(holder, self.allow_list)
+                yield BatchData._aggregate_batch(holder, self.use_list)
                del holder[:]
        if self.remainder and len(holder) > 0:
-            yield BatchData._aggregate_batch(holder, self.allow_list)
+            yield BatchData._aggregate_batch(holder, self.use_list)

    @staticmethod
-    def _aggregate_batch(data_holder, allow_list):
+    def _aggregate_batch(data_holder, use_list):
        size = len(data_holder[0])
        result = []
        for k in range(size):
-            if allow_list:
+            if use_list:
                result.append(
                    [x[k] for x in data_holder])
            else:
......
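What the renamed flag controls, reduced to the component level. This is a hedged sketch with toy datapoints; `BatchData._aggregate_batch` does roughly this for each component of a datapoint:

```python
import numpy as np

# Two (image, label) datapoints with identical image shapes:
holder = [(np.zeros((2, 2)), 0),
          (np.ones((2, 2)), 1)]

# use_list=False: each component is concatenated into one ndarray,
# which copies every image:
images = np.asarray([dp[0] for dp in holder])   # shape (2, 2, 2)
labels = np.asarray([dp[1] for dp in holder])   # shape (2,)

# use_list=True: each component stays a plain list of the original
# arrays, avoiding that copy:
images = [dp[0] for dp in holder]
labels = [dp[1] for dp in holder]
```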
@@ -98,7 +98,7 @@ class ILSVRCMeta(object):

class ILSVRC12(RNGDataFlow):
    """
-    Produces ILSVRC12 images of shape [h, w, 3(BGR)], and a label in [0, 999],
+    Produces uint8 ILSVRC12 images of shape [h, w, 3(BGR)], and a label in [0, 999],
    and optionally a bounding box of [xmin, ymin, xmax, ymax].
    """
    def __init__(self, dir, name, meta_dir=None, shuffle=None,
......
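A quick way to check the documented contract (uint8 images of shape [h, w, 3] in BGR, labels in [0, 999]). The dataset path is a placeholder, and this assumes the usual DataFlow convention of calling `reset_state()` before iterating:

```python
from tensorpack import dataset

ds = dataset.ILSVRC12('/path/to/ILSVRC12', 'val', shuffle=False)
ds.reset_state()
img, label = next(ds.get_data())                # one datapoint: [image, label]
assert img.dtype == 'uint8' and img.ndim == 3   # [h, w, 3], BGR
assert 0 <= label <= 999
```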
@@ -6,7 +6,9 @@
import multiprocessing
import six
from six.moves import queue, range
+import tensorflow as tf

+from ..utils import logger
from ..utils.concurrency import DIE, StoppableThread
from ..tfutils.modelutils import describe_model
from .base import OfflinePredictor, AsyncPredictorBase
@@ -83,7 +85,13 @@ class PredictorWorkerThread(StoppableThread):
    def run(self):
        while not self.stopped():
            batched, futures = self.fetch_batch()
-            outputs = self.func(batched)
+            try:
+                outputs = self.func(batched)
+            except tf.errors.CancelledError:
+                for f in futures:
+                    f.cancel()
+                logger.warn("PredictorWorkerThread id={}, call was cancelled.".format(self.id))
+                return
            # print "Worker {} batched {} Queue {}".format(
            #     self.id, len(futures), self.queue.qsize())
            # debug, for speed testing
......
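The guard added to `run()`, in isolation: if the underlying call is cancelled (e.g. the session is shutting down), cancel every pending `Future` so callers unblock, log it, and exit the worker instead of dying with an unhandled exception. A self-contained sketch in which `CancelledCall` stands in for `tf.errors.CancelledError`:

```python
import logging
from concurrent.futures import Future

class CancelledCall(Exception):
    """Stand-in for tf.errors.CancelledError in this sketch."""

def func(batched):
    raise CancelledCall()            # simulate a cancelled session call

def run_once(batched, futures, worker_id=0):
    try:
        outputs = func(batched)
    except CancelledCall:
        for f in futures:            # unblock everyone waiting on a result
            f.cancel()
        logging.warning("Worker id=%d, call was cancelled.", worker_id)
        return
    for f, o in zip(futures, outputs):
        f.set_result(o)

futures = [Future() for _ in range(3)]
run_once([1, 2, 3], futures)
assert all(f.cancelled() for f in futures)
```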
@@ -178,6 +178,7 @@ class AsyncMultiGPUTrainer(MultiGPUTrainer,
                self.sess.run([op])
                next(self.async_step_counter)
            th = LoopThread(f)
+            th.name = "AsyncLoopThread-{}".format(k)
            th.pause()
            th.start()
            self.training_threads.append(th)
......
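This one-line change supports the logging added in `StartProcOrThread`: "Stopping {} ..." is only informative if threads carry meaningful names. A minimal illustration with a plain `threading.Thread` standing in for tensorpack's `LoopThread`:

```python
import threading
import time

def f():
    time.sleep(0.1)      # stands in for one async training step

threads = []
for k in range(2):
    th = threading.Thread(target=f)
    th.name = "AsyncLoopThread-{}".format(k)   # the naming scheme from the commit
    th.start()
    threads.append(th)

for th in threads:
    print("Stopping {} ...".format(th.name))   # now reports a meaningful name
    th.join()
```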