Commit 03c16776 authored by Yuxin Wu's avatar Yuxin Wu

refine the dataflow/input doc

parent 42322257
......@@ -9,7 +9,7 @@ A DataFlow has a `get_data()` generator method,
which yields `datapoints`.
A datapoint is a **list** of Python objects which is called the `components` of a datapoint.
For example, to train on MNIST dataset, you can build a DataFlow with a `get_data()` method
For example, to train on MNIST dataset, you can write a DataFlow with a `get_data()` method
that yields datapoints (lists) of two components:
a numpy array of shape (64, 28, 28), and an array of shape (64,).
......@@ -28,7 +28,7 @@ df = MyDataFlow(dir='/my/data', shuffle=True)
df = AugmentImageComponent(df, [imgaug.Resize((225, 225))])
# group data into batches of size 128
df = BatchData(df, 128)
# start 3 processes to run the dataflow in parallel, and communicate with ZeroMQ
# start 3 processes to run the dataflow in parallel
df = PrefetchDataZMQ(df, 3)
````
You can find more complicated DataFlow in the [ResNet training script](../examples/ResNet/imagenet-resnet.py)
......
......@@ -7,7 +7,7 @@ a __Python generator__ which yields preprocessed ImageNet images and labels as f
Since it is simply a generator interface, you can use the DataFlow in other Python-based frameworks (e.g. Keras)
or your own code as well.
We use ILSVRC12 training set, which contains 1.28 million images.
**What we are going to do**: We'll use ILSVRC12 training set, which contains 1.28 million images.
The original images (JPEG compressed) are 140G in total.
The average resolution is about 400x350 <sup>[[1]]</sup>.
Following the [ResNet example](../examples/ResNet), we need images in their original resolution,
......@@ -15,19 +15,27 @@ so we will read the original dataset (instead of a down-sampled version), and
then apply complicated preprocessing to it.
We will need to reach a speed of, roughly 1k ~ 2k images per second, to keep GPUs busy.
Note that the actual performance would depend on not only the disk, but also
memory (for caching) and CPU (for data processing).
You may need to tune the parameters (#processes, #threads, size of buffer, etc.)
or change the pipeline for new tasks and new machines to achieve the best performance.
This tutorial is quite complicated because you do need this knowledge of hardware & system to run fast on ImageNet-sized dataset.
However, for __smaller datasets__ (e.g. several GBs of space, or lightweight preprocessing), a simple reader plus some prefetch should work well enough.
Some things to know before reading:
1. Having a fast Python generator **alone** may or may not help with your overall training speed.
You need mechanisms to hide the latency of all preprocessing stages, as mentioned in the
[previous tutorial](http://localhost:8000/tutorial/input-source.html).
2. Requirements on reading training set and validation set are different.
In training it's OK to reorder, regroup, or even duplicate some datapoints, as long as the
distribution roughly stays the same.
But in validation we often need the exact set of data, to be able to compute the correct error.
This will affect how we build the DataFlow.
3. The actual performance would depend on not only the disk, but also memory (for caching) and CPU (for data processing).
You may need to tune the parameters (#processes, #threads, size of buffer, etc.)
or change the pipeline for new tasks and new machines to achieve the best performance.
4. This tutorial could be too complicated for people new to system architectures, but you do need these to be able to run fast enough on ImageNet-sized dataset.
However, for smaller datasets (e.g. several GBs of images with lightweight preprocessing), a simple reader plus some prefetch should work well enough.
Figure out the bottleneck first, before trying to optimize any piece in the whole system.
## Random Read
We start from a simple DataFlow:
```python
from tensorpack import *
from tensorpack.dataflow import *
ds0 = dataset.ILSVRC12('/path/to/ILSVRC12', 'train', shuffle=True)
ds1 = BatchData(ds0, 256, use_list=True)
TestDataSpeed(ds1).start()
......@@ -44,8 +52,9 @@ By default, `BatchData`
will stack the datapoints into an `numpy.ndarray`, but since original images are of different shapes, we use
`use_list=True` so that it just produces lists.
On an SSD you probably can already observe good speed here (e.g. 5 it/s, that is 1280 images/s), but on HDD the speed may be just 1 it/s,
On a good filesystem you probably can already observe good speed here (e.g. 5 it/s, that is 1280 images/s), but on HDD the speed may be just 1 it/s,
because we are doing heavy random read on the filesystem (regardless of whether `shuffle` is True).
Image decoding in `cv2.imread` could also be a bottleneck at this early stage.
We will now add the cheapest pre-processing now to get an ndarray in the end instead of a list
(because training will need ndarray eventually):
......@@ -68,13 +77,13 @@ Now it's time to add threads or processes:
ds = PrefetchDataZMQ(ds1, nr_proc=25)
ds = BatchData(ds, 256)
```
Here we start 25 processes to run `ds1`, and collect their output through ZMQ IPC protocol.
Using ZMQ to transfer data is faster than `multiprocessing.Queue`, but data copy (even
within one process) can still be quite expensive when you're dealing with large data.
For example, to reduce copy overhead, the ResNet example deliberately moves certain pre-processing (the mean/std normalization) from DataFlow to the graph.
This way the DataFlow only transfers uint8 images as opposed float32 which takes 4x more memory.
Here we start 25 processes to run `ds1`, and collect their output through ZMQ IPC protocol,
which is faster than `multiprocessing.Queue`. You can also apply prefetch after batch, of course.
The above DataFlow might be fast, but since it forks the ImageNet reader (`ds0`),
it's **not a good idea to use it for validation** (for reasons mentioned at top).
Alternatively, you can use multi-threaded preprocessing like this:
Alternatively, you can use multi-threading like this:
```eval_rst
.. code-block:: python
:emphasize-lines: 3-6
......@@ -83,56 +92,69 @@ Alternatively, you can use multi-threading like this:
augmentor = AugmentorList(lots_of_augmentors)
ds1 = ThreadedMapData(
ds0, nr_thread=25,
map_func=lambda x: augmentor.augment(x), buffer_size=1000)
map_func=lambda dp: [augmentor.augment(dp[0]), dp[1]], buffer_size=1000)
# ds1 = PrefetchDataZMQ(ds1, nr_proc=1)
ds = BatchData(ds1, 256)
```
Since no `fork()` is happening here, there'll be only one instance of `ds0`.
25 threads will all fetch data from the same `ds0` instance, run the augmentor function and
put results into a buffer of size 1000.
To reduce the effect of GIL, you want to uncomment the line so that everything above it (including all the
`ThreadedMapData` launches a thread pool to fetch data and apply the mapping function on **a single
instance of** `ds0`. This is done by an intermediate buffer of size 1000 to hide the mapping latency.
To reduce the effect of GIL to your main training thread, you want to uncomment the line so that everything above it (including all the
threads) happen in an independent process.
There is no answer whether it is faster to use threads or processes.
Processes avoid the cost of GIL but bring the cost of communication.
You can also try a combination of both (several processes each with several threads).
You can also try a combination of both (several processes each with several threads),
but be careful of how forks affect your data distribution.
The above DataFlow still has a potential performance problem: only one thread is doing `cv2.imread`.
If you identify this as a bottleneck, you can also use:
```eval_rst
.. code-block:: python
:emphasize-lines: 5-6
ds0 = dataset.ILSVRC12Files('/path/to/ILSVRC12', 'train', shuffle=True)
augmentor = AugmentorList(lots_of_augmentors)
ds1 = ThreadedMapData(
ds0, nr_thread=25,
map_func=lambda dp:
[augmentor.augment(cv2.imread(dp[0], cv2.IMREAD_COLOR)), dp[1]],
buffer_size=1000)
ds1 = PrefetchDataZMQ(ds1, nr_proc=1)
ds = BatchData(ds1, 256)
```
Let's summarize what the above dataflow does:
1. One thread iterates over a shuffled list of (filename, label) pairs, and put them into a queue of size 1000.
2. 25 worker threads takes pairs and make them into (preprocessed image, label) pairs.
3. Both 1 and 2 happen in one separate process, and the results are sent back to main process through ZeroMQ.
4. Main process makes batches, and other tensorpack modules will then take care of how they should go into the graph.
## Sequential Read
Random read is usually not a good idea, especially if the data is not on a SSD.
Random read may not be a good idea when the data is not on an SSD.
We can also dump the dataset into one single LMDB file and read it sequentially.
```python
from tensorpack import *
class RawILSVRC12(DataFlow):
def __init__(self):
meta = dataset.ILSVRCMeta()
self.imglist = meta.get_image_list('train')
# we apply a global shuffling here because later we'll only use local shuffling
np.random.shuffle(self.imglist)
self.dir = os.path.join('/path/to/ILSVRC', 'train')
from tensorpack.dataflow import *
class BinaryILSVRC12(ILSVRCFiles):
def get_data(self):
for fname, label in self.imglist:
fname = os.path.join(self.dir, fname)
for fname, label in super(BinaryILSVRC12, self).get_data():
with open(fname, 'rb') as f:
jpeg = f.read()
jpeg = np.asarray(bytearray(jpeg), dtype='uint8')
yield [jpeg, label]
def size(self):
return len(self.imglist)
ds0 = RawILSVRC12()
ds0 = BinaryILSVRC12()
ds1 = PrefetchDataZMQ(ds0, nr_proc=1)
dftools.dump_dataflow_to_lmdb(ds1, '/path/to/ILSVRC-train.lmdb')
```
The above script builds a DataFlow which produces jpeg-encoded ImageNet data.
We store the jpeg string as a numpy array because the function `cv2.imdecode` expect this format.
We use 1 prefetch process to speed up. If `nr_proc>1`, `ds1` will take data
from several forks of `ds0` and will therefore be not identical to `ds0` any more.
We store the jpeg string as a numpy array because the function `cv2.imdecode` later expect this format.
Please note we can only use 1 prefetch process to speed up. If `nr_proc>1`, `ds1` will take data
from several forks of `ds0`, then neither the content nor the order of `ds1` will be the same as `ds0`.
It will generate a database file of 140G. We build a DataFlow to read the LMDB file sequentially:
It will generate a database file of 140G. We build a DataFlow to read this LMDB file sequentially:
```
from tensorpack import *
ds = LMDBData('/path/to/ILSVRC-train.lmdb', shuffle=False)
ds = BatchData(ds, 256, use_list=True)
TestDataSpeed(ds).start()
......@@ -187,19 +209,19 @@ Both imdecode and the augmentors can be quite slow. We can parallelize them like
ds = BatchData(ds, 256)
```
Since we are reading the database sequentially, having multiple identical instances of the
Since we are reading the database sequentially, having multiple forked instances of the
base LMDB reader will result in biased data distribution. Therefore we use `PrefetchData` to
launch the base DataFlow in one independent process, and only parallelize the transformations.
(`PrefetchDataZMQ` will however create 25 readers instead, so the first prefetch has to be `PrefetchData`.
These differences are explained in the documentation in more details.)
launch the base DataFlow in only **one process**, and only parallelize the transformations
with another `PrefetchDataZMQ`.
(Nesting two `PrefetchDataZMQ`, however, will result in a different behavior.
These differences are explained in the API documentation in more details.)
Let me summarize what the above DataFlow does:
1. One process reads LMDB file, shuffle them in a buffer and put them into a `multiprocessing.Queue` (used by `PrefetchData`).
2. 25 processes take items from the queue, decode and process them into [image, label] pairs, and
send them through ZMQ IPC pipe.
3. The main process takes data from the pipe, makes batches and feeds them into the graph,
according to what [InputSource](http://tensorpack.readthedocs.io/en/latest/tutorial/input-source.html) is used.
3. The main process takes data from the pipe, makes batches.
The above DataFlow can run at a speed of 1k ~ 2k images per second if you have good CPUs, RAM, disks and augmentors.
As a reference, tensorpack can train ResNet-18 (a shallow ResNet) at 4.5 batches (1.2k images) per second on 4 old TitanX.
......@@ -209,7 +231,12 @@ So DataFlow will not be a serious bottleneck if configured properly.
## More Efficient DataFlow
To work with larger datasets (or smaller networks, or more/better GPUs) you could be severely bounded by CPU or disk speed of a single machine.
One way is to optimize the preprocessing routine (e.g. write something in C++ or use TF reading operators).
One way is to optimize the preprocessing routine, for example:
1. Write some preprocessing steps in C++ or use better libraries
2. Move certain preprocessing steps (e.g. mean/std normalization) to TF operators which may be faster
3. Transfer less data, e.g. use uint8 images rather than float32.
Another way to scale is to run DataFlow in a distributed fashion and collect them on the
training machine. E.g.:
```python
......@@ -233,7 +260,6 @@ df = RemoteDataZMQ('ipc:///tmp/ipc-socket', 'tcp://0.0.0.0:8877')
TestDataSpeed(df).start()
```
[1]: #ref
<div id=ref> </div>
......
# Input Pipeline
This tutorial covers some general basics of the possible methods to send data from external sources to a TensorFlow graph,
This tutorial contains some general discussions on the topic of
"how to read data efficiently to work with TensorFlow",
and how tensorpack support these methods.
You don't have to read it because these are details under the tensorpack interface,
but knowing it could help understand the efficiency and choose the best input pipeline for your task.
......@@ -11,13 +12,15 @@ but knowing it could help understand the efficiency and choose the best input pi
![prefetch](https://cloud.githubusercontent.com/assets/1381301/26525192/36e5de48-4304-11e7-88ab-3b790bd0e028.png)
A common sense no matter what framework you use:
Start to prepare the next (batch of) data while you're training!
<center>
Prepare data in parallel with the training!
</center>
The reasons are:
1. Data preparation often consumes non-trivial time (depend on the actual problem).
2. Data preparation often uses completely different resources from training (see figure above) --
doing them together doesn't slow you down. In fact you can further parallelize different stages in
the preparation, because they also use different resources.
the preparation since they also use different resources.
3. Data preparation often doesn't depend on the result of the previous training step.
Let's do some simple math: according to [tensorflow/benchmarks](https://www.tensorflow.org/performance/benchmarks),
......@@ -27,24 +30,27 @@ down your training by 10%. Think about how many more copies are made during your
Failure to hide the data preparation latency is the major reason why people
cannot see good GPU utilization. __Always choose a framework that allows latency hiding.__
However most other TensorFlow wrappers are designed to be `feed_dict` based -- no latency hiding at all.
However most other TensorFlow wrappers are designed to be `feed_dict` based.
This is the major reason why tensorpack is [faster](https://gist.github.com/ppwwyyxx/8d95da79f8d97036a7d67c2416c851b6).
## Python or C++ ?
## Python Reader or TF Reader ?
The above discussion is valid regardless of what you use to load/preprocess, Python code or TensorFlow operators (written in C++).
The above discussion is valid regardless of what you use to load/preprocess data,
either Python code or TensorFlow operators (written in C++).
The benefits of using TensorFlow ops are:
* Faster preprocessing.
* Faster read/preprocessing.
* Potentially true, but not necessarily. With Python code you can call a variety of other fast libraries (e.g. lmdb), which
you have no access to in TF ops.
you have no access to in TF ops. For example, LMDB could be faster than TFRecords.
* Python may be just fast enough.
As long as data preparation runs faster than training, it makes no difference at all.
And for most types of problems, up to the scale of multi-GPU ImageNet training,
As long as data preparation runs faster than training, and the latency of all four blocks in the
above figure is hidden, it makes no difference at all.
For most types of problems, up to the scale of multi-GPU ImageNet training,
Python can offer enough speed if you use a fast library (e.g. `tensorpack.dataflow`).
See the [Efficient DataFlow](http://tensorpack.readthedocs.io/en/latest/tutorial/efficient-dataflow.html) tutorial.
See the [Efficient DataFlow](http://tensorpack.readthedocs.io/en/latest/tutorial/efficient-dataflow.html) tutorial
on how to build a fast Python reader with DataFlow.
* No "Copy to TF" (i.e. `feed_dict`) stage.
......@@ -54,6 +60,10 @@ The benefits of using TensorFlow ops are:
and TF `StagingArea` can help hide the "Copy to GPU" latency.
They are used by most examples in tensorpack.
The benefits of using Python reader is obvious:
it's much much easier to write Python to read different data format,
handle corner cases in noisy data, preprocess, etc.
## InputSource
`InputSource` is an abstract interface in tensorpack, to describe where the input come from and how they enter the graph.
......@@ -67,7 +77,7 @@ For example,
When you set `TrainConfig(dataflow=)`, tensorpack trainers automatically adds proper prefetching for you.
You can also use `TrainConfig(data=)` option to use a customized `InputSource`.
In cases you want to use TF ops rather than a DataFlow, you can use `TensorInput` as the `InputSource`
In case you want to use TF ops rather than a DataFlow, you can use `TensorInput` as the `InputSource`
(See the [PTB example](https://github.com/ppwwyyxx/tensorpack/tree/master/examples/PennTreebank)).
## Figure out the Bottleneck
......
......@@ -246,10 +246,11 @@ class ThreadedMapData(ProxyDataFlow):
Note:
1. There is tiny communication overhead with threads, but you
should avoid starting many threads in your main process to avoid GIL.
should avoid starting many threads in your main process to reduce GIL contention.
The threads will only start in the process which calls :meth:`reset_state()`.
Therefore you can use ``PrefetchDataZMQ(ThreadedMapData(...), 1)`` to avoid GIL.
Therefore you can use ``PrefetchDataZMQ(ThreadedMapData(...), 1)``
to reduce GIL contention.
2. Threads run in parallel and can take different time to run the
mapping function. Therefore the order of datapoints won't be
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment