Commit ff9e5555 authored by Yuxin Wu's avatar Yuxin Wu

add speedometer and logging to send_dataflow_zmq

parent 675436ed
......@@ -3,7 +3,11 @@
# File: remote.py
# Author: Yuxin Wu <ppwwyyxxc@gmail.com>
from ..utils import logger
import time
from collections import deque
from .base import DataFlow
from ..utils import logger, get_tqdm
from ..utils.serialize import dumps, loads
try:
import zmq
except ImportError:
......@@ -12,12 +16,8 @@ except ImportError:
else:
__all__ = ['send_dataflow_zmq', 'RemoteDataZMQ']
from .base import DataFlow
from ..utils import logger
from ..utils.serialize import dumps, loads
def send_dataflow_zmq(df, addr, hwm=50):
def send_dataflow_zmq(df, addr, hwm=50, print_interval=100):
"""
Run DataFlow and send data to a ZMQ socket addr.
It will dump and send each datapoint to this addr with a PUSH socket.
......@@ -33,11 +33,17 @@ def send_dataflow_zmq(df, addr, hwm=50):
socket.connect(addr)
try:
df.reset_state()
logger.info("Serving data to {}".format(addr))
# TODO print statistics such as speed
logger.info("Serving data to {} ...".format(addr))
q = deque(maxlen=print_interval)
with get_tqdm(total=0) as pbar:
while True:
for dp in df.get_data():
start = time.time()
socket.send(dumps(dp), copy=False)
q.append(time.time() - start)
pbar.update(1)
if pbar.n % print_interval == 0:
pbar.write("Avg send time: {}".format(sum(q) / len(q)))
finally:
socket.setsockopt(zmq.LINGER, 0)
socket.close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment