import os.path as osp
import pickle
import shutil
import tempfile
import warnings
import mmcv
import torch
import torch.distributed as dist
from mmcv.runner import get_dist_info
# Prefer the test helpers shipped with mmcv; when they are unavailable,
# `from_mmcv` is set to False so local fallbacks can be defined instead.
try:
    from mmcv.engine import (collect_results_cpu, collect_results_gpu,
                             multi_gpu_test, single_gpu_test)
except ImportError:
    # ModuleNotFoundError is a subclass of ImportError, so this single
    # clause covers both cases.
    warnings.warn(
        'DeprecationWarning: single_gpu_test, multi_gpu_test, '
        'collect_results_cpu, collect_results_gpu from mmaction2 will be '
        'deprecated. Please install mmcv through master branch.')
    from_mmcv = False
else:
    from_mmcv = True
if not from_mmcv:
def single_gpu_test(model, data_loader):
    """Run inference over a data loader with a single model instance.

    The model is switched to evaluation mode, every batch yielded by
    ``data_loader`` is unpacked as keyword arguments and forwarded with
    ``return_loss=False`` while gradient tracking is disabled, and the
    per-batch outputs are concatenated into one flat list.

    Args:
        model (nn.Module): Model to be tested.
        data_loader (nn.Dataloader): Pytorch data loader whose batches are
            mappings usable as keyword arguments of ``model``.

    Returns:
        list: The prediction results, in the order batches were yielded.
    """
    model.eval()
    predictions = []
    for batch in data_loader:
        # Only the forward pass runs with autograd disabled.
        with torch.no_grad():
            batch_output = model(return_loss=False, **batch)
        predictions.extend(batch_output)
    return predictions
def multi_gpu_test(
        model, data_loader, tmpdir=None, gpu_collect=True):
    """Test model with multiple gpus.

    Every process runs inference on the batches yielded by its own
    ``data_loader`` and the partial results are then merged under one of
    two modes. With ``gpu_collect=True`` the results are encoded to gpu
    tensors and exchanged through gpu communication
    (``collect_results_gpu``). Otherwise each process saves its results
    to ``tmpdir`` and the rank 0 worker gathers them
    (``collect_results_cpu``).

    Args:
        model (nn.Module): Model to be tested.
        data_loader (nn.Dataloader): Pytorch data loader. Its ``dataset``
            length is used as the number of results to keep.
        tmpdir (str): Path of directory to save the temporary results from
            different gpus under cpu mode. Ignored when ``gpu_collect`` is
            True. Default: None
        gpu_collect (bool): Option to use either gpu or cpu to collect
            results. Default: True

    Returns:
        list | None: The collected prediction results as returned by the
            selected collect function (a list on rank 0, ``None`` on the
            other ranks).
    """
    model.eval()
    results = []
    dataset = data_loader.dataset
    for data in data_loader:
        # Only the forward pass runs with autograd disabled.
        with torch.no_grad():
            result = model(return_loss=False, **data)
        results.extend(result)

    # The collect functions truncate the merged list to the dataset length.
    if gpu_collect:
        return collect_results_gpu(results, len(dataset))
    return collect_results_cpu(results, len(dataset), tmpdir)
