Commit 65bbd28a authored by Yuxin Wu's avatar Yuxin Wu

[MaskRCNN] parallel evaluation in horovod mode

parent 79711478
...@@ -24,6 +24,7 @@ from tensorpack import * ...@@ -24,6 +24,7 @@ from tensorpack import *
from tensorpack.tfutils.summary import add_moving_summary from tensorpack.tfutils.summary import add_moving_summary
from tensorpack.tfutils import optimizer from tensorpack.tfutils import optimizer
from tensorpack.tfutils.common import get_tf_version_tuple from tensorpack.tfutils.common import get_tf_version_tuple
from tensorpack.utils.serialize import loads, dumps
import tensorpack.utils.viz as tpviz import tensorpack.utils.viz as tpviz
from coco import COCODetection from coco import COCODetection
...@@ -399,19 +400,31 @@ def predict(pred_func, input_file): ...@@ -399,19 +400,31 @@ def predict(pred_func, input_file):
class EvalCallback(Callback): class EvalCallback(Callback):
""" """
A callback that runs COCO evaluation once a while. A callback that runs COCO evaluation once a while.
It supports multi-GPU evaluation if TRAINER=='replicated' and single-GPU evaluation if TRAINER=='horovod' It supports multi-gpu evaluation.
""" """
_chief_only = False
def __init__(self, in_names, out_names): def __init__(self, in_names, out_names):
self._in_names, self._out_names = in_names, out_names self._in_names, self._out_names = in_names, out_names
def _setup_graph(self): def _setup_graph(self):
num_gpu = cfg.TRAIN.NUM_GPUS num_gpu = cfg.TRAIN.NUM_GPUS
# Use two predictor threads per GPU to get better throughput if cfg.TRAINER == 'replicated':
self.num_predictor = 1 if cfg.TRAINER == 'horovod' else num_gpu * 2 # Use two predictor threads per GPU to get better throughput
self.predictors = [self._build_coco_predictor(k % num_gpu) for k in range(self.num_predictor)] self.num_predictor = num_gpu * 2
self.dataflows = [get_eval_dataflow(shard=k, num_shards=self.num_predictor) self.predictors = [self._build_coco_predictor(k % num_gpu) for k in range(self.num_predictor)]
for k in range(self.num_predictor)] self.dataflows = [get_eval_dataflow(shard=k, num_shards=self.num_predictor)
for k in range(self.num_predictor)]
else:
self.predictor = self._build_coco_predictor(0)
self.dataflow = get_eval_dataflow(shard=hvd.rank(), num_shards=hvd.size())
# use uint8 to aggregate strings
self.local_result_tensor = tf.placeholder(tf.uint8, shape=[None], name='local_result_string')
self.concat_results = hvd.allgather(self.local_result_tensor, name='concat_results')
local_size = tf.expand_dims(tf.size(self.local_result_tensor), 0)
self.string_lens = hvd.allgather(local_size, name='concat_sizes')
def _build_coco_predictor(self, idx): def _build_coco_predictor(self, idx):
graph_func = self.trainer.get_predictor(self._in_names, self._out_names, device=idx) graph_func = self.trainer.get_predictor(self._in_names, self._out_names, device=idx)
...@@ -425,17 +438,31 @@ class EvalCallback(Callback): ...@@ -425,17 +438,31 @@ class EvalCallback(Callback):
if len(self.epochs_to_eval) < 15: if len(self.epochs_to_eval) < 15:
logger.info("[EvalCallback] Will evaluate at epoch " + str(sorted(self.epochs_to_eval))) logger.info("[EvalCallback] Will evaluate at epoch " + str(sorted(self.epochs_to_eval)))
else: else:
if cfg.TRAINER == 'horovod':
logger.warn("[EvalCallback] Evaluation is single-GPU only and quite slow under horovod mode.")
logger.info("[EvalCallback] Will evaluate every {} epochs".format(interval)) logger.info("[EvalCallback] Will evaluate every {} epochs".format(interval))
def _eval(self): def _eval(self):
with ThreadPoolExecutor(max_workers=self.num_predictor, thread_name_prefix='EvalWorker') as executor, \ if cfg.TRAINER == 'replicated' or cfg.TRAIN.NUM_GPUS == 1:
tqdm.tqdm(total=sum([df.size() for df in self.dataflows])) as pbar: with ThreadPoolExecutor(max_workers=self.num_predictor, thread_name_prefix='EvalWorker') as executor, \
futures = [] tqdm.tqdm(total=sum([df.size() for df in self.dataflows])) as pbar:
for dataflow, pred in zip(self.dataflows, self.predictors): futures = []
futures.append(executor.submit(eval_coco, dataflow, pred, pbar)) for dataflow, pred in zip(self.dataflows, self.predictors):
all_results = list(itertools.chain(*[fut.result() for fut in futures])) futures.append(executor.submit(eval_coco, dataflow, pred, pbar))
all_results = list(itertools.chain(*[fut.result() for fut in futures]))
else:
local_results = eval_coco(self.dataflow, self.predictor)
results_as_arr = np.frombuffer(dumps(local_results), dtype=np.uint8)
sizes, concat_arrs = tf.get_default_session().run(
[self.string_lens, self.concat_results],
feed_dict={self.local_result_tensor: results_as_arr})
if hvd.rank() > 0:
return
all_results = []
start = 0
for size in sizes:
substr = concat_arrs[start: start + size]
results = loads(substr.tobytes())
all_results.extend(results)
start = start + size
output_file = os.path.join( output_file = os.path.join(
logger.get_logger_dir(), 'outputs{}.json'.format(self.global_step)) logger.get_logger_dir(), 'outputs{}.json'.format(self.global_step))
...@@ -450,6 +477,7 @@ class EvalCallback(Callback): ...@@ -450,6 +477,7 @@ class EvalCallback(Callback):
def _trigger_epoch(self): def _trigger_epoch(self):
if self.epoch_num in self.epochs_to_eval: if self.epoch_num in self.epochs_to_eval:
logger.info("Running evaluation ...")
self._eval() self._eval()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment