Commit 6486fc5f authored by Yuxin Wu's avatar Yuxin Wu

counting statistics for RemoteDataZMQ

parent 6f086458
......@@ -52,7 +52,12 @@ def send_dataflow_zmq(df, addr, hwm=50, print_interval=100):
class RemoteDataZMQ(DataFlow):
""" Produce data from ZMQ PULL socket(s). """
"""
Produce data from ZMQ PULL socket(s).
Attributes:
cnt1, cnt2 (int): number of data points received from addr1 and addr2
"""
def __init__(self, addr1, addr2=None):
"""
Args:
......@@ -64,6 +69,10 @@ class RemoteDataZMQ(DataFlow):
self._addr1 = addr1
self._addr2 = addr2
def reset_state(self):
self.cnt1 = 0
self.cnt2 = 0
def get_data(self):
try:
ctx = zmq.Context()
......@@ -75,6 +84,7 @@ class RemoteDataZMQ(DataFlow):
while True:
dp = loads(socket.recv(copy=False).bytes)
yield dp
self.cnt1 += 1
else:
socket1 = ctx.socket(zmq.PULL)
socket1.set_hwm(50)
......@@ -93,6 +103,10 @@ class RemoteDataZMQ(DataFlow):
for sock, evt in evts:
dp = loads(sock.recv(copy=False).bytes)
yield dp
if sock == socket1:
self.cnt1 += 1
else:
self.cnt2 += 1
finally:
ctx.destroy(linger=0)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment