Commit 4521b1f7 authored by Yuxin Wu's avatar Yuxin Wu

add docs and fix initialization in ThreadedMapData (fix #293)

parent 815e9439
...@@ -212,8 +212,9 @@ class ThreadedMapData(ProxyDataFlow): ...@@ -212,8 +212,9 @@ class ThreadedMapData(ProxyDataFlow):
Note that the threads will only start in the process which calls Note that the threads will only start in the process which calls
`reset_state()`. `reset_state()`.
""" """
class WorkerThread(StoppableThread): class _WorkerThread(StoppableThread):
def __init__(self, inq, outq, map_func): def __init__(self, inq, outq, map_func):
super(ThreadedMapData._WorkerThread, self).__init__()
self.inq = inq self.inq = inq
self.outq = outq self.outq = outq
self.func = map_func self.func = map_func
...@@ -228,7 +229,10 @@ class ThreadedMapData(ProxyDataFlow): ...@@ -228,7 +229,10 @@ class ThreadedMapData(ProxyDataFlow):
def __init__(self, ds, nr_thread, map_func, buffer_size=200): def __init__(self, ds, nr_thread, map_func, buffer_size=200):
""" """
Args: Args:
pass ds (DataFlow): the dataflow to map
nr_thread (int): number of threads to use
map_func (callable): datapoint -> datapoint | None
buffer_size (int): number of datapoints in the buffer
""" """
super(ThreadedMapData, self).__init__(ds) super(ThreadedMapData, self).__init__(ds)
self.infinite_ds = RepeatedData(ds, -1) self.infinite_ds = RepeatedData(ds, -1)
...@@ -244,7 +248,7 @@ class ThreadedMapData(ProxyDataFlow): ...@@ -244,7 +248,7 @@ class ThreadedMapData(ProxyDataFlow):
t.join() t.join()
self._in_queue = queue.Queue() self._in_queue = queue.Queue()
self._out_queue = queue.Queue() self._out_queue = queue.Queue()
self._threads = [ThreadedMapData.WorkerThread( self._threads = [ThreadedMapData._WorkerThread(
self._in_queue, self._out_queue, self.map_func) self._in_queue, self._out_queue, self.map_func)
for _ in range(self.nr_thread)] for _ in range(self.nr_thread)]
for t in self._threads: for t in self._threads:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment