Commit 7a3e4c4d authored by Yuxin Wu's avatar Yuxin Wu

move send-queue to simulatormaster

parent c12cf88b
...@@ -59,7 +59,7 @@ class SimulatorProcess(multiprocessing.Process): ...@@ -59,7 +59,7 @@ class SimulatorProcess(multiprocessing.Process):
action = loads(data) action = loads(data)
reward, isOver = player.action(action) reward, isOver = player.action(action)
c2s_socket.send(dumps((reward, isOver)), copy=False) c2s_socket.send(dumps((reward, isOver)), copy=False)
noop = s2c_socket.recv(copy=False) ACK = s2c_socket.recv(copy=False)
#cnt += 1 #cnt += 1
#if cnt % 100 == 0: #if cnt % 100 == 0:
#print_total_timer() #print_total_timer()
...@@ -102,6 +102,13 @@ class SimulatorMaster(threading.Thread): ...@@ -102,6 +102,13 @@ class SimulatorMaster(threading.Thread):
self.socket_lock = threading.Lock() self.socket_lock = threading.Lock()
self.daemon = True self.daemon = True
# queueing messages to client
self.send_queue = queue.Queue(maxsize=50)
self.send_thread = LoopThread(lambda:
self.s2c_socket.send_multipart(self.send_queue.get()))
self.send_thread.start()
# make sure socket get closed at the end
def clean_context(soks, context): def clean_context(soks, context):
for s in soks: for s in soks:
s.close() s.close()
...@@ -113,7 +120,6 @@ class SimulatorMaster(threading.Thread): ...@@ -113,7 +120,6 @@ class SimulatorMaster(threading.Thread):
self.clients = defaultdict(SimulatorMaster.ClientState) self.clients = defaultdict(SimulatorMaster.ClientState)
while True: while True:
ident, msg = self.c2s_socket.recv_multipart() ident, msg = self.c2s_socket.recv_multipart()
#assert _ == ""
client = self.clients[ident] client = self.clients[ident]
client.protocol_state = 1 - client.protocol_state # first flip the state client.protocol_state = 1 - client.protocol_state # first flip the state
if not client.protocol_state == 0: # state-action if not client.protocol_state == 0: # state-action
...@@ -126,6 +132,7 @@ class SimulatorMaster(threading.Thread): ...@@ -126,6 +132,7 @@ class SimulatorMaster(threading.Thread):
self._on_episode_over(ident) self._on_episode_over(ident)
else: else:
self._on_datapoint(ident) self._on_datapoint(ident)
self.send_queue.put([ident, 'Thanks']) # just an ACK
@abstractmethod @abstractmethod
def _on_state(self, state, ident): def _on_state(self, state, ident):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment