Commit 760b924a authored by Yuxin Wu's avatar Yuxin Wu

Use DEALER-REP in MultiProcessMapData (fix #673)

parent bc9d2e1a
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# File: parallel_map.py # File: parallel_map.py
import numpy as np import numpy as np
import time
import ctypes import ctypes
import copy import copy
import threading import threading
...@@ -227,7 +226,7 @@ class MultiProcessMapDataZMQ(_ParallelMapData, _MultiProcessZMQDataFlow): ...@@ -227,7 +226,7 @@ class MultiProcessMapDataZMQ(_ParallelMapData, _MultiProcessZMQDataFlow):
def run(self): def run(self):
ctx = zmq.Context() ctx = zmq.Context()
socket = ctx.socket(zmq.DEALER) socket = ctx.socket(zmq.REP)
socket.setsockopt(zmq.IDENTITY, self.identity) socket.setsockopt(zmq.IDENTITY, self.identity)
socket.set_hwm(self.hwm) socket.set_hwm(self.hwm)
socket.connect(self.pipename) socket.connect(self.pipename)
...@@ -256,7 +255,7 @@ class MultiProcessMapDataZMQ(_ParallelMapData, _MultiProcessZMQDataFlow): ...@@ -256,7 +255,7 @@ class MultiProcessMapDataZMQ(_ParallelMapData, _MultiProcessZMQDataFlow):
def _reset_once(self): def _reset_once(self):
self.context = zmq.Context() self.context = zmq.Context()
self.socket = self.context.socket(zmq.ROUTER) self.socket = self.context.socket(zmq.DEALER)
self.socket.set_hwm(self._buffer_size * 2) self.socket.set_hwm(self._buffer_size * 2)
pipename = _get_pipe_name('dataflow-map') pipename = _get_pipe_name('dataflow-map')
_bind_guard(self.socket, pipename) _bind_guard(self.socket, pipename)
...@@ -272,16 +271,13 @@ class MultiProcessMapDataZMQ(_ParallelMapData, _MultiProcessZMQDataFlow): ...@@ -272,16 +271,13 @@ class MultiProcessMapDataZMQ(_ParallelMapData, _MultiProcessZMQDataFlow):
self._iter_worker = _repeat_iter(lambda: iter(self._proc_ids)) self._iter_worker = _repeat_iter(lambda: iter(self._proc_ids))
self._start_processes() self._start_processes()
time.sleep(5) # TODO temporarily work around #673
self._fill_buffer() # pre-fill the bufer self._fill_buffer() # pre-fill the bufer
def reset_state(self): def reset_state(self):
_MultiProcessZMQDataFlow.reset_state(self) _MultiProcessZMQDataFlow.reset_state(self)
def _send(self, dp): def _send(self, dp):
# round-robin assignment msg = [b"", dumps(dp)]
worker = next(self._iter_worker)
msg = [worker, dumps(dp)]
self.socket.send_multipart(msg, copy=False) self.socket.send_multipart(msg, copy=False)
def _recv(self): def _recv(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment