Commit d97c0081 authored by Yuxin Wu

docs about reading imagenet

parent feaae168
......@@ -355,5 +355,6 @@ def setup(app):
'auto_toc_tree_section': 'Contents',
'enable_math': True,
'enable_inline_math': True,
'enable_eval_rst': True
}, True)
app.add_transform(AutoStructify)
......@@ -61,18 +61,20 @@ A Dataflow has a `get_data()` method which yields a datapoint every time.
class MyDataFlow(DataFlow):
    def get_data(self):
        for k in range(100):
            digit = np.random.rand(28, 28)
            label = np.random.randint(10)
            yield [digit, label]
```
Optionally, Dataflow can implement the following two methods:
+ `size()`. Return the number of elements the generator can produce. Certain modules might require this.
  For example, only Dataflows with the same number of elements can be joined together.
+ `reset_state()`. It's guaranteed that the process which uses this DataFlow will invoke this method before using it.
  So if this DataFlow needs to do something after a `fork()`, you should put it here.
  A typical situation is when your Dataflow uses a random number generator (RNG): you'd need to re-seed the RNG here,
  otherwise child processes will end up with the same random seed. The `RNGDataFlow` base class already does this for `self.rng` (see the sketch below).
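For example, a minimal sketch of a DataFlow that re-seeds its RNG in `reset_state()` (the pid-based seeding below is only an illustration; `RNGDataFlow` has its own seeding logic):

```python
import os
import numpy as np
from tensorpack import DataFlow

class MyRandomDataFlow(DataFlow):
    def __init__(self):
        self.rng = np.random.RandomState()

    def reset_state(self):
        # Called in each process before the DataFlow is used,
        # so forked children end up with different seeds.
        self.rng = np.random.RandomState(os.getpid())

    def get_data(self):
        for _ in range(100):
            yield [self.rng.rand(28, 28), self.rng.randint(10)]
```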
With a "low-level" Dataflow defined, you can then compose it with existing modules.
# Efficient DataFlow
This tutorial gives an overview of how to build an efficient DataFlow, using the ImageNet
dataset as an example.
Our goal in the end is to have
a generator which yields ImageNet datapoints (after proper preprocessing) as fast as possible.
We use the ILSVRC12 training set, which contains 1.28 million images.
Following the [ResNet example](../examples/ResNet), our pre-processing needs images in their original resolution,
so we'll read the original dataset instead of a down-sampled version here.
The average resolution is about 400x350 <sup>[[1]]</sup>.
The original images (JPEG compressed) are 140G in total.
Note that the actual performance depends not only on the disk, but also on
memory (for caching) and CPU (for data processing).
You'll need to tune the parameters (#processes, #threads, size of buffer, etc.)
or change the pipeline for new tasks and new machines
to achieve better performance.
## Random Read
We start from a simple DataFlow:
```python
from tensorpack import *
ds0 = dataset.ILSVRC12('/path/to/ILSVRC12', 'train', shuffle=True)
ds1 = BatchData(ds0, 256, use_list=True)
TestDataSpeed(ds1).start_test()
```
Here `ds0` simply reads original images from the filesystem, and `ds1` batches them, so
that we can measure the speed of this DataFlow in terms of "batches per second". By default `BatchData`
will concatenate the data into a `numpy.ndarray`, but since images are originally of different shapes, we use
`use_list=True` so that it just produces lists.

On an SSD you can probably already observe good speed here (e.g. 5 it/s), but on an HDD the speed may be just 1 it/s,
because we're doing random reads on the filesystem (regardless of whether `shuffle` is True).

We'll now add the cheapest pre-processing to get an ndarray in the end instead of a list
(because TensorFlow will need ndarrays eventually):
```eval_rst
.. code-block:: python
:emphasize-lines: 2,3
ds = dataset.ILSVRC12('/path/to/ILSVRC12', 'train', shuffle=True)
ds = AugmentImageComponent(ds, [imgaug.Resize(224)])
ds = BatchData(ds, 256)
```
You'll start to observe a slowdown after adding more pre-processing (such as those in the [ResNet example](../examples/ResNet/imagenet-resnet.py)).
Now it's time to add threads or processes:
```eval_rst
.. code-block:: python
:emphasize-lines: 3
ds0 = dataset.ILSVRC12('/path/to/ILSVRC12', 'train', shuffle=True)
ds1 = AugmentImageComponent(ds0, lots_of_augmentors)
ds = PrefetchDataZMQ(ds1, nr_proc=25)
ds = BatchData(ds, 256)
```
Here we start 25 processes to run `ds1`, and collect their output through the ZMQ IPC protocol.
Using ZMQ to transfer data is faster than `multiprocessing.Queue`, but data copy (even
within one process) can still be quite expensive when you're dealing with large data.
For example, to reduce copy overhead, the ResNet example deliberately moves certain pre-processing (the mean/std normalization) from the DataFlow into the graph.
This way the DataFlow only transfers uint8 images, as opposed to float32 which takes 4x more memory.
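As a rough sketch of that idea (the constants and the function below are only illustrative, not the ResNet example's exact code), the normalization can live in the graph while the DataFlow keeps emitting uint8 images:

```python
import tensorflow as tf

def normalize_in_graph(image_uint8):
    # The DataFlow sends uint8 images; cast and normalize only inside the graph.
    image = tf.cast(image_uint8, tf.float32)
    mean = tf.constant([0.485, 0.456, 0.406], dtype=tf.float32) * 255.0
    std = tf.constant([0.229, 0.224, 0.225], dtype=tf.float32) * 255.0
    return (image - mean) / std
```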
Alternatively, you can use multi-threading like this:
```python
ds0 = dataset.ILSVRC12('/path/to/ILSVRC12', 'train', shuffle=True)
augmentor = AugmentorList(lots_of_augmentors)
ds1 = ThreadedMapData(
ds0, nr_thread=25,
map_func=lambda x: augmentor.augment(x), buffer_size=1000)
# ds1 = PrefetchDataZMQ(ds1, nr_proc=1)
ds = BatchData(ds1, 256)
```
Since no `fork()` is happening here, there'll be only one instance of `ds0`.
25 threads will fetch data from `ds0`, run the augmentor function and
put results into a buffer of size 1000.
To reduce the effect of the GIL, you can then uncomment the line so that everything above it (including all the
threads) happens in an independent process.

There is no definitive answer to whether threads or processes are faster.
Processes avoid the cost of the GIL but bring the cost of communication.
You can also try a combination of both (several processes, each with several threads), as sketched below.
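One possible combination, following the same pattern as the snippets above (the specific numbers of threads and processes are only placeholders; note that each forked process then holds its own copy of `ds0`):

```python
ds0 = dataset.ILSVRC12('/path/to/ILSVRC12', 'train', shuffle=True)
augmentor = AugmentorList(lots_of_augmentors)
ds1 = ThreadedMapData(
    ds0, nr_thread=8,
    map_func=lambda x: augmentor.augment(x), buffer_size=1000)
# fork 4 processes, each running its own `ds1` (and thus its own 8 threads)
ds = PrefetchDataZMQ(ds1, nr_proc=4)
ds = BatchData(ds, 256)
```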
## Sequential Read
Random read is usually not a good idea, especially if you're not on an SSD.
We can instead dump the dataset into one single file and read it sequentially.
```python
import os
import numpy as np
from tensorpack import *

class RawILSVRC12(DataFlow):
    def __init__(self):
        meta = dataset.ILSVRCMeta()
        self.imglist = meta.get_image_list('train')
        # we apply a global shuffling here because later we'll only use local shuffling
        np.random.shuffle(self.imglist)
        self.dir = os.path.join('/path/to/ILSVRC', 'train')

    def get_data(self):
        for fname, label in self.imglist:
            fname = os.path.join(self.dir, fname)
            with open(fname, 'rb') as f:
                jpeg = f.read()
            jpeg = np.asarray(bytearray(jpeg), dtype='uint8')
            yield [jpeg, label]

    def size(self):
        return len(self.imglist)

ds0 = RawILSVRC12()
ds1 = PrefetchDataZMQ(ds0, nr_proc=1)
dftools.dump_dataflow_to_lmdb(ds1, '/path/to/ILSVRC-train.lmdb')
```
The above script builds a DataFlow which produces jpeg-encoded ImageNet data.
We store the jpeg string as a numpy array because the function `cv2.imdecode` later expects this format.
We use one prefetch process to speed things up. If `nr_proc>1`, `ds1` would take data
from several forks of `ds0`, and would no longer be identical to `ds0`.

The script will generate a database file of 140G. We then build a DataFlow to read the LMDB file sequentially:
```python
from tensorpack import *
ds = LMDBData('/path/to/ILSVRC-train.lmdb', shuffle=False)
ds = BatchData(ds, 256, use_list=True)
TestDataSpeed(ds).start_test()
```
Depending on whether the OS has cached the file for you (and how large the RAM is), the above script
can run at a speed of 10~130 it/s, roughly corresponding to 250MB~3.5GB/s bandwidth. You can test
your cached and uncached disk read bandwidth with `sudo hdparm -Tt /dev/sdX`.
As a reference, on a Samsung SSD 850, the uncached speed is about 16 it/s.
```eval_rst
.. code-block:: python
:emphasize-lines: 2
ds = LMDBData('/path/to/ILSVRC-train.lmdb', shuffle=False)
ds = LocallyShuffleData(ds, 50000)
ds = BatchData(ds, 256, use_list=True)
```
Instead of shuffling all the training data in every epoch (which would require random read),
the added line above maintains a buffer of datapoints and shuffles them once in a while.
This won't affect the model as long as the buffer is large enough,
but it can consume a lot of memory if the buffer is too large.
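As a rough estimate from the numbers above (140G of jpeg data over 1.28 million images, i.e. about 110KB per image), a buffer of 50000 jpeg strings takes roughly 50000 × 110KB ≈ 5.5G of memory.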
Then we add necessary transformations:
```eval_rst
.. code-block:: python
:emphasize-lines: 3-5
ds = LMDBData(db, shuffle=False)
ds = LocallyShuffleData(ds, 50000)
ds = LMDBDataPoint(ds)
ds = MapDataComponent(ds, lambda x: cv2.imdecode(x, cv2.IMREAD_COLOR), 0)
ds = AugmentImageComponent(ds, lots_of_augmentors)
ds = BatchData(ds, 256)
```
1. `LMDBDataPoint` deserializes the datapoints (from string to [jpeg_string, label])
2. Use OpenCV to decode the first component into an ndarray
3. Apply augmentations to the ndarray

Both imdecode and the augmentors can be quite slow. We can parallelize them like this:
```eval_rst
.. code-block:: python
:emphasize-lines: 3,7
ds = LMDBData(db, shuffle=False)
ds = LocallyShuffleData(ds, 50000)
ds = PrefetchData(ds, 5000, 1)
ds = LMDBDataPoint(ds)
ds = MapDataComponent(ds, lambda x: cv2.imdecode(x, cv2.IMREAD_COLOR), 0)
ds = AugmentImageComponent(ds, lots_of_augmentors)
ds = PrefetchDataZMQ(ds, 25)
ds = BatchData(ds, 256)
```
Since we are reading the database sequentially, having multiple identical instances of the
underlying DataFlow would result in a biased data distribution. Therefore we use `PrefetchData` to
launch the underlying DataFlow in one independent process, and only parallelize the transformations.
(`PrefetchDataZMQ` is faster but not fork-safe, so the first prefetch has to be `PrefetchData`. This is [issue#138](https://github.com/ppwwyyxx/tensorpack/issues/138).)
Let me summarize what the above DataFlow does:

1. One process reads the LMDB file, shuffles datapoints in a buffer and puts them into a `multiprocessing.Queue` (used by `PrefetchData`).
2. 25 processes take items from the queue, decode and process them into [image, label] pairs, and
   send them through ZMQ IPC pipes.
3. The main process takes data from the pipes and feeds it into the graph, according to
   how the `Trainer` is implemented.
The above DataFlow can run at a speed of 5~10 batches per second, if you have good CPUs, RAM, disks and augmentors.
As a reference, tensorpack can train ResNet-18 (a shallow ResNet) at 5.5 batches per second on 4 TitanX Pascal.
So DataFlow won't be a serious bottleneck if configured properly.
## Larger Datasets?
For larger datasets (and smaller networks) you could be seriously bottlenecked by the CPU or disk speed of a single machine.
Then it's best to run the DataFlow in a distributed fashion and collect the data on the
training machine. Currently there is only limited support for this feature.
[1]: #ref
<div id=ref> </div>
[[1]]. [ImageNet: A Large-Scale Hierarchical Image Database](http://www.image-net.org/papers/imagenet_cvpr09.pdf), CVPR09
......@@ -9,7 +9,8 @@ Test.
glance
dataflow
efficient-data
tf-queue
efficient-dataflow
model
trainer
callback
# How data goes into the graph

This tutorial covers how data goes from DataFlow to the TensorFlow graph.
These are tensorpack internal details, but they are important to know
if you care about efficiency.
## Use TensorFlow queues
In general, `feed_dict` is slow and should never appear in your critical loop.
That is, you should avoid loops like this:
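For example, a loop of the following shape (the tiny model and the random data below are hypothetical stand-ins for illustration):

```python
import numpy as np
import tensorflow as tf

# A tiny stand-in model; the point is the feed_dict in the loop, not the model.
image = tf.placeholder(tf.float32, [None, 28, 28])
label = tf.placeholder(tf.int32, [None])
logits = tf.layers.dense(tf.reshape(image, [-1, 28 * 28]), 10)
loss = tf.losses.sparse_softmax_cross_entropy(labels=label, logits=logits)
train_op = tf.train.GradientDescentOptimizer(0.1).minimize(loss)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    while True:
        X = np.random.rand(64, 28, 28)              # stand-in for real data loading
        y = np.random.randint(10, size=64)
        # Anti-pattern: feeding every minibatch through feed_dict in the hot loop.
        sess.run(train_op, feed_dict={image: X, label: y})
```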
......@@ -33,7 +30,8 @@ while True:
This is now handled automatically by tensorpack trainers (unless you use the demo ``SimpleTrainer``);
see [Trainer](trainer.md) for details.
TensorFlow provides a staging interface which will further improve the speed in the future. This is
[issue#140](https://github.com/ppwwyyxx/tensorpack/issues/140).
You can also avoid `feed_dict` by using TensorFlow native operators to read data, which is also
......@@ -41,7 +39,7 @@ supported here.
It probably allows you to reach the best performance, but at the cost of implementing the
reading / preprocessing ops in C++ if there isn't one for your task. We won't talk about it here.
## Figure out the bottleneck
For training, we only worry about the throughput, not the latency.
Threads 1 & 2 run in parallel, and the faster one will block to wait for the slower one.
......@@ -56,32 +54,3 @@ there are ways to understand which one is the bottleneck:
2. Benchmark them separately. You can use `TestDataSpeed` to benchmark a DataFlow, and
   use `FakeData` as a fast replacement in a dry run to benchmark the training
   iterations, as sketched below.
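For instance, a benchmarking sketch along those lines (the shapes given to `FakeData` are illustrative; use whatever your model expects):

```python
from tensorpack import *

# 1. Benchmark the DataFlow alone:
ds = dataset.ILSVRC12('/path/to/ILSVRC12', 'train', shuffle=True)
TestDataSpeed(ds).start_test()

# 2. Benchmark the training iterations alone, with random data of the right shapes:
fake = FakeData([[224, 224, 3], [1]], size=1000)
# use `fake` in place of the real DataFlow in a dry run of the trainer
```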
......@@ -88,10 +88,9 @@ class MeanVarianceNormalize(ImageAugmentor):
"""
Linearly scales the image to have zero mean and unit norm.
``x = (x - mean) / adjusted_stddev``
where ``adjusted_stddev = max(stddev, 1.0/sqrt(num_pixels * channels))``

This augmentor always returns float32 images.
"""
def __init__(self, all_channel=True):
......
......@@ -20,7 +20,7 @@ from ..utils import logger
from ..utils.gpu import change_gpu
__all__ = ['PrefetchData', 'PrefetchDataZMQ', 'PrefetchOnGPUs',
'ThreadedMapData']
class PrefetchProcess(mp.Process):
......@@ -265,21 +265,3 @@ class ThreadedMapData(ProxyDataFlow):
        for _ in range(sz):
            self._in_queue.put(next(self._itr))
            yield self._out_queue.get()
def StartNewProcess(ds, queue_size):
    """
    Run ds in a new process, and use multiprocessing.queue to send data back.

    Args:
        ds (DataFlow): a DataFlow.
        queue_size (int): the size of queue.

    Returns:
        a fork-safe DataFlow, therefore is safe to use under another PrefetchData or
        PrefetchDataZMQ.

    Note:
        There could be a zmq version of this in the future.
    """
    return PrefetchData(ds, queue_size, 1)