Commit b2d106a3 authored by Yuxin Wu's avatar Yuxin Wu

reduce memory of MapAndBatch

parent c2d99a44
...@@ -411,13 +411,13 @@ class MultiProcessMapAndBatchDataZMQ(_MultiProcessZMQDataFlow): ...@@ -411,13 +411,13 @@ class MultiProcessMapAndBatchDataZMQ(_MultiProcessZMQDataFlow):
self.context = zmq.Context() self.context = zmq.Context()
self.socket = self.context.socket(zmq.PULL) self.socket = self.context.socket(zmq.PULL)
self.socket.set_hwm(self.buffer_size * 2 // self.batch_size) self.socket.set_hwm(max(5, self.buffer_size * 2 // self.batch_size))
_bind_guard(self.socket, result_pipe) _bind_guard(self.socket, result_pipe)
dispatcher = MultiProcessMapAndBatchDataZMQ._Dispatcher(self.ds, job_pipe, self.buffer_size) dispatcher = MultiProcessMapAndBatchDataZMQ._Dispatcher(self.ds, job_pipe, self.buffer_size)
self._proc_ids = [u'{}'.format(k).encode('utf-8') for k in range(self.num_proc)] self._proc_ids = [u'{}'.format(k).encode('utf-8') for k in range(self.num_proc)]
worker_hwm = int(self.buffer_size * 2 // self.num_proc) worker_hwm = max(3, self.buffer_size * 2 // self.num_proc // self.batch_size)
self._procs = [MultiProcessMapAndBatchDataZMQ._Worker( self._procs = [MultiProcessMapAndBatchDataZMQ._Worker(
self._proc_ids[k], self.map_func, job_pipe, result_pipe, worker_hwm, self.batch_size) self._proc_ids[k], self.map_func, job_pipe, result_pipe, worker_hwm, self.batch_size)
for k in range(self.num_proc)] for k in range(self.num_proc)]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment