Commit 9a1043e1 authored by Yuxin Wu's avatar Yuxin Wu
parent 7158eede
...@@ -287,7 +287,7 @@ class MapData(ProxyDataFlow): ...@@ -287,7 +287,7 @@ class MapData(ProxyDataFlow):
Args: Args:
ds (DataFlow): input DataFlow ds (DataFlow): input DataFlow
func (datapoint -> datapoint | None): takes a datapoint and returns a new func (datapoint -> datapoint | None): takes a datapoint and returns a new
datapoint. Return None to discard this datapoint. datapoint. Return None to discard/skip this datapoint.
""" """
super(MapData, self).__init__(ds) super(MapData, self).__init__(ds)
self.func = func self.func = func
...@@ -321,7 +321,7 @@ class MapDataComponent(MapData): ...@@ -321,7 +321,7 @@ class MapDataComponent(MapData):
Args: Args:
ds (DataFlow): input DataFlow which produces either list or dict. ds (DataFlow): input DataFlow which produces either list or dict.
func (TYPE -> TYPE|None): takes ``dp[index]``, returns a new value for ``dp[index]``. func (TYPE -> TYPE|None): takes ``dp[index]``, returns a new value for ``dp[index]``.
return None to discard this datapoint. Return None to discard/skip this datapoint.
index (int or str): index or key of the component. index (int or str): index or key of the component.
""" """
self._index = index self._index = index
......
...@@ -149,7 +149,8 @@ class MultiThreadMapData(_ParallelMapData): ...@@ -149,7 +149,8 @@ class MultiThreadMapData(_ParallelMapData):
Args: Args:
ds (DataFlow): the dataflow to map ds (DataFlow): the dataflow to map
nr_thread (int): number of threads to use nr_thread (int): number of threads to use
map_func (callable): datapoint -> datapoint | None map_func (callable): datapoint -> datapoint | None. Return None to
discard/skip the datapoint.
buffer_size (int): number of datapoints in the buffer buffer_size (int): number of datapoints in the buffer
strict (bool): use "strict mode", see notes above. strict (bool): use "strict mode", see notes above.
""" """
...@@ -250,7 +251,8 @@ class MultiProcessMapDataZMQ(_ParallelMapData, _MultiProcessZMQDataFlow): ...@@ -250,7 +251,8 @@ class MultiProcessMapDataZMQ(_ParallelMapData, _MultiProcessZMQDataFlow):
Args: Args:
ds (DataFlow): the dataflow to map ds (DataFlow): the dataflow to map
nr_proc(int): number of threads to use nr_proc(int): number of threads to use
map_func (callable): datapoint -> datapoint | None map_func (callable): datapoint -> datapoint | None. Return None to
discard/skip the datapoint.
buffer_size (int): number of datapoints in the buffer buffer_size (int): number of datapoints in the buffer
strict (bool): use "strict mode", see notes above. strict (bool): use "strict mode", see notes above.
""" """
......
...@@ -24,7 +24,9 @@ def loads_msgpack(buf): ...@@ -24,7 +24,9 @@ def loads_msgpack(buf):
Args: Args:
buf: the output of `dumps`. buf: the output of `dumps`.
""" """
return msgpack.loads(buf, raw=False) # Since 0.6, the default max size was set to 1MB.
# We change it to approximately 1G.
return msgpack.loads(buf, raw=False, max_bin_len=1000000000)
def dumps_pyarrow(obj): def dumps_pyarrow(obj):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment