Commit fed2d8e4 authored by Yuxin Wu

move ZMQ processes to `reset_state()`

parent 21c7f94a
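In outline, the commit moves all eager ZMQ/process setup out of `__init__` and into a lazy, run-once `reset_state()`. A minimal sketch of that pattern, with illustrative names rather than tensorpack's actual class:

```python
# Sketch of the lazy-setup pattern this commit adopts: defer expensive,
# fork-sensitive work (ZMQ context, socket, worker processes) from
# __init__ to the first reset_state() call. Illustrative only.
class LazyPrefetcher:
    def __init__(self, ds, nr_proc=1, hwm=50):
        # Only record arguments; nothing is created yet, so construction
        # is cheap and free of side effects.
        self.ds = ds
        self.nr_proc = nr_proc
        self._hwm = hwm
        self._finish_setup = False

    def reset_state(self):
        # The guard makes setup run once and only once, however many
        # times reset_state() is called.
        if self._finish_setup:
            return
        self._finish_setup = True
        self._setup()

    def _setup(self):
        # Stand-in for the real work: create the ZMQ context/socket,
        # bind the ipc pipe, spawn worker processes.
        print("spawning {} workers".format(self.nr_proc))


prefetcher = LazyPrefetcher(ds=None, nr_proc=2)
prefetcher.reset_state()   # performs the setup
prefetcher.reset_state()   # no-op on the second call
```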
@@ -210,10 +210,6 @@ class ILSVRC12(RNGDataFlow):
             box = root.find('object').find('bndbox').getchildren()
             box = map(lambda x: float(x.text), box)
-            # box[0] /= size[0]
-            # box[1] /= size[1]
-            # box[2] /= size[0]
-            # box[3] /= size[1]
             return np.asarray(box, dtype='float32')
         with timed_operation('Loading Bounding Boxes ...'):
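The four deleted lines above were already commented out; they would have normalized the absolute pixel coordinates by image width/height. For context, a hedged, self-contained version of the surrounding parsing logic, assuming a PASCAL/ILSVRC-style XML annotation (on Python 3, `map()` is lazy and `getchildren()` was removed in 3.9, so a list comprehension is the portable spelling):

```python
import numpy as np
import xml.etree.ElementTree as ET

def parse_bbox(fname):
    # Assumes <bndbox> holds xmin, ymin, xmax, ymax in absolute pixels.
    root = ET.parse(fname).getroot()
    bndbox = root.find('object').find('bndbox')
    box = [float(x.text) for x in bndbox]  # portable across Python 2/3
    return np.asarray(box, dtype='float32')
```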
@@ -127,25 +127,8 @@ class PrefetchDataZMQ(ProxyDataFlow):
         except NotImplementedError:
             self._size = -1
         self.nr_proc = nr_proc
-        self.context = zmq.Context()
-        self.socket = self.context.socket(zmq.PULL)
-        pipedir = os.environ.get('TENSORPACK_PIPEDIR', '.')
-        assert os.path.isdir(pipedir), pipedir
-        self.pipename = "ipc://{}/dataflow-pipe-".format(pipedir.rstrip('/')) + str(uuid.uuid1())[:6]
-        self.socket.set_hwm(hwm)
-        self.socket.bind(self.pipename)
-        self.procs = [PrefetchProcessZMQ(self.ds, self.pipename, hwm)
-                      for _ in range(self.nr_proc)]
-        self.start_processes()
-        # __del__ not guaranteed to get called at exit
-        import atexit
-        atexit.register(lambda x: x.__del__(), self)
-
-    def start_processes(self):
-        start_proc_mask_signal(self.procs)
+        self._hwm = hwm
+        self._finish_setup = False

     def get_data(self):
         try:
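For reference, the endpoint the removed lines construct (the same logic reappears in `reset_state()` in the next hunk) is a ZMQ ipc address rooted at `TENSORPACK_PIPEDIR`; ipc endpoints must live on a local filesystem, which is what the environment variable controls:

```python
import os
import uuid

# Same construction as the moved code: a short random suffix keeps
# concurrent dataflows from colliding on the same pipe file.
pipedir = os.environ.get('TENSORPACK_PIPEDIR', '.')
assert os.path.isdir(pipedir), pipedir
pipename = "ipc://{}/dataflow-pipe-".format(pipedir.rstrip('/')) + str(uuid.uuid1())[:6]
print(pipename)  # e.g. ipc://./dataflow-pipe-3f2a1b
```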
@@ -167,8 +150,32 @@ class PrefetchDataZMQ(ProxyDataFlow):
             raise

     def reset_state(self):
-        # do nothing. all ds are reset once and only once in spawned processes
-        pass
+        """
+        All forked dataflows are reset **once and only once** in spawned processes.
+        Nothing more can be done when calling this method.
+        """
+        if self._finish_setup:
+            return
+        self._finish_setup = True
+        self.context = zmq.Context()
+        self.socket = self.context.socket(zmq.PULL)
+        pipedir = os.environ.get('TENSORPACK_PIPEDIR', '.')
+        assert os.path.isdir(pipedir), pipedir
+        self.pipename = "ipc://{}/dataflow-pipe-".format(pipedir.rstrip('/')) + str(uuid.uuid1())[:6]
+        self.socket.set_hwm(self._hwm)
+        self.socket.bind(self.pipename)
+        self.procs = [PrefetchProcessZMQ(self.ds, self.pipename, self._hwm)
+                      for _ in range(self.nr_proc)]
+        self.start_processes()
+        # __del__ not guaranteed to get called at exit
+        import atexit
+        atexit.register(lambda x: x.__del__(), self)
+
+    def start_processes(self):
+        start_proc_mask_signal(self.procs)

     def __del__(self):
         # on exit, logger may not be functional anymore
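A hedged usage sketch of the behavior after this change (`FakeData` stands in for any DataFlow; import paths assumed from tensorpack of this era):

```python
from tensorpack.dataflow import FakeData, PrefetchDataZMQ

ds = PrefetchDataZMQ(FakeData([[4, 4]], size=100), nr_proc=2)  # cheap: nothing spawned yet
ds.reset_state()   # first call: binds the ipc socket, forks 2 worker processes
ds.reset_state()   # second call: no-op, thanks to the _finish_setup guard
for dp in ds.get_data():
    pass           # datapoints arrive from the workers over ZMQ
```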
@@ -185,21 +192,19 @@ class PrefetchDataZMQ(ProxyDataFlow):
 class PrefetchOnGPUs(PrefetchDataZMQ):
     """
-    Prefetch with each process having its own ``CUDA_VISIBLE_DEVICES`` variable
+    Similar to :class:`PrefetchDataZMQ`,
+    but prefetch with each process having its own ``CUDA_VISIBLE_DEVICES`` variable
     mapped to one GPU.
     """

-    def __init__(self, ds, gpus, pipedir=None):
+    def __init__(self, ds, gpus):
         """
         Args:
             ds (DataFlow): input DataFlow.
-            gpus (list[int]): list of GPUs to use. Will also start this many
-                of processes.
-            pipedir (str): a local directory where the pipes should be put.
-                Useful if you're running on non-local FS such as NFS or GlusterFS.
+            gpus (list[int]): list of GPUs to use. Will also start this number of processes.
         """
         self.gpus = gpus
-        super(PrefetchOnGPUs, self).__init__(ds, len(gpus), pipedir)
+        super(PrefetchOnGPUs, self).__init__(ds, len(gpus))

     def start_processes(self):
         with mask_sigint():
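Correspondingly, a hedged sketch of the updated `PrefetchOnGPUs` call: the `pipedir` argument is gone (use the `TENSORPACK_PIPEDIR` environment variable instead), and one worker process is started per listed GPU:

```python
import os
os.environ['TENSORPACK_PIPEDIR'] = '/tmp'  # replaces the removed pipedir= argument
from tensorpack.dataflow import FakeData, PrefetchOnGPUs

ds = PrefetchOnGPUs(FakeData([[4, 4]], size=100), gpus=[0, 1])  # two procs, one per GPU
ds.reset_state()   # as with PrefetchDataZMQ, setup happens here
```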