'''Train CIFAR10 with PyTorch.'''
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torch.distributed as dist
from torch_npu.contrib import transfer_to_npu
import torchvision
import torchvision.transforms as transforms
import os
import argparse
from models import *
from bugfix import progress_bar
import apex
from apex import amp
parser = argparse.ArgumentParser(description='PyTorch CIFAR10 Training')
parser.add_argument('--batch_size', default=128, type=int, help='train batch size')
parser.add_argument('--lr', default=0.1, type=float, help='learning rate')
parser.add_argument('--n_epochs', type=int, default='200', help='total training epochs')
parser.add_argument('--num_workers', type=int, default='2', help='dataset parallel workers num')
parser.add_argument('--resume', '-r', action='store_true',
help='resume from checkpoint')
parser.add_argument('--local_rank', type=int, default=0, help='local rank in ddp')
parser.add_argument('--world_size', type=int, default=1, help='world size in ddp')
args = parser.parse_args()
local_rank = args.local_rank
torch.npu.set_device(local_rank)
dist.init_process_group(backend='hccl', init_method='tcp://127.0.0.1:23333',
world_size=args.world_size, rank=args.local_rank)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
best_acc = 0
start_epoch = 0
train_epoch = args.n_epochs
print('==> Preparing data..')
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
transform_test = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
trainset = torchvision.datasets.CIFAR10(
root='./data', train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(
trainset, batch_size=args.batch_size, shuffle=True, num_workers=args.num_workers)
testset = torchvision.datasets.CIFAR10(
root='./data', train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(
testset, batch_size=100, shuffle=False, num_workers=args.num_workers)
classes = ('plane', 'car', 'bird', 'cat', 'deer',
'dog', 'frog', 'horse', 'ship', 'truck')
print('==> Building model..')
net = ResNet50()
net = net.npu()
if args.resume:
print('==> Resuming from checkpoint..')
assert os.path.isdir('checkpoint'), 'Error: no checkpoint directory found!'
checkpoint = torch.load('./checkpoint/ckpt.pth')
net.load_state_dict(checkpoint['net'])
best_acc = checkpoint['acc']
start_epoch = checkpoint['epoch']
criterion = nn.CrossEntropyLoss()
optimizer = apex.optimizers.NpuFusedSGD(net.parameters(), lr=args.lr,
momentum=0.9, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)
net, optimizer = amp.initialize(net, optimizer, opt_level='O2', combine_grad=True, loss_scale=128.)
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[local_rank], broadcast_buffers=False)
def train(epoch):
print('\nEpoch: %d' % epoch)
net.train()
train_loss = 0
correct = 0
total = 0
for batch_idx, (inputs, targets) in enumerate(trainloader):
inputs, targets = inputs.to(device), targets.to(device)
optimizer.zero_grad()
outputs = net(inputs)
loss = criterion(outputs, targets)
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
optimizer.step()
train_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
if torch.distributed.get_rank() == 0:
avg_step_time = progress_bar(batch_idx, len(trainloader), 'Loss: %.3f | Acc: %.3f%% (%d/%d)'
% (train_loss/(batch_idx+1), 100.*correct/total, correct, total))
else:
avg_step_time = None
print(f"Train: average_step_time: {avg_step_time} train_loss: {train_loss/(batch_idx+1)}", flush=True)
def test(epoch):
global best_acc
net.eval()
test_loss = 0
correct = 0
total = 0
with torch.no_grad():
for batch_idx, (inputs, targets) in enumerate(testloader):
inputs, targets = inputs.to(device), targets.to(device)
outputs = net(inputs)
loss = criterion(outputs, targets)
test_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
if torch.distributed.get_rank() == 0:
avg_step_time = progress_bar(batch_idx, len(testloader), 'Loss: %.3f | Acc: %.3f%% (%d/%d)'
% (test_loss/(batch_idx+1), 100.*correct/total, correct, total))
else:
avg_step_time = None
acc = 100.*correct/total
if acc > best_acc:
print('Saving..')
state = {
'net': net.state_dict(),
'acc': acc,
'epoch': epoch,
}
if not os.path.isdir('checkpoint'):
os.mkdir('checkpoint')
torch.save(state, './checkpoint/ckpt.pth')
best_acc = acc
print(f"Val: average_step_time: {avg_step_time} val_loss: {test_loss/(batch_idx+1)} ",
f"acc: {acc} best_acc: {best_acc}", flush=True)
for epoch in range(start_epoch, start_epoch+train_epoch):
train(epoch)
test(epoch)
scheduler.step()