Commit d6f0c57a authored by Yuxin Wu

clarify the behavior of nested prefetch

parent fed2d8e4
@@ -188,10 +188,10 @@ Both imdecode and the augmentors can be quite slow. We can parallelize them like
 ```
 Since we are reading the database sequentially, having multiple identical instances of the
-underlying DataFlow will result in biased data distribution. Therefore we use `PrefetchData` to
-launch the underlying DataFlow in one independent process, and only parallelize the transformations.
-(`PrefetchDataZMQ` is faster but not fork-safe, so the first prefetch has to be `PrefetchData`. This
-is supposed to get fixed in the future).
+base LMDB reader will result in biased data distribution. Therefore we use `PrefetchData` to
+launch the base DataFlow in one independent process, and only parallelize the transformations.
+(`PrefetchDataZMQ` will however create 25 readers instead, so the first prefetch has to be `PrefetchData`.
+These differences are explained in the documentation in more details.)
 Let me summarize what the above DataFlow does:
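For context, the pipeline this hunk documents follows a pattern roughly like the sketch below. The LMDB path and the augmentor list are placeholders and not part of the commit; the point is the single `PrefetchData`-wrapped reader followed by a `PrefetchDataZMQ` over the transformations, which is what the edited paragraph describes.

```python
import cv2
from tensorpack.dataflow import (
    LMDBData, LMDBDataPoint, PrefetchData, PrefetchDataZMQ,
    MapDataComponent, AugmentImageComponent, BatchData, imgaug)

# One sequential LMDB reader, run in a single separate process by PrefetchData,
# so every worker downstream sees the same unbiased stream of records.
ds = LMDBData('/path/to/train.lmdb', shuffle=False)    # placeholder path
ds = PrefetchData(ds, nr_prefetch=5000, nr_proc=1)     # exactly one reader process
ds = LMDBDataPoint(ds)                                 # deserialize the stored datapoints

# The expensive parts -- imdecode and augmentation -- are parallelized instead.
ds = MapDataComponent(ds, lambda buf: cv2.imdecode(buf, cv2.IMREAD_COLOR), 0)
augmentors = [imgaug.Resize((224, 224)), imgaug.Flip(horiz=True)]  # stand-in augmentors
ds = AugmentImageComponent(ds, augmentors)
ds = PrefetchDataZMQ(ds, 25)                           # 25 transformation workers
ds = BatchData(ds, 256)
```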
@@ -49,8 +49,9 @@ class PrefetchData(ProxyDataFlow):
     Prefetch data from a DataFlow using Python multiprocessing utilities.

     Note:
-        This is significantly slower than :class:`PrefetchDataZMQ` when data
-        is large.
+        1. This is significantly slower than :class:`PrefetchDataZMQ` when data is large.
+        2. When nesting like this: ``PrefetchDataZMQ(PrefetchData(df, a), b)``.
+           A total of ``a`` instances of ``df`` worker processes will be created.
     """
     def __init__(self, ds, nr_prefetch, nr_proc=1):
         """
@@ -110,8 +111,10 @@ class PrefetchDataZMQ(ProxyDataFlow):

     A local directory is needed to put the ZMQ pipes.
     You can set this with env var ``$TENSORPACK_PIPEDIR`` if you're running on non-local FS such as NFS or GlusterFS.
-    Note that this dataflow is not fork-safe. You cannot nest this dataflow
-    into another PrefetchDataZMQ or PrefetchData.
+    Note:
+        1. Once :meth:`reset_state` is called, this dataflow becomes not fork-safe.
+        2. When nesting like this: ``PrefetchDataZMQ(PrefetchDataZMQ(df, a), b)``.
+           A total of ``a * b`` instances of ``df`` worker processes will be created.
     """
     def __init__(self, ds, nr_proc=1, hwm=50):
         """
@@ -222,7 +225,8 @@ class ThreadedMapData(ProxyDataFlow):
     With threads, there are tiny communication overhead, but due to GIL, you
     should avoid starting the threads in your main process.
     Note that the threads will only start in the process which calls
-    `reset_state()`.
+    :meth:`reset_state()`.
+    So you can use ``PrefetchDataZMQ(ThreadedMapData(...), 1)`` to avoid GIL.
     """
     class _WorkerThread(StoppableThread):
         def __init__(self, inq, outq, map_func):
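The sentence added here suggests wrapping `ThreadedMapData` in a one-process `PrefetchDataZMQ`, so the mapping threads live in that child process rather than the main one. A minimal sketch, assuming a `ThreadedMapData(ds, nr_thread, map_func)`-style constructor and a made-up `heavy_map` function:

```python
from tensorpack.dataflow import DataFromList, ThreadedMapData, PrefetchDataZMQ

df = DataFromList([[i] for i in range(1000)])   # hypothetical toy DataFlow

def heavy_map(dp):
    # placeholder for an expensive per-datapoint function (decoding, parsing, ...)
    return [dp[0] * 2]

# The threads only start when reset_state() runs, and that happens inside the
# single PrefetchDataZMQ worker process -- keeping their GIL contention out of
# the main (training) process.
ds = PrefetchDataZMQ(ThreadedMapData(df, nr_thread=8, map_func=heavy_map), 1)
```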