Commit d1a90991 authored by Yuxin Wu, committed by GitHub

DataFlow Serializers (#802)

* dataflow serializers

* add missing file

* deprecate old stuff; update docs

* update examples & docs
parent 05e54783
......@@ -359,6 +359,10 @@ _DEPRECATED_NAMES = set([
'TryResumeTraining',
'QueueInputTrainer',
'SimplePredictBuilder',
'LMDBDataPoint',
'TFRecordData',
'dump_dataflow_to_lmdb',
'dump_dataflow_to_tfrecord',
# renamed stuff:
'DumpTensor',
......
......@@ -151,7 +151,7 @@ class BinaryILSVRC12(dataset.ILSVRC12Files):
yield [jpeg, label]
ds0 = BinaryILSVRC12('/path/to/ILSVRC/', 'train')
ds1 = PrefetchDataZMQ(ds0, nr_proc=1)
dftools.dump_dataflow_to_lmdb(ds1, '/path/to/ILSVRC-train.lmdb')
LMDBSerializer.save(ds1, '/path/to/ILSVRC-train.lmdb')
```
The above script builds a DataFlow which produces jpeg-encoded ImageNet data.
We store the jpeg string as a numpy array because the function `cv2.imdecode` later expects this format.
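For reference, such an array can later be decoded back into an image with `cv2.imdecode` (a minimal sketch; the file name is only illustrative):
```
import cv2
import numpy as np
with open('n01440764_10026.JPEG', 'rb') as f:
    jpeg = np.frombuffer(f.read(), dtype='uint8')  # 1-D uint8 array of encoded jpeg bytes
img = cv2.imdecode(jpeg, cv2.IMREAD_COLOR)         # decodes to an HxWx3 BGR ndarray
```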
......@@ -160,9 +160,9 @@ from several forks of `ds0`, then neither the content nor the order of `ds1` wil
See [documentation](../modules/dataflow.html#tensorpack.dataflow.PrefetchDataZMQ)
about caveats of `PrefetchDataZMQ`.
It will generate a database file of 140G. We build a DataFlow to read this LMDB file sequentially:
It will generate a database file of 140G. We load the DataFlow back by reading this LMDB file sequentially:
```
ds = LMDBData('/path/to/ILSVRC-train.lmdb', shuffle=False)
ds = LMDBSerializer.load('/path/to/ILSVRC-train.lmdb', shuffle=False)
ds = BatchData(ds, 256, use_list=True)
TestDataSpeed(ds).start()
```
......@@ -175,7 +175,7 @@ As a reference, on Samsung SSD 850, the uncached speed is about 16it/s.
.. code-block:: python
:emphasize-lines: 2
ds = LMDBData('/path/to/ILSVRC-train.lmdb', shuffle=False)
ds = LMDBSerializer.load('/path/to/ILSVRC-train.lmdb', shuffle=False)
ds = LocallyShuffleData(ds, 50000)
ds = BatchData(ds, 256, use_list=True)
```
......@@ -189,15 +189,14 @@ Then we add necessary transformations:
.. code-block:: python
:emphasize-lines: 3-5
ds = LMDBData(db, shuffle=False)
ds = LMDBSerializer.load(db, shuffle=False)
ds = LocallyShuffleData(ds, 50000)
ds = LMDBDataPoint(ds)
ds = MapDataComponent(ds, lambda x: cv2.imdecode(x, cv2.IMREAD_COLOR), 0)
ds = AugmentImageComponent(ds, lots_of_augmentors)
ds = BatchData(ds, 256)
```
1. `LMDBDataPoint` deserializes the datapoints (from raw bytes to [jpeg bytes, label] -- what we dumped in `BinaryILSVRC12`)
1. First we deserialize the datapoints (from raw bytes to [jpeg bytes, label] -- what we dumped in `BinaryILSVRC12`)
2. Use OpenCV to decode the first component (jpeg bytes) into ndarray
3. Apply augmentations to the ndarray
......@@ -206,10 +205,9 @@ Both imdecode and the augmentors can be quite slow. We can parallelize them like
.. code-block:: python
:emphasize-lines: 3,7
ds = LMDBData(db, shuffle=False)
ds = LMDBSerializer.load(db, shuffle=False)
ds = LocallyShuffleData(ds, 50000)
ds = PrefetchData(ds, 5000, 1)
ds = LMDBDataPoint(ds)
ds = MapDataComponent(ds, lambda x: cv2.imdecode(x, cv2.IMREAD_COLOR), 0)
ds = AugmentImageComponent(ds, lots_of_augmentors)
ds = PrefetchDataZMQ(ds, 25)
......
......@@ -9,7 +9,7 @@ import numpy as np
import argparse
import bob.ap
from tensorpack.dataflow import dftools, DataFlow, LMDBDataPoint
from tensorpack.dataflow import DataFlow, LMDBSerializer
from tensorpack.utils.argtools import memoized
from tensorpack.utils.stats import OnlineMoments
from tensorpack.utils import serialize, fs, logger
......@@ -103,7 +103,7 @@ class RawTIMIT(DataFlow):
def compute_mean_std(db, fname):
ds = LMDBDataPoint(db, shuffle=False)
ds = LMDBSerializer.load(db, shuffle=False)
ds.reset_state()
o = OnlineMoments()
with get_tqdm(total=ds.size()) as bar:
......@@ -133,6 +133,6 @@ if __name__ == '__main__':
args = parser.parse_args()
if args.command == 'build':
ds = RawTIMIT(args.dataset)
dftools.dump_dataflow_to_lmdb(ds, args.db)
LMDBSerializer.save(ds, args.db)
elif args.command == 'stat':
compute_mean_std(args.db, args.output)
......@@ -78,7 +78,7 @@ class Model(ModelDesc):
def get_data(path, isTrain, stat_file):
ds = LMDBDataPoint(path, shuffle=isTrain)
ds = LMDBSerializer.load(path, shuffle=isTrain)
mean, std = serialize.loads(open(stat_file, 'rb').read())
ds = MapDataComponent(ds, lambda x: (x - mean) / std)
ds = TIMITBatch(ds, BATCH)
......
......@@ -3,7 +3,7 @@ import os
import argparse
import numpy as np
import zipfile
from tensorpack import RNGDataFlow, MapDataComponent, dftools
from tensorpack import RNGDataFlow, MapDataComponent, LMDBSerializer
class ImageDataFromZIPFile(RNGDataFlow):
......@@ -108,7 +108,7 @@ if __name__ == '__main__':
ds = CenterSquareResize(ds, index=0)
if args.create:
ds = ImageEncode(ds, index=0)
dftools.dump_dataflow_to_lmdb(ds, args.lmdb)
LMDBSerializer.save(ds, args.lmdb)
if args.debug:
ds.reset_state()
for i in ds.get_data():
......
......@@ -241,7 +241,7 @@ def apply(model_path, lowres_path="", output_path='.'):
def get_data(file_name):
if file_name.endswith('.lmdb'):
ds = LMDBDataPoint(file_name, shuffle=True)
ds = LMDBSerializer.load(file_name, shuffle=True)
ds = ImageDecode(ds, index=0)
elif file_name.endswith('.zip'):
ds = ImageDataFromZIPFile(file_name, shuffle=True)
......
......@@ -2,15 +2,13 @@
# File: dftools.py
import os
import multiprocessing as mp
from six.moves import range
from .base import DataFlow
from ..utils import logger
from ..utils.utils import get_tqdm
from ..utils.concurrency import DIE
from ..utils.serialize import dumps
from ..utils.develop import deprecated
from .serialize import LMDBSerializer, TFRecordSerializer
__all__ = ['dump_dataflow_to_process_queue',
'dump_dataflow_to_lmdb', 'dump_dataflow_to_tfrecord']
......@@ -55,84 +53,11 @@ def dump_dataflow_to_process_queue(df, size, nr_consumer):
return q, proc
@deprecated("Use LMDBSerializer.save instead!", "2019-01-31")
def dump_dataflow_to_lmdb(df, lmdb_path, write_frequency=5000):
"""
Dump a Dataflow to a lmdb database, where the keys are indices and values
are serialized datapoints.
The output database can be read directly by
:class:`tensorpack.dataflow.LMDBDataPoint`.
Args:
df (DataFlow): the DataFlow to dump.
lmdb_path (str): output path. Either a directory or a mdb file.
write_frequency (int): the frequency to write back data to disk.
"""
assert isinstance(df, DataFlow), type(df)
isdir = os.path.isdir(lmdb_path)
if isdir:
assert not os.path.isfile(os.path.join(lmdb_path, 'data.mdb')), "LMDB file exists!"
else:
assert not os.path.isfile(lmdb_path), "LMDB file exists!"
df.reset_state()
db = lmdb.open(lmdb_path, subdir=isdir,
map_size=1099511627776 * 2, readonly=False,
meminit=False, map_async=True) # need sync() at the end
try:
sz = df.size()
except NotImplementedError:
sz = 0
with get_tqdm(total=sz) as pbar:
idx = -1
# LMDB transaction is not exception-safe!
# although it has a context manager interface
txn = db.begin(write=True)
for idx, dp in enumerate(df.get_data()):
txn.put(u'{}'.format(idx).encode('ascii'), dumps(dp))
pbar.update()
if (idx + 1) % write_frequency == 0:
txn.commit()
txn = db.begin(write=True)
txn.commit()
keys = [u'{}'.format(k).encode('ascii') for k in range(idx + 1)]
with db.begin(write=True) as txn:
txn.put(b'__keys__', dumps(keys))
logger.info("Flushing database ...")
db.sync()
db.close()
LMDBSerializer.save(df, lmdb_path, write_frequency)
@deprecated("Use TFRecordSerializer.save instead!", "2019-01-31")
def dump_dataflow_to_tfrecord(df, path):
"""
Dump all datapoints of a Dataflow to a TensorFlow TFRecord file,
using :func:`serialize.dumps` to serialize.
Args:
df (DataFlow):
path (str): the output file path
"""
df.reset_state()
with tf.python_io.TFRecordWriter(path) as writer:
try:
sz = df.size()
except NotImplementedError:
sz = 0
with get_tqdm(total=sz) as pbar:
for dp in df.get_data():
writer.write(dumps(dp).to_pybytes())
pbar.update()
from ..utils.develop import create_dummy_func # noqa
try:
import lmdb
except ImportError:
dump_dataflow_to_lmdb = create_dummy_func('dump_dataflow_to_lmdb', 'lmdb') # noqa
try:
import tensorflow as tf
except ImportError:
dump_dataflow_to_tfrecord = create_dummy_func( # noqa
'dump_dataflow_to_tfrecord', 'tensorflow')
TFRecordSerializer.save(df, path)
......@@ -13,6 +13,7 @@ from ..utils.timer import timed_operation
from ..utils.loadcaffe import get_caffe_pb
from ..utils.serialize import loads
from ..utils.argtools import log_once
from ..utils.develop import log_deprecated
from .base import RNGDataFlow, DataFlow, DataFlowReentrantGuard
from .common import MapData
......@@ -65,7 +66,7 @@ class LMDBData(RNGDataFlow):
Read a LMDB database and produce (k,v) raw bytes pairs.
The raw bytes are usually not what you're interested in.
You might want to use
:class:`LMDBDataDecoder`, :class:`LMDBDataPoint`, or apply a
:class:`LMDBDataDecoder` or apply a
mapper function after :class:`LMDBData`.
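For example (a sketch; the decoder below is only illustrative):
.. code-block:: python
    ds = LMDBData('/path/to/db.lmdb', shuffle=False)
    # the decoder receives (key, value) bytes and returns a datapoint, or None to discard it
    ds = LMDBDataDecoder(ds, lambda k, v: [loads(v)])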
"""
def __init__(self, lmdb_path, shuffle=True, keys=None):
......@@ -77,8 +78,8 @@ class LMDBData(RNGDataFlow):
It can also be a format string e.g. ``{:0>8d}`` which will be
formatted with the indices from 0 to *total_size - 1*.
If not provided, it will then look in the database for ``__keys__`` which
:func:`dump_dataflow_to_lmdb` used to store the list of keys.
If not given, it will then look in the database for ``__keys__`` which
:func:`LMDBSerializer.save` used to store the list of keys.
If still not found, it will iterate over the database to find
all the keys.
"""
......@@ -150,7 +151,7 @@ class LMDBData(RNGDataFlow):
class LMDBDataDecoder(MapData):
""" Read a LMDB database and produce a decoded output."""
""" Read a LMDB database with a custom decoder and produce decoded outputs."""
def __init__(self, lmdb_data, decoder):
"""
Args:
......@@ -164,33 +165,8 @@ class LMDBDataDecoder(MapData):
class LMDBDataPoint(MapData):
"""
Read a LMDB file and produce deserialized datapoints.
It **only** accepts the database produced by
:func:`tensorpack.dataflow.dftools.dump_dataflow_to_lmdb`,
which uses :func:`tensorpack.utils.serialize.dumps` for serialization.
Example:
.. code-block:: python
ds = LMDBDataPoint("/data/ImageNet.lmdb", shuffle=False) # read and decode
# The above is equivalent to:
ds = LMDBData("/data/ImageNet.lmdb", shuffle=False) # read
ds = LMDBDataPoint(ds) # decode
# Sometimes it makes sense to separate reading and decoding
# to be able to make decoding parallel.
"""
def __init__(self, *args, **kwargs):
"""
Args:
args, kwargs: Same as in :class:`LMDBData`.
In addition, args[0] can be a :class:`LMDBData` instance.
In this case args[0] has to be the only argument.
"""
log_deprecated("LMDBDataPoint", "Use LMDBSerializer.load() instead!", "2019-01-31")
if isinstance(args[0], DataFlow):
ds = args[0]
assert len(args) == 1 and len(kwargs) == 0, \
......@@ -267,18 +243,8 @@ class SVMLightData(RNGDataFlow):
class TFRecordData(DataFlow):
"""
Produce datapoints from a TFRecord file, assuming each record is
serialized by :func:`serialize.dumps`.
This class works with :func:`dftools.dump_dataflow_to_tfrecord`.
"""
def __init__(self, path, size=None):
"""
Args:
path (str): path to the TFRecord file
size (int): total number of records, because this metadata is not
stored in the TFRecord file.
"""
log_deprecated("TFRecordData", "Use TFRecordSerializer.load instead!", "2019-01-31")
self._path = path
self._size = int(size)
......
# -*- coding: utf-8 -*-
# File: serialize.py
import os
import numpy as np
from ..utils.utils import get_tqdm
from ..utils import logger
from ..utils.serialize import dumps, loads
from .base import DataFlow
from .format import LMDBData
from .common import MapData, FixedSizeData
from .raw import DataFromList, DataFromGenerator
__all__ = ['LMDBSerializer', 'NumpySerializer', 'TFRecordSerializer']
def _reset_df_and_get_size(df):
df.reset_state()
try:
sz = df.size()
except NotImplementedError:
sz = 0
return sz
class LMDBSerializer():
"""
Serialize a Dataflow to a lmdb database, where the keys are indices and values
are serialized datapoints.
You will need to `pip install lmdb` to use it.
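Example:
.. code-block:: python
    # a minimal usage sketch; the output path is illustrative
    LMDBSerializer.save(df, '/path/to/out.lmdb')
    df = LMDBSerializer.load('/path/to/out.lmdb', shuffle=False)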
"""
@staticmethod
def save(df, path, write_frequency=5000):
"""
Args:
df (DataFlow): the DataFlow to serialize.
path (str): output path. Either a directory or an lmdb file.
write_frequency (int): the frequency to write back data to disk.
"""
assert isinstance(df, DataFlow), type(df)
isdir = os.path.isdir(path)
if isdir:
assert not os.path.isfile(os.path.join(path, 'data.mdb')), "LMDB file exists!"
else:
assert not os.path.isfile(path), "LMDB file exists!"
db = lmdb.open(path, subdir=isdir,
map_size=1099511627776 * 2, readonly=False,
meminit=False, map_async=True) # need sync() at the end
size = _reset_df_and_get_size(df)
with get_tqdm(total=size) as pbar:
idx = -1
# LMDB transaction is not exception-safe!
# although it has a context manager interface
txn = db.begin(write=True)
for idx, dp in enumerate(df.get_data()):
txn.put(u'{:08}'.format(idx).encode('ascii'), dumps(dp))
pbar.update()
if (idx + 1) % write_frequency == 0:
txn.commit()
txn = db.begin(write=True)
txn.commit()
keys = [u'{:08}'.format(k).encode('ascii') for k in range(idx + 1)]
with db.begin(write=True) as txn:
txn.put(b'__keys__', dumps(keys))
logger.info("Flushing database ...")
db.sync()
db.close()
@staticmethod
def load(path, shuffle=True):
"""
Note:
If you find deserialization to be the bottleneck, you can use :class:`LMDBData` as the reader
and run deserialization as a mapper in parallel.
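One possible sketch of that pattern (``MultiThreadMapData`` is just one parallel mapper that could be used here):
.. code-block:: python
    ds = LMDBData(path, shuffle=False)                       # read raw (key, value) bytes
    ds = MultiThreadMapData(ds, 4, lambda dp: loads(dp[1]))  # deserialize with 4 threads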
"""
df = LMDBData(path, shuffle=shuffle)
return MapData(df, lambda dp: loads(dp[1]))
class NumpySerializer():
"""
Serialize the entire dataflow to a npz dict.
Note that this would have to store the entire dataflow in memory,
and is also >10x slower than the other serializers.
"""
@staticmethod
def save(df, path):
"""
Args:
df (DataFlow): the DataFlow to serialize.
path (str): output npz file.
"""
buffer = []
size = _reset_df_and_get_size(df)
with get_tqdm(total=size) as pbar:
for dp in df.get_data():
buffer.append(dp)
pbar.update()
np.savez_compressed(path, buffer=buffer)
@staticmethod
def load(path, shuffle=True):
buffer = np.load(path)['buffer']
return DataFromList(buffer, shuffle=shuffle)
class TFRecordSerializer():
"""
Serialize datapoints to bytes (by tensorpack's default serializer) and write to a TFRecord file.
Note that TFRecord does not support random access and is in fact not very performant.
It's better to use :class:`LMDBSerializer`.
"""
@staticmethod
def save(df, path):
"""
Args:
df (DataFlow): the DataFlow to serialize.
path (str): output tfrecord file.
"""
size = _reset_df_and_get_size(df)
with tf.python_io.TFRecordWriter(path) as writer, get_tqdm(total=size) as pbar:
for dp in df.get_data():
writer.write(dumps(dp).to_pybytes())
pbar.update()
@staticmethod
def load(path, size=None):
"""
Args:
size (int): total number of records. If not provided, the returned dataflow will have no `size()`.
It's needed because this metadata is not stored in the TFRecord file.
"""
gen = tf.python_io.tf_record_iterator(path)
ds = DataFromGenerator(gen)
ds = MapData(ds, loads)
if size is not None:
ds = FixedSizeData(ds, size)
return ds
from ..utils.develop import create_dummy_class # noqa
try:
import lmdb
except ImportError:
LMDBSerializer = create_dummy_class('LMDBSerializer', 'lmdb') # noqa
try:
import tensorflow as tf
except ImportError:
TFRecordSerializer = create_dummy_class('TFRecordSerializer', 'tensorflow') # noqa
if __name__ == '__main__':
from .raw import FakeData
import time
ds = FakeData([[300, 300, 3], [1]], 1000)
print(time.time())
TFRecordSerializer.save(ds, 'out.tfrecords')
print(time.time())
df = TFRecordSerializer.load('out.tfrecords', size=1000)
df.reset_state()
for idx, dp in enumerate(df.get_data()):
pass
print("TF Finished, ", idx)
print(time.time())
LMDBSerializer.save(ds, 'out.lmdb')
print(time.time())
df = LMDBSerializer.load('out.lmdb')
df.reset_state()
for idx, dp in enumerate(df.get_data()):
pass
print("LMDB Finished, ", idx)
print(time.time())
NumpySerializer.save(ds, 'out.npz')
print(time.time())
df = NumpySerializer.load('out.npz')
df.reset_state()
for idx, dp in enumerate(df.get_data()):
pass
print("Numpy Finished, ", idx)
print(time.time())