import argparse
import os
import random
import shutil
import time
import warnings
import math
import glob
import numpy as np
import sys
import torch
import torch.npu
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.nn.functional as F
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
from collections import OrderedDict
import torch.onnx
import pth2onnx
sys.path.append(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../'))
import mnasnet
import moxing as mox
try:
from apex.parallel import DistributedDataParallel as DDP
from apex.fp16_utils import *
from apex import amp, optimizers
except ImportError:
raise ImportError("Please install apex from https://www.github.com/nvidia/apex to run this example.")
# Command-line interface of the training script.  The arguments are declared
# in a fixed order because that order is what ``--help`` prints.
parser = argparse.ArgumentParser(description='PyTorch ImageNet Training')

# ---- dataset location and device selection ----
parser.add_argument('--data', metavar='DIR', help='path to dataset')
parser.add_argument('--device', type=str, default='npu', help='npu or gpu')
parser.add_argument('--device-list', type=str, default='0,1,2,3,4,5,6,7',
                    help='device id list')
parser.add_argument('-a', '--arch', metavar='ARCH', default='resnet18')

# ---- data loading and schedule length ----
parser.add_argument('-j', '--workers', type=int, default=4, metavar='N',
                    help='number of data loading workers (default: 4)')
parser.add_argument('--epochs', type=int, default=90, metavar='N',
                    help='number of total epochs to run')
parser.add_argument('--start-epoch', type=int, default=0, metavar='N',
                    help='manual epoch number (useful on restarts)')
parser.add_argument('-b', '--batch-size', type=int, default=256, metavar='N',
                    help='mini-batch size (default: 256), this is the total '
                         'batch size of all NPUs on the current node when '
                         'using Data Parallel or Distributed Data Parallel')

# ---- optimisation hyper-parameters ----
parser.add_argument('--label-smoothing', '--ls', type=float, default=0.1)
parser.add_argument('--lr', '--learning-rate', dest='lr', type=float,
                    default=0.1, metavar='LR', help='initial learning rate')
parser.add_argument('--momentum', type=float, default=0.9, metavar='M',
                    help='momentum')
parser.add_argument('--wd', '--weight-decay', dest='weight_decay', type=float,
                    default=1e-4, metavar='W',
                    help='weight decay (default: 1e-4)')

# ---- logging, checkpoints and run mode ----
parser.add_argument('-p', '--print-freq', type=int, default=10, metavar='N',
                    help='print frequency (default: 10)')
parser.add_argument('--resume', type=str, default='', metavar='PATH',
                    help='path to latest checkpoint (default: none)')
parser.add_argument('-e', '--evaluate', dest='evaluate', action='store_true',
                    help='evaluate model on validation set')
parser.add_argument('--pretrained', dest='pretrained', action='store_true',
                    help='use pre-trained model')

# ---- distributed training ----
parser.add_argument('--world-size', type=int, default=-1,
                    help='number of nodes for distributed training')
parser.add_argument('--rank', type=int, default=-1,
                    help='node rank for distributed training')
parser.add_argument('--dist-url', type=str, default='tcp://224.66.41.62:23456',
                    help='url used to set up distributed training')
parser.add_argument('--dist-backend', type=str, default='nccl',
                    help='distributed backend')
parser.add_argument('--seed', type=int, default=1,
                    help='seed for initializing training. ')
parser.add_argument('--npu', type=int, default=0, help='NPU id to use.')
parser.add_argument('--warmup', type=int, default=0, help='Warmup epochs.')
parser.add_argument('--local_rank', type=int, default=0,
                    help="rank id of process")
parser.add_argument('--run-prof', action='store_true', help='only for prof')
parser.add_argument('--multiprocessing-distributed', action='store_true',
                    help='Use multi-processing distributed training to launch '
                         'N processes per node, which has N NPUs. This is the '
                         'fastest way to use PyTorch for either single node or '
                         'multi node data parallel training')

# ---- remote storage paths, export and model head size ----
parser.add_argument('--pretrained_weight', dest='pretrained_weight',
                    help='pretrained weight dir')
parser.add_argument('--data_url', metavar='DIR', default='/cache/data_url',
                    help='path to dataset')
parser.add_argument('--train_url', type=str, default="/cache/training",
                    help="setting dir of training output")
# NOTE(review): ``store_true`` combined with ``default=True`` means this
# option is always True, whether or not the flag is given.
parser.add_argument('--onnx', action='store_true', default=True,
                    help="convert pth model to onnx")
parser.add_argument('--class_num', type=int, default=1000,
                    help='number of class')

# Best top-1 accuracy seen so far; main_worker() rebinds it via ``global``.
best_acc1 = 0
# Device string consulted by main() and runprof().
CALCULATE_DEVICE = "npu:0"
# Local directory that receives checkpoints before they are copied out.
CACHE_TRAINING_URL = "/cache/training"
# Default checkpoint file name (not referenced by the code visible here).
is_best_name = "checkpoint.pth.tar"
def main():
    """Parse the command line, configure the process and start one worker.

    Steps performed, in order:
      * parse and echo the CLI arguments;
      * bind the module-level ``CALCULATE_DEVICE`` to the device chosen with
        ``--npu`` and select it through ``torch.npu.set_device``;
      * seed ``random`` and ``torch`` when ``--seed`` is set;
      * derive ``args.distributed`` and ``args.process_device_map``;
      * call ``main_worker`` with ``args.local_rank`` (multiprocessing
        distributed mode) or ``args.npu`` (otherwise).
    """
    global CALCULATE_DEVICE

    args = parser.parse_args()
    print('===========================')
    print(args)
    print('===========================')

    if args.npu is None:
        args.npu = 0

    # Keep the module-level device string in sync with --npu.  It used to
    # stay at its "npu:0" default (the ``global`` statement never assigned
    # it), so this function selected npu:0 while main_worker() selected
    # npu:<args.npu>, and runprof() moved data to the wrong device.
    CALCULATE_DEVICE = 'npu:{}'.format(args.npu)
    if 'npu' in CALCULATE_DEVICE:
        torch.npu.set_device(CALCULATE_DEVICE)

    if args.seed is not None:
        random.seed(args.seed)
        torch.manual_seed(args.seed)
        cudnn.deterministic = True
        warnings.warn('You have chosen to seed training. '
                      'This will turn on the CUDNN deterministic setting, '
                      'which can slow down your training considerably! '
                      'You may see unexpected behavior when restarting '
                      'from checkpoints.')

    # args.npu is never None at this point, so this warning is always issued.
    if args.npu is not None:
        warnings.warn('You have chosen a specific NPU. This will completely '
                      'disable data parallelism.')

    if args.dist_url == "env://" and args.world_size == -1:
        args.world_size = int(os.environ["WORLD_SIZE"])

    args.distributed = args.world_size > 1 or args.multiprocessing_distributed

    # Maps process index -> device id, built from the --device-list string.
    args.process_device_map = device_id_to_process_device_map(args.device_list)

    if args.device == 'npu':
        ngpus_per_node = len(args.process_device_map)
    else:
        ngpus_per_node = torch.cuda.device_count()

    if args.multiprocessing_distributed:
        # world_size is given in nodes on the CLI; convert it to a process
        # count before handing it to the worker.
        args.world_size = ngpus_per_node * args.world_size
        main_worker(args.local_rank, ngpus_per_node, args)
    else:
        main_worker(args.npu, ngpus_per_node, args)
def main_worker(npu, ngpus_per_node, args):
    """Build the model and data pipeline, then train, evaluate or profile.

    Args:
        npu: process index on this node; only used to derive the global rank
            in multiprocessing-distributed mode.
        ngpus_per_node: number of devices per node; used for the rank
            computation, for splitting ``args.batch_size`` and for deciding
            which process saves checkpoints.
        args: parsed command-line namespace.  It is mutated here: ``rank``,
            ``loss_scale``, ``batch_size`` and ``start_epoch`` may be
            rewritten.

    Side effects: copies the dataset from ``args.data_url`` to
    ``/cache/data_url``, appends to ``log.txt``, writes checkpoints through
    ``save_checkpoint`` and finally copies ``CACHE_TRAINING_URL`` to
    ``args.train_url``.  Updates the module-level ``best_acc1``.
    """
    global best_acc1

    if args.npu is not None:
        print("Use NPU: {} for training".format(args.npu))

    if args.distributed:
        if args.dist_url == "env://" and args.rank == -1:
            args.rank = int(os.environ["RANK"])
        if args.multiprocessing_distributed:
            # Turn the node rank into a global process rank.
            args.rank = args.rank * ngpus_per_node + npu
        if args.device == 'npu':
            os.environ['MASTER_ADDR'] = '127.0.0.1'
            os.environ['MASTER_PORT'] = '29688'
            dist.init_process_group(backend=args.dist_backend,
                                    world_size=args.world_size, rank=args.rank)
        else:
            dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                    world_size=args.world_size, rank=args.rank)

    if args.pretrained:
        print("=> using pre-trained model '{}'".format(args.arch))
        CACHE_MODEL_URL = "/cache/model"
        os.makedirs(CACHE_MODEL_URL, exist_ok=True)
        mox.file.copy_parallel(args.pretrained_weight, os.path.join(CACHE_MODEL_URL, "checkpoint.pth"))
        pth = os.path.join(CACHE_MODEL_URL, "checkpoint.pth")
        pretrained_dict = torch.load(pth, map_location="cpu")
        model = mnasnet.mnasnet1_0(num_classes=args.class_num)
        # Drop the classifier weights (plain and "module."-prefixed keys) so
        # the head created for args.class_num keeps its fresh initialisation.
        if "classifier.1.weight" in pretrained_dict:
            pretrained_dict.pop("classifier.1.weight")
            pretrained_dict.pop("classifier.1.bias")
        if "module.classifier.1.weight" in pretrained_dict:
            pretrained_dict.pop("module.classifier.1.weight")
            pretrained_dict.pop("module.classifier.1.bias")
        model.load_state_dict(pretrained_dict, strict=False)
    else:
        print("=> creating model '{}'".format('mansnet'))
        model = mnasnet.mnasnet1_0(num_classes=args.class_num)

    args.loss_scale = 128
    loc = 'npu:{}'.format(args.npu)
    torch.npu.set_device(loc)

    # --batch-size is the per-node total; each process gets an equal share.
    args.batch_size = int(args.batch_size / ngpus_per_node)

    # Stage the dataset on local disk before building the ImageFolder datasets.
    real_path = '/cache/data_url'
    os.makedirs(real_path, exist_ok=True)
    mox.file.copy_parallel(args.data_url, real_path)
    print("training data finish copy to %s." % real_path)
    traindir = os.path.join(real_path, 'train')
    valdir = os.path.join(real_path, 'val')

    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    train_dataset = datasets.ImageFolder(
        traindir,
        transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ]))
    val_dataset = datasets.ImageFolder(valdir, transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        normalize,
    ]))

    train_sampler = None
    val_sampler = None
    if args.distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
        num_workers=args.workers,
        pin_memory=False,
        sampler=train_sampler,
        drop_last=True)
    # NOTE(review): drop_last=True also discards the trailing partial
    # validation batch, so reported accuracy may exclude some samples.
    val_loader = torch.utils.data.DataLoader(
        val_dataset,
        batch_size=args.batch_size, shuffle=False,
        num_workers=args.workers, pin_memory=False,
        sampler=val_sampler,
        drop_last=True)

    model = model.to(loc)
    # Honour --label-smoothing; its default (0.1) equals the criterion's own
    # default, so runs that do not pass the flag behave as before.
    criterion = LabelSmoothingCrossEntropy(eps=args.label_smoothing).to(loc)
    optimizer = torch.optim.SGD(model.parameters(), args.lr,
                                momentum=args.momentum,
                                weight_decay=args.weight_decay,
                                nesterov=True)
    lr_schedule = CosineWithWarmup(optimizer, args.warmup, 0.1, args.epochs)
    model, optimizer = amp.initialize(model, optimizer, opt_level="O1", loss_scale=args.loss_scale)

    if args.multiprocessing_distributed:
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.npu], broadcast_buffers=False)

    if args.resume:
        if os.path.isfile(args.resume):
            print("=> loading checkpoint '{}'".format(args.resume))
            checkpoint = torch.load(args.resume, map_location=loc)
            args.start_epoch = checkpoint['epoch']
            best_acc1 = checkpoint['best_acc1']
            model.load_state_dict(checkpoint['state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer'])
            amp.load_state_dict(checkpoint['amp'])
            print("=> loaded checkpoint '{}' (epoch {})"
                  .format(args.resume, checkpoint['epoch']))
        else:
            print("=> no checkpoint found at '{}'".format(args.resume))

    cudnn.benchmark = True

    if args.evaluate:
        validate(val_loader, model, criterion, args)
        return

    for epoch in range(args.start_epoch, args.epochs):
        if args.distributed:
            train_sampler.set_epoch(epoch)

        lr_schedule.step(epoch)
        if args.rank == 0:
            current_lr = lr_schedule.get_lr()[0]
            print('lr = ', current_lr)
            # Context manager closes the log file even if printing fails.
            with open('log.txt', 'a') as log_file:
                print('lr = ', current_lr, file=log_file)

        if args.run_prof:
            # Profiling mode: run the profiling loop once, then stop.
            runprof(train_loader, model, criterion, optimizer, epoch, args)
            print('output to output.prof')
            return

        train(train_loader, model, criterion, optimizer, epoch, args)
        acc1 = validate(val_loader, model, criterion, args)

        is_best = acc1 >= best_acc1
        best_acc1 = max(acc1, best_acc1)

        # Only one process per node writes checkpoints.
        if not args.multiprocessing_distributed or (args.multiprocessing_distributed
                                                    and args.rank % ngpus_per_node == 0):
            save_checkpoint({
                'epoch': epoch + 1,
                'arch': args.arch,
                'state_dict': model.state_dict(),
                'best_acc1': best_acc1,
                'optimizer': optimizer.state_dict(),
                'amp': amp.state_dict(),
            }, is_best, args)

    if args.onnx:
        pth2onnx.convert_pth_to_onnx(args)
    # Publish everything written to the local cache directory.
    mox.file.copy_parallel(CACHE_TRAINING_URL, args.train_url)
def train(train_loader, model, criterion, optimizer, epoch, args):
    """Run one training epoch over ``train_loader``.

    Args:
        train_loader: iterable yielding ``(images, target)`` batches.
        model: network to optimise; switched to training mode here.
        criterion: loss callable taking ``(output, target)``.
        optimizer: optimizer already wrapped by ``amp.initialize``.
        epoch: epoch index, used only in the progress prefix.
        args: parsed CLI namespace; reads ``npu``, ``world_size``,
            ``print_freq`` and ``rank``.

    Progress lines are printed and appended to ``./log.txt`` every
    ``args.print_freq`` batches on rank 0.
    """
    batch_time = AverageMeter('Time', ':6.3f')
    data_time = AverageMeter('Data', ':6.3f')
    losses = AverageMeter('Loss', ':.4e')
    top1 = AverageMeter('Acc@1', ':6.2f')
    top5 = AverageMeter('Acc@5', ':6.2f')
    FPS = AverageMeter('FPS', ':6.2f')
    progress = ProgressMeter(
        len(train_loader),
        [batch_time, data_time, losses, top1, top5, FPS],
        prefix="Epoch: [{}]".format(epoch),
        fpath='./log.txt')

    # The target device does not change during the epoch; build it once.
    loc = 'npu:{}'.format(args.npu)

    model.train()

    try:
        end = time.time()
        for i, (images, target) in enumerate(train_loader):
            data_time.update(time.time() - end)

            # Labels are cast to int32 before the transfer
            # (presumably an NPU kernel requirement -- TODO confirm).
            target = target.to(torch.int32)
            images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)

            output = model(images)
            loss = criterion(output, target)

            acc1, acc5 = accuracy(output, target, topk=(1, 5))
            losses.update(loss.item(), images.size(0))
            top1.update(acc1[0], images.size(0))
            top5.update(acc5[0], images.size(0))

            optimizer.zero_grad()
            # apex scales the loss before backward for mixed precision.
            with amp.scale_loss(loss, optimizer) as scaled_loss:
                scaled_loss.backward()
            optimizer.step()

            batch_time_nw = time.time() - end
            # The first iterations are left out of the timing statistics.
            if i >= 5:
                batch_time.update(batch_time_nw)
            if i >= 2:
                batch_size = images.size(0)
                # NOTE(review): world_size keeps its -1 default in
                # non-distributed runs, which makes this value negative.
                FPS.update(batch_size / batch_time_nw * args.world_size)
            end = time.time()

            if i % args.print_freq == 0 and args.rank == 0:
                progress.display(i)
        print('NPU: {}, solve {} batchs'.format(args.rank, i))
    finally:
        # ProgressMeter opens log.txt on construction; without this one file
        # handle was leaked per epoch.
        progress.close()
def validate(val_loader, model, criterion, args):
    """Evaluate ``model`` on ``val_loader`` and return the mean top-1 accuracy.

    Args:
        val_loader: iterable yielding ``(images, target)`` batches.
        model: network to evaluate; switched to eval mode here.
        criterion: loss callable taking ``(output, target)``.
        args: parsed CLI namespace; reads ``npu``, ``print_freq`` and ``rank``.

    Returns:
        ``top1.avg``: the running average of the per-batch top-1 accuracy
        values produced by ``accuracy``.

    The summary line is printed and appended to ``log.txt``.
    """
    batch_time = AverageMeter('Time', ':6.3f')
    losses = AverageMeter('Loss', ':.4e')
    top1 = AverageMeter('Acc@1', ':6.2f')
    top5 = AverageMeter('Acc@5', ':6.2f')
    progress = ProgressMeter(
        len(val_loader),
        [batch_time, losses, top1, top5],
        prefix='Test: ',
        fpath='./log.txt')

    # The target device does not change during evaluation; build it once.
    loc = 'npu:{}'.format(args.npu)

    model.eval()

    try:
        with torch.no_grad():
            end = time.time()
            for i, (images, target) in enumerate(val_loader):
                # Labels are cast to int32 before the transfer
                # (presumably an NPU kernel requirement -- TODO confirm).
                target = target.to(torch.int32)
                images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)

                output = model(images)
                loss = criterion(output, target)

                acc1, acc5 = accuracy(output, target, topk=(1, 5))
                losses.update(loss.item(), images.size(0))
                top1.update(acc1[0], images.size(0))
                top5.update(acc5[0], images.size(0))

                batch_time.update(time.time() - end)
                end = time.time()

                if i % args.print_freq == 0 and args.rank == 0:
                    progress.display(i)

            summary = ' * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}'.format(top1=top1, top5=top5)
            print(summary)
            # Context manager closes the log file even if the write fails.
            with open('log.txt', 'a') as log_file:
                print(summary, file=log_file)
    finally:
        # ProgressMeter opens log.txt on construction; without this one file
        # handle was leaked per validation pass.
        progress.close()

    return top1.avg
def runprof(train_loader, model, criterion, optimizer, epoch, args):
    """Run a few warm-up steps, profile one training step, then return.

    Batches are pulled through ``data_prefetcher``.  Iterations 0-4 are
    ordinary training steps; iteration 5 runs under
    ``torch.autograd.profiler.profile`` and its trace is exported to
    ``output.prof``.  If the loader yields fewer than six batches no trace
    is written.

    Args:
        train_loader: iterable yielding ``(images, target)`` batches.
        model: network to train; switched to training mode here.
        criterion: loss callable taking ``(output, target)``.
        optimizer: optimizer already wrapped by ``amp.initialize``.
        epoch: unused; kept for signature parity with ``train``.
        args: parsed CLI namespace; reads ``npu``.
    """
    def _train_step(batch, labels):
        # One forward/backward/update step with apex loss scaling.
        out = model(batch)
        loss = criterion(out, labels)
        optimizer.zero_grad()
        with amp.scale_loss(loss, optimizer) as scaled_loss:
            scaled_loss.backward()
        optimizer.step()

    model.train()
    prefetcher = data_prefetcher(train_loader)
    images, target = prefetcher.next()
    i = -1
    while images is not None:
        i += 1
        if 'npu' in CALCULATE_DEVICE:
            # NPU path.  The old code first called images.cuda(...)
            # unconditionally (args.npu is never None), which needs a CUDA
            # device that an NPU host does not have.
            target = target.to(torch.int32)
            images = images.to(CALCULATE_DEVICE, non_blocking=True)
            target = target.to(CALCULATE_DEVICE, non_blocking=True)
        elif args.npu is not None:
            images = images.cuda(args.npu, non_blocking=True)

        if i >= 5:
            # NOTE(review): use_cuda=True is kept from the original; an NPU
            # build of PyTorch may expect a different profiler flag -- confirm.
            with torch.autograd.profiler.profile(use_cuda=True) as prof:
                _train_step(images, target)
            prof.export_chrome_trace("output.prof")
            return

        _train_step(images, target)
        images, target = prefetcher.next()
def save_checkpoint(state, is_best, args, filename='checkpoint.pth.tar'):
    """Save ``state`` locally and, when it is the best so far, export a copy.

    Args:
        state: checkpoint dict; must contain ``'best_acc1'`` and ``'epoch'``
            when ``is_best`` is true (both are formatted into the file name).
        is_best: whether this checkpoint should also be copied under
            ``args.train_url``.
        args: parsed CLI namespace; ``args.train_url`` is read and
            ``args.is_best_name`` is set to the exported path.
        filename: file name used inside ``CACHE_TRAINING_URL``.
    """
    # exist_ok avoids the check-then-create race between processes.
    os.makedirs(CACHE_TRAINING_URL, 0o755, exist_ok=True)
    checkpoint_save_path = os.path.join(CACHE_TRAINING_URL, filename)
    torch.save(state, checkpoint_save_path)
    if is_best:
        best_name = 'model_best_acc%.4f_epoch%d.pth.tar' % (state['best_acc1'], state['epoch'])
        # Join with a separator: plain concatenation produced
        # "/cache/trainingmodel_best_..." when train_url had no trailing
        # slash.  A train_url that already ends in "/" is unaffected.
        args.is_best_name = os.path.join(args.train_url, best_name)
        mox.file.copy_parallel(checkpoint_save_path, args.is_best_name)
