Commit be7eae86 authored by Yuxin Wu's avatar Yuxin Wu

Do not try to clear the queue at first reset. (fix #548)

parent 3b994877
...@@ -497,6 +497,9 @@ class JoinData(DataFlow): ...@@ -497,6 +497,9 @@ class JoinData(DataFlow):
d.reset_state() d.reset_state()
def size(self): def size(self):
"""
Return the minimum size among all.
"""
return min([k.size() for k in self.df_lists]) return min([k.size() for k in self.df_lists])
def get_data(self): def get_data(self):
......
...@@ -185,6 +185,7 @@ class QueueInput(FeedfreeInput): ...@@ -185,6 +185,7 @@ class QueueInput(FeedfreeInput):
self.queue = queue self.queue = queue
self.ds = ds self.ds = ds
self._inf_ds = RepeatedData(ds, -1) self._inf_ds = RepeatedData(ds, -1)
self._started = False
def _size(self): def _size(self):
return self.ds.size() return self.ds.size()
...@@ -204,21 +205,24 @@ class QueueInput(FeedfreeInput): ...@@ -204,21 +205,24 @@ class QueueInput(FeedfreeInput):
self._dequeue_op = self.queue.dequeue(name='dequeue_for_reset') self._dequeue_op = self.queue.dequeue(name='dequeue_for_reset')
def _reset_state(self): def _reset_state(self):
self.thread.pause() # pause enqueue if self._started: # do not try to clear the queue if there is nothing
self.thread.pause() # pause enqueue
opt = tf.RunOptions()
opt.timeout_in_ms = 2000 # 2s opt = tf.RunOptions()
sess = tf.get_default_session() opt.timeout_in_ms = 2000 # 2s
# dequeue until empty sess = tf.get_default_session()
try: # dequeue until empty
while True: try:
sess.run(self._dequeue_op, options=opt) while True:
except tf.errors.DeadlineExceededError: sess.run(self._dequeue_op, options=opt)
pass except tf.errors.DeadlineExceededError:
pass
# reset dataflow, start thread
self.thread.reinitialize_dataflow() # reset dataflow, start thread
self.thread.resume() self.thread.reinitialize_dataflow()
self.thread.resume()
else:
self._started = True
def _create_ema_callback(self): def _create_ema_callback(self):
""" """
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment