Commit a682083f authored by Yuxin Wu's avatar Yuxin Wu

(fix #368)

parent 54f45384
......@@ -117,8 +117,7 @@ class PrefetchDataZMQ(ProxyDataFlow):
Note:
1. Once :meth:`reset_state` is called, this dataflow becomes not fork-safe.
2. When nesting like this: ``PrefetchDataZMQ(PrefetchDataZMQ(df, a), b)``.
A total of ``a * b`` instances of ``df`` worker processes will be created.
2. This dataflow is not fork-safe. You cannot nest it.
3. The underlying dataflow worker will be forked multiple times When ``nr_proc>1``.
As a result, unless the underlying dataflow is fully shuffled, the data distribution
produced by this dataflow will be wrong.
......@@ -139,7 +138,22 @@ class PrefetchDataZMQ(ProxyDataFlow):
self._size = -1
self.nr_proc = nr_proc
self._hwm = hwm
self._finish_setup = False
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PULL)
pipedir = os.environ.get('TENSORPACK_PIPEDIR', '.')
assert os.path.isdir(pipedir), pipedir
self.pipename = "ipc://{}/dataflow-pipe-".format(pipedir.rstrip('/')) + str(uuid.uuid1())[:6]
self.socket.set_hwm(self._hwm)
self.socket.bind(self.pipename)
self.procs = [PrefetchProcessZMQ(self.ds, self.pipename, self._hwm)
for _ in range(self.nr_proc)]
self.start_processes()
# __del__ not guranteed to get called at exit
import atexit
atexit.register(lambda x: x.__del__(), self)
def get_data(self):
try:
......@@ -165,25 +179,7 @@ class PrefetchDataZMQ(ProxyDataFlow):
All forked dataflows are reset **once and only once** in spawned processes.
Nothing more can be done when calling this method.
"""
if self._finish_setup:
return
self._finish_setup = True
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PULL)
pipedir = os.environ.get('TENSORPACK_PIPEDIR', '.')
assert os.path.isdir(pipedir), pipedir
self.pipename = "ipc://{}/dataflow-pipe-".format(pipedir.rstrip('/')) + str(uuid.uuid1())[:6]
self.socket.set_hwm(self._hwm)
self.socket.bind(self.pipename)
self.procs = [PrefetchProcessZMQ(self.ds, self.pipename, self._hwm)
for _ in range(self.nr_proc)]
self.start_processes()
# __del__ not guranteed to get called at exit
import atexit
atexit.register(lambda x: x.__del__(), self)
pass
def start_processes(self):
start_proc_mask_signal(self.procs)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment