import math
from collections import defaultdict
import torch
from torch.optim.optimizer import Optimizer
from apex.contrib.combine_tensors import combine_npu
class NpuFusedAdamW(Optimizer):
"""Implements AdamW algorithm.
Currently NPU-only. Requires Apex to be installed via
``pip install -v --no-cache-dir --global-option="--cpp_ext" --global-option="--npu_float_status" ./``.
This version of NPU fused AdamW implements 1 fusions.
* A combine-tensor apply launch that batches the elementwise updates applied to all the model's parameters
into one or a few kernel launches.
:class:`apex.optimizers.NpuFusedAdamW` may be used as a drop-in replacement for ``torch.optim.AdamW``::
opt = apex.optimizers.NpuFusedAdamW(model.parameters(), lr = ....)
...
opt.step()
:class:`apex.optimizers.FusedAdamW` should be used with Amp. Currently, if you wish to use :class:`NpuFusedAdamW`
with Amp, only ``opt_level O1 and O2`` can be choosed::
opt = apex.optimizers.NpuFusedAdamW(model.parameters(), lr = ....)
model, opt = amp.initialize(model, opt, opt_level="O2")
...
opt.step()
The original Adam algorithm was proposed in `Adam: A Method for Stochastic Optimization`_.
The AdamW variant was proposed in `Decoupled Weight Decay Regularization`_.
Arguments:
params (iterable): iterable of parameters to optimize or dicts defining
parameter groups
lr (float, optional, default: 1e-3): learning rate
betas (Tuple[float, float], optional, default: (0.9, 0.999)): coefficients used
for computing running averages of gradient and its square
eps (float, optional, default: 1e-8): term added to the denominator to improve
numerical stability
weight_decay (float, optional, default: 1e-2): weight decay coefficient
amsgrad (boolean, optional, default: False): whether to use the AMSGrad variant of
this algorithm from the paper `On the Convergence of Adam and Beyond`_
.. _Adam\: A Method for Stochastic Optimization:
https://arxiv.org/abs/1412.6980
.. _Decoupled Weight Decay Regularization:
https://arxiv.org/abs/1711.05101
.. _On the Convergence of Adam and Beyond:
https://openreview.net/forum?id=ryQu7f-RZ
"""
def __init__(self, params, lr=1e-3, betas=(0.9, 0.999), eps=1e-8,
weight_decay=1e-2, amsgrad=False):
if lr < 0.0:
raise ValueError("Invalid learning rate: {}".format(lr))
if eps < 0.0:
raise ValueError("Invalid epsilon value: {}".format(eps))
if betas[0] < 0.0 or betas[0] >= 1.0:
raise ValueError("Invalid beta parameter at index 0: {}".format(betas[0]))
if betas[1] < 0.0 or betas[1] >= 1.0:
raise ValueError("Invalid beta parameter at index 1: {}".format(betas[1]))
if weight_decay < 0.0:
raise ValueError("Invalid weight_decay value: {}".format(weight_decay))
defaults = dict(lr=lr, betas=betas, eps=eps,
weight_decay=weight_decay, amsgrad=amsgrad)
self.is_npu_fused_optimizer = True
super(NpuFusedAdamW, self).__init__(params, defaults)
def __setstate__(self, state):
super(NpuFusedAdamW, self).__setstate__(state)
for group in self.param_groups:
group.setdefault('amsgrad', False)
def _init_param_state(self, p, amsgrad):
state = self.state[p]
if len(state) == 0:
state['step'] = 0
state['exp_avg'] = torch.zeros_like(p, memory_format=torch.preserve_format)
state['exp_avg_sq'] = torch.zeros_like(p, memory_format=torch.preserve_format)
if amsgrad:
state['max_exp_avg_sq'] = torch.zeros_like(p, memory_format=torch.preserve_format)
else:
exp_avg_tmp = torch.zeros_like(p, memory_format=torch.preserve_format)
exp_avg_tmp.copy_(state['exp_avg'])
state['exp_avg'] = exp_avg_tmp
exp_avg_sq_tmp = torch.zeros_like(p, memory_format=torch.preserve_format)
exp_avg_sq_tmp.copy_(state['exp_avg_sq'])
state['exp_avg_sq'] = exp_avg_sq_tmp
if amsgrad:
max_exp_avg_sq_tmp = torch.zeros_like(p, memory_format=torch.preserve_format)
max_exp_avg_sq_tmp.copy_(state['max_exp_avg_sq'])
state['max_exp_avg_sq'] = max_exp_avg_sq_tmp
def _combine_group_param_states(self, group_index):
group = self.param_groups[group_index]
stash = self._amp_stash
group_params_list = stash.params_lists_indexed_by_group[group_index]
amsgrad = group['amsgrad']
combined_param_states = []
for params in group_params_list:
step_list = []
exp_avg_list = []
exp_avg_sq_list = []
max_exp_avg_sq_list = []
for p in params:
if p.grad is None:
continue
grad = p.grad
if grad.is_sparse:
raise RuntimeError('NpuFusedAdamW does not support sparse gradients, '
'please consider SparseAdam instead')
self._init_param_state(p, amsgrad)
state = self.state[p]
step_list.append(state['step'])
exp_avg_list.append(state['exp_avg'])
exp_avg_sq_list.append(state['exp_avg_sq'])
if amsgrad:
max_exp_avg_sq_list.append(state['max_exp_avg_sq'])
combined_step = 0
combined_exp_avg = None
combined_exp_avg_sq = None
combined_max_exp_avg_sq = None
if len(exp_avg_list) > 0:
combined_step = step_list[0]
combined_exp_avg = combine_npu(exp_avg_list)
combined_exp_avg_sq = combine_npu(exp_avg_sq_list)
combined_max_exp_avg_sq = combine_npu(max_exp_avg_sq_list)
combined_state = defaultdict(dict)
combined_state['step'] = combined_step
combined_state['exp_avg'] = combined_exp_avg
combined_state['exp_avg_sq'] = combined_exp_avg_sq
combined_state['max_exp_avg_sq'] = combined_max_exp_avg_sq
combined_param_states.append(combined_state)
stash.combined_param_states_indexed_by_group[group_index] = combined_param_states
def _combine_param_states_by_group(self):
stash = self._amp_stash
if stash.param_states_are_combined_by_group:
return
stash.combined_param_states_indexed_by_group = []
for _ in self.param_groups:
stash.combined_param_states_indexed_by_group.append([])
for i, _ in enumerate(self.param_groups):
self._combine_group_param_states(i)
stash.param_states_are_combined_by_group = True
def _group_step(self, group_index):
group = self.param_groups[group_index]
for p in group['params']:
if p.grad is None:
continue
grad = p.grad
if grad.is_sparse:
raise RuntimeError('NpuFusedAdamW does not support sparse gradients, '
'please consider SparseAdam instead')
state_p = self.state[p]
state_p['step'] += 1
amsgrad = group['amsgrad']
beta1, beta2 = group['betas']
stash = self._amp_stash
combined_group_params = stash.combined_params_indexed_by_group[group_index]
combined_group_grads = stash.combined_grads_indexed_by_group[group_index]
combined_group_param_states = stash.combined_param_states_indexed_by_group[group_index]
for combined_param, combined_grad, combined_param_state in zip(combined_group_params,
combined_group_grads,
combined_group_param_states):
if combined_param is None or combined_grad is None:
continue
combined_param.mul_(1 - group['lr'] * group['weight_decay'])
exp_avg, exp_avg_sq = combined_param_state['exp_avg'], combined_param_state['exp_avg_sq']
if amsgrad:
max_exp_avg_sq = combined_param_state['max_exp_avg_sq']
combined_param_state['step'] += 1
bias_correction1 = 1 - beta1 ** combined_param_state['step']
bias_correction2 = 1 - beta2 ** combined_param_state['step']
exp_avg.mul_(beta1).add_(combined_grad, alpha=1 - beta1)
exp_avg_sq.mul_(beta2).addcmul_(combined_grad, combined_grad, value=1 - beta2)
if amsgrad:
torch.max(max_exp_avg_sq, exp_avg_sq, out=max_exp_avg_sq)
denom = (max_exp_avg_sq.sqrt() / math.sqrt(bias_correction2)).add_(group['eps'])
else:
denom = (exp_avg_sq.sqrt() / math.sqrt(bias_correction2)).add_(group['eps'])
step_size = group['lr'] / bias_correction1
combined_param.addcdiv_(exp_avg, denom, value=-step_size)
@torch.no_grad()
def step(self, closure=None):
if not hasattr(self, "_amp_stash"):
raise RuntimeError('apex.optimizers.NpuFusedAdamW should be used with AMP.')
self._check_already_combined_params_and_grads()
self._combine_params_and_grads_by_group()
self._combine_param_states_by_group()
loss = None
if closure is not None:
with torch.enable_grad():
loss = closure()
for i, _ in enumerate(self.param_groups):
self._group_step(i)
return loss