class LabelSmoothingCrossEntropy(nn.Module):
    """Cross-entropy loss blended with a uniform-target term.

    The result is ``eps / C * smooth + (1 - eps) * nll`` where ``C`` is the
    size of the last dimension of ``output``, ``smooth`` is the negated sum
    of the log-probabilities and ``nll`` is ``F.nll_loss`` on the same
    log-probabilities.  Both terms follow ``reduction``.
    """

    def __init__(self, eps=0.1, reduction='mean'):
        super().__init__()
        self.eps = eps
        self.reduction = reduction

    def forward(self, output, target):
        """Return the smoothed loss for logits ``output`` and labels ``target``."""
        num_classes = output.size()[-1]
        log_probs = F.log_softmax(output, dim=-1)

        if self.reduction == 'sum':
            # One scalar: the negated sum over every element.
            smooth = -log_probs.sum()
        elif self.reduction == 'mean':
            # Sum over the last dimension, then average what remains.
            smooth = (-log_probs.sum(dim=-1)).mean()
        else:
            # Any other reduction keeps the per-row sums.
            smooth = -log_probs.sum(dim=-1)

        hard = F.nll_loss(log_probs, target, reduction=self.reduction)
        return smooth * self.eps / num_classes + (1 - self.eps) * hard
class AverageMeter(object):
    """Keep the latest value of a metric together with its running average."""

    def __init__(self, name, fmt=':f'):
        # ``fmt`` is a format-spec suffix such as ':6.3f' used by __str__.
        self.name, self.fmt = name, fmt
        self.reset()

    def reset(self):
        """Zero the current value, the average, the sum and the count."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as observed ``n`` times and refresh the average."""
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        # Produces e.g. "Loss 0.1234 (0.2345)": latest value, then average.
        template = ''.join(['{name} {val', self.fmt, '} ({avg', self.fmt, '})'])
        return template.format(**self.__dict__)