def collect_results_cpu(result_part, size, tmpdir=None):
    """Collect results in cpu mode.

    Every rank dumps its partial results to ``part_{rank}.pkl`` inside a
    shared directory; rank 0 then loads all parts, interleaves them
    (first item of every part, then second item of every part, ...) and
    truncates the merged list to ``size``.

    Args:
        result_part (list): Results of the current rank to be collected.
        size (int): Maximum number of results to return; extra trailing
            items in the merged list are discarded.
        tmpdir (str): Path of directory to save the temporary results from
            different gpus under cpu mode. The files are written to
            ``<tmpdir>/.dist_test``. If None, rank 0 creates a fresh
            temporary directory under ``.dist_test`` in the current
            working directory and broadcasts its path. Default: None

    Returns:
        list | None: Ordered results on rank 0, ``None`` on other ranks.
    """
    rank, world_size = get_dist_info()
    if tmpdir is None:
        # The path is shared through a fixed-size byte tensor, so the
        # encoded path must fit into MAX_LEN bytes.
        MAX_LEN = 512
        # 32 is the ASCII code of a space: the buffer is space-padded and
        # the padding is removed by ``rstrip`` after decoding.
        dir_tensor = torch.full((MAX_LEN, ),
                                32,
                                dtype=torch.uint8,
                                device='cuda')
        if rank == 0:
            mmcv.mkdir_or_exist('.dist_test')
            tmpdir = tempfile.mkdtemp(dir='.dist_test')
            tmpdir = torch.tensor(
                bytearray(tmpdir.encode()),
                dtype=torch.uint8,
                device='cuda')
            dir_tensor[:len(tmpdir)] = tmpdir
        dist.broadcast(dir_tensor, 0)
        tmpdir = dir_tensor.cpu().numpy().tobytes().decode().rstrip()
    else:
        tmpdir = osp.join(tmpdir, '.dist_test')
        mmcv.mkdir_or_exist(tmpdir)

    # Synchronize all ranks before any of them writes its part file.
    dist.barrier()
    mmcv.dump(result_part, osp.join(tmpdir, f'part_{rank}.pkl'))
    # All ranks must have finished dumping before rank 0 reads the files.
    dist.barrier()
    if rank != 0:
        return None

    part_list = []
    for i in range(world_size):
        part_result = mmcv.load(osp.join(tmpdir, f'part_{i}.pkl'))
        # ``zip`` below stops at the shortest part, so one empty part would
        # wipe out the results of every other rank; skip empty parts.
        if part_result:
            part_list.append(part_result)

    # Interleave the parts item by item.
    ordered_results = []
    for res in zip(*part_list):
        ordered_results.extend(res)
    # Drop surplus trailing items beyond the requested size.
    ordered_results = ordered_results[:size]
    # The part files are no longer needed once merged.
    shutil.rmtree(tmpdir)
    return ordered_results
def collect_results_gpu(result_part, size):
    """Collect results in gpu mode.

    The partial results of every rank are pickled into a uint8 cuda
    tensor, padded to the largest payload size among all ranks and
    exchanged with ``all_gather``. Rank 0 then unpickles every payload,
    interleaves the parts (first item of every part, then second item of
    every part, ...) and truncates the merged list to ``size``.

    Args:
        result_part (list): Results of the current rank to be collected.
        size (int): Maximum number of results to return; extra trailing
            items in the merged list are discarded.

    Returns:
        list | None: Ordered results on rank 0, ``None`` on other ranks.
    """
    rank, world_size = get_dist_info()
    # Serialize the local results into a byte tensor on the gpu.
    part_tensor = torch.tensor(
        bytearray(pickle.dumps(result_part)),
        dtype=torch.uint8,
        device='cuda')
    # Exchange payload lengths first: all_gather needs equally sized
    # tensors, so every payload is padded to the largest length.
    shape_tensor = torch.tensor(part_tensor.shape, device='cuda')
    shape_list = [shape_tensor.clone() for _ in range(world_size)]
    dist.all_gather(shape_list, shape_tensor)
    shape_max = torch.tensor(shape_list).max()
    part_send = torch.zeros(shape_max, dtype=torch.uint8, device='cuda')
    part_send[:shape_tensor[0]] = part_tensor
    part_recv_list = [
        part_tensor.new_zeros(shape_max) for _ in range(world_size)
    ]
    dist.all_gather(part_recv_list, part_send)

    if rank != 0:
        return None

    part_list = []
    for recv, shape in zip(part_recv_list, shape_list):
        # Cut off the zero padding before unpickling. The payloads were
        # received from the peer ranks through all_gather.
        part_result = pickle.loads(recv[:shape[0]].cpu().numpy().tobytes())
        # ``zip`` below stops at the shortest part, so one empty part would
        # wipe out the results of every other rank; skip empty parts.
        if part_result:
            part_list.append(part_result)

    # Interleave the parts item by item.
    ordered_results = []
    for res in zip(*part_list):
        ordered_results.extend(res)
    # Drop surplus trailing items beyond the requested size.
    return ordered_results[:size]