class ProgressMeter(object):
    """Print a tab-separated progress line and optionally append it to a file.

    Args:
        num_batches: total number of batches, shown as the denominator.
        meters: objects whose ``str()`` is appended to each progress line.
        prefix: text placed before the ``[batch/total]`` counter.
        fpath: optional path of a log file opened in append mode.  When it is
            omitted, lines are only printed.
    """

    def __init__(self, num_batches, meters, prefix="", fpath=None):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix
        # Always define the attribute: display() and close() test it, and it
        # used to be missing (AttributeError) when no fpath was given.
        self.file = None
        if fpath is not None:
            self.file = open(fpath, 'a')

    def display(self, batch):
        """Print the progress line for ``batch`` and mirror it to the log file."""
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        line = '\t'.join(entries)
        print(line)
        if self.file is not None:
            self.file.write(line)
            self.file.write('\n')
            # Flush so the log stays readable while the run is in progress.
            self.file.flush()

    def _get_batch_fmtstr(self, num_batches):
        # Pad the running index to the width of the total, e.g. "[ 12/500]".
        num_digits = len(str(num_batches // 1))
        fmt = '{:' + str(num_digits) + 'd}'
        return '[' + fmt + '/' + fmt.format(num_batches) + ']'

    def close(self):
        """Close the log file if one is open; safe to call more than once."""
        if self.file is not None:
            self.file.close()
            self.file = None
class CosineWithWarmup(torch.optim.lr_scheduler._LRScheduler):
    """Linear warm-up for the first ``warmup_len`` epochs, cosine decay after.

    During warm-up the multiplier rises linearly from
    ``warmup_start_multiplier`` towards 1.0; afterwards it follows half a
    cosine period down to ``eta_min`` (0.0) at ``max_epochs``.
    """

    def __init__(self, optimizer: torch.optim.Optimizer, warmup_len: int,
                 warmup_start_multiplier: float, max_epochs: int,
                 last_epoch: int = -1):
        # Validate everything first, in the same order as the checks have
        # always been reported, then store the configuration.
        if warmup_len < 0:
            raise ValueError("Warmup can't be less than 0.")
        if not (0.0 <= warmup_start_multiplier <= 1.0):
            raise ValueError(
                "Warmup start multiplier must be within [0.0, 1.0].")
        if max_epochs < 1 or max_epochs < warmup_len:
            raise ValueError("Max epochs must be longer than warm-up.")

        self.warmup_len = warmup_len
        self.warmup_start_multiplier = warmup_start_multiplier
        self.max_epochs = max_epochs
        self.cosine_len = max_epochs - warmup_len
        self.eta_min = 0.0
        # The base class reads the optimizer's learning rates and performs an
        # initial step, so all fields must exist before this call.
        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        """Return one learning rate per parameter group for ``last_epoch``."""
        epoch = self.last_epoch
        if epoch > self.max_epochs:
            raise ValueError(
                "Epoch may not be greater than max_epochs={}.".format(
                    self.max_epochs))

        in_warmup = epoch < self.warmup_len or self.cosine_len == 0
        if in_warmup:
            slope = (1.0 - self.warmup_start_multiplier) / self.warmup_len
            lr_multiplier = self.warmup_start_multiplier + slope * epoch
        else:
            cosine_epoch = epoch - self.warmup_len
            lr_multiplier = self.eta_min + (1.0 - self.eta_min) * (
                1 + math.cos(math.pi * cosine_epoch / self.cosine_len)) / 2

        assert lr_multiplier >= 0.0
        scaled = []
        for base_lr in self.base_lrs:
            scaled.append(base_lr * lr_multiplier)
        return scaled
def accuracy(output, target, topk=(1,)):
    """Return the top-k accuracy, in percent, for every k in ``topk``.

    ``output`` holds one row of scores per sample and ``target`` one label
    per sample.  The result is a list with one single-element tensor per
    requested k, in the same order as ``topk``.
    """
    with torch.no_grad():
        largest_k = max(topk)
        num_samples = target.size(0)

        # Indices of the highest scores, transposed so row j holds every
        # sample's (j+1)-th best guess.
        ranked = output.topk(largest_k, 1, True, True)[1].t()
        hits = ranked.eq(target.view(1, -1).expand_as(ranked))

        percent_per_hit = 100.0 / num_samples
        return [
            hits[:k].reshape(-1).float().sum(0, keepdim=True).mul_(percent_per_hit)
            for k in topk
        ]
def device_id_to_process_device_map(device_list):
    """Map process indices to device ids.

    ``device_list`` is a comma-separated string of integer device ids.  The
    ids are sorted in ascending order and numbered from 0, giving a dict
    ``{process_index: device_id}``.
    """
    sorted_ids = sorted(int(token) for token in device_list.split(","))
    return dict(enumerate(sorted_ids))
def fast_collate(batch):
    """Collate ``(image, label)`` pairs into a uint8 NCHW batch tensor.

    Args:
        batch: sequence of ``(image, label)`` pairs.  Each image must expose
            ``size`` as ``(width, height)`` and be convertible with
            ``np.asarray``; all images are expected to share the size of the
            first one.

    Returns:
        ``(tensor, targets)`` where ``tensor`` has shape
        ``(len(batch), 3, height, width)`` and dtype ``torch.uint8`` and
        ``targets`` is an int64 tensor of the labels.
    """
    imgs = [sample[0] for sample in batch]
    targets = torch.tensor([sample[1] for sample in batch], dtype=torch.int64)
    w = imgs[0].size[0]
    h = imgs[0].size[1]
    tensor = torch.zeros((len(imgs), 3, h, w), dtype=torch.uint8)
    for i, img in enumerate(imgs):
        nump_array = np.asarray(img, dtype=np.uint8)
        if nump_array.ndim < 3:
            # Single-channel image: add a channel axis so it broadcasts
            # across the three output channels below.
            nump_array = np.expand_dims(nump_array, axis=-1)
        # HWC -> CHW.  (The unused per-image tensor the old code built before
        # this point has been dropped.)
        nump_array = np.moveaxis(nump_array, 2, 0)
        # The destination is zero-filled, so += acts as a broadcasting copy.
        tensor[i] += torch.from_numpy(nump_array)
    return tensor, targets
class data_prefetcher():
    """Fetch the next batch on a side NPU stream while the current one is used.

    Each batch is moved to the NPU, converted to float and normalised with
    the per-channel ``mean``/``std`` constants below (channel statistics
    multiplied by 255).  ``next()`` returns ``(None, None)`` once the loader
    is exhausted.
    """

    def __init__(self, loader):
        self.loader = iter(loader)
        self.stream = torch.npu.Stream()
        channel_mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]
        channel_std = [0.229 * 255, 0.224 * 255, 0.225 * 255]
        # Shaped (1, 3, 1, 1) so they broadcast over N, H and W.
        self.mean = torch.tensor(channel_mean).npu().view(1, 3, 1, 1)
        self.std = torch.tensor(channel_std).npu().view(1, 3, 1, 1)
        self.preload()

    def preload(self):
        """Start loading the following batch; store None when none is left."""
        try:
            batch, labels = next(self.loader)
        except StopIteration:
            self.next_input = None
            self.next_target = None
            return

        # Transfers and normalisation are issued on the side stream.
        with torch.npu.stream(self.stream):
            batch = batch.npu(non_blocking=True)
            labels = labels.npu(non_blocking=True)
            batch = batch.float()
            self.next_input = batch.sub_(self.mean).div_(self.std)
            self.next_target = labels

    def next(self):
        """Return the prefetched ``(input, target)`` and queue the next one."""
        # Make the current stream wait for the side stream's pending work.
        torch.npu.current_stream().wait_stream(self.stream)
        ready_input, ready_target = self.next_input, self.next_target
        self.preload()
        return ready_input, ready_target
# Start training only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()