from enum import Enum
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Type, Union
import torch
from detectron2.config import CfgNode
from .lr_scheduler import WarmupCosineLR, WarmupMultiStepLR
# What a gradient clipper is handed: either a single tensor or an iterable of
# tensors.
_GradientClipperInput = Union[torch.Tensor, Iterable[torch.Tensor]]
# A gradient clipper is a callable that takes the input described above and
# returns nothing.
_GradientClipper = Callable[[_GradientClipperInput], None]
class GradientClipType(Enum):
    """Supported gradient clipping modes, keyed by their config string."""

    # Selects the clipper built on `torch.nn.utils.clip_grad_value_`.
    VALUE = "value"

    # Selects the clipper built on `torch.nn.utils.clip_grad_norm_`.
    NORM = "norm"
def _create_gradient_clipper(cfg: CfgNode) -> _GradientClipper:
    """
    Build the gradient clipping callable selected by the config.

    Args:
        cfg: CfgNode
            gradient clipping options; `CLIP_TYPE` picks the clipper,
            `CLIP_VALUE` is the clipping threshold and `NORM_TYPE` is
            forwarded to the norm based clipper only.

    Return:
        a callable that clips, in place, the gradients of whatever it is
        given (by value or by norm, depending on `CLIP_TYPE`).

    Raises:
        ValueError: if `CLIP_TYPE` is not a valid `GradientClipType` value.
    """
    # Work on a private copy of the options; presumably this keeps later
    # edits of the caller's config from changing the returned closure.
    cfg = cfg.clone()
    selected = GradientClipType(cfg.CLIP_TYPE)

    if selected is GradientClipType.VALUE:

        def clip_grad_value(p: _GradientClipperInput):
            torch.nn.utils.clip_grad_value_(p, cfg.CLIP_VALUE)

        return clip_grad_value

    if selected is GradientClipType.NORM:

        def clip_grad_norm(p: _GradientClipperInput):
            torch.nn.utils.clip_grad_norm_(p, cfg.CLIP_VALUE, cfg.NORM_TYPE)

        return clip_grad_norm

    # Not reachable with the two clip types above; mirrors a failed
    # dispatch table lookup.
    raise KeyError(selected)
def _generate_optimizer_class_with_gradient_clipping(
    optimizer_type: Type[torch.optim.Optimizer], gradient_clipper: _GradientClipper
) -> Type[torch.optim.Optimizer]:
    """
    Dynamically creates a new type that inherits the given optimizer type
    and overrides the `step` method to add gradient clipping.

    Args:
        optimizer_type: optimizer class to derive from.
        gradient_clipper: callable invoked once on every parameter of every
            param group right before the wrapped `step` runs.

    Return:
        a subclass of `optimizer_type` named `<OptimizerType>WithGradientClip`
        whose `step` clips and then returns whatever `optimizer_type.step`
        returns.
    """

    def optimizer_wgc_step(self, closure=None):
        # Clipping is applied parameter by parameter, before `closure` is
        # handed over to the wrapped optimizer.
        for group in self.param_groups:
            for p in group["params"]:
                gradient_clipper(p)
        # Call the wrapped optimizer's `step` explicitly instead of
        # `super(type(self), self).step(...)`: with `type(self)` the lookup
        # starts from the most derived class, so any subclass of the
        # generated type would call this function again and recurse forever.
        # The result is returned so the closure's value is not dropped.
        return optimizer_type.step(self, closure)

    OptimizerWithGradientClip = type(
        optimizer_type.__name__ + "WithGradientClip",
        (optimizer_type,),
        {"step": optimizer_wgc_step},
    )
    return OptimizerWithGradientClip
def maybe_add_gradient_clipping(
    cfg: CfgNode, optimizer: torch.optim.Optimizer
) -> torch.optim.Optimizer:
    """
    Turn on gradient clipping for an existing optimizer if the config asks
    for it.

    When `cfg.SOLVER.CLIP_GRADIENTS.ENABLED` is set, a class named
    OptimizerTypeWithGradientClip is created on the fly (it inherits the
    optimizer's current type and overrides `step` to clip gradients first)
    and the optimizer instance is switched over to that class in place.

    Args:
        cfg: CfgNode
            configuration options
        optimizer: torch.optim.Optimizer
            existing optimizer instance

    Return:
        optimizer: torch.optim.Optimizer
            always the instance that was passed in: untouched when clipping
            is disabled, otherwise with its `__class__` replaced by the
            clipping subclass
    """
    clip_cfg = cfg.SOLVER.CLIP_GRADIENTS
    if clip_cfg.ENABLED:
        clipping_optimizer_type = _generate_optimizer_class_with_gradient_clipping(
            type(optimizer), _create_gradient_clipper(clip_cfg)
        )
        # Re-class the live instance so its existing state is kept.
        optimizer.__class__ = clipping_optimizer_type
    return optimizer
def build_optimizer(cfg: CfgNode, model: torch.nn.Module) -> torch.optim.Optimizer:
    """
    Build an SGD optimizer for `model` from the `cfg.SOLVER` options.

    Per-parameter learning rates and weight decays come from
    `get_default_optimizer_params`; the result is passed through
    `maybe_add_gradient_clipping` before being returned.
    """
    solver = cfg.SOLVER
    param_groups = get_default_optimizer_params(
        model,
        base_lr=solver.BASE_LR,
        weight_decay=solver.WEIGHT_DECAY,
        weight_decay_norm=solver.WEIGHT_DECAY_NORM,
        bias_lr_factor=solver.BIAS_LR_FACTOR,
        weight_decay_bias=solver.WEIGHT_DECAY_BIAS,
    )
    sgd = torch.optim.SGD(
        param_groups,
        solver.BASE_LR,
        momentum=solver.MOMENTUM,
        nesterov=solver.NESTEROV,
    )
    return maybe_add_gradient_clipping(cfg, sgd)
def get_default_optimizer_params(
    model: torch.nn.Module,
    base_lr,
    weight_decay,
    weight_decay_norm,
    bias_lr_factor=1.0,
    weight_decay_bias=None,
    overrides: Optional[Dict[str, Dict[str, float]]] = None,
):
    """
    Get default param list for optimizer: one param group per trainable
    parameter of `model`, each with its own `lr` and `weight_decay`.

    Args:
        model: module whose parameters are collected.
        base_lr: learning rate used unless a rule below changes it.
        weight_decay: weight decay used unless a rule below changes it.
        weight_decay_norm: weight decay for parameters owned directly by a
            normalization module.
        bias_lr_factor: multiplier applied to `base_lr` for parameters named
            `bias` outside normalization modules.
        weight_decay_bias: weight decay for those same `bias` parameters;
            falls back to `weight_decay` when `None`.
        overrides (dict: str -> (dict: str -> float)):
            if not `None`, provides values for optimizer hyperparameters
            (LR, weight decay) for module parameters with a given name; e.g.
            {"embedding": {"lr": 0.01, "weight_decay": 0.1}} will set the LR and
            weight decay values for all module parameters named `embedding` (default: None)

    Return:
        list of dicts with the keys "params", "lr" and "weight_decay".
    """
    bias_decay = weight_decay if weight_decay_bias is None else weight_decay_bias
    normalization_layers = (
        torch.nn.BatchNorm1d,
        torch.nn.BatchNorm2d,
        torch.nn.BatchNorm3d,
        torch.nn.SyncBatchNorm,
        torch.nn.GroupNorm,
        torch.nn.InstanceNorm1d,
        torch.nn.InstanceNorm2d,
        torch.nn.InstanceNorm3d,
        torch.nn.LayerNorm,
        torch.nn.LocalResponseNorm,
    )
    param_groups: List[Dict[str, Any]] = []
    # Parameters already emitted, so one shared between modules is listed once.
    seen: Set[torch.nn.parameter.Parameter] = set()

    for layer in model.modules():
        is_norm_layer = isinstance(layer, normalization_layers)
        # recurse=False: child modules are visited by the outer loop.
        for param_name, param in layer.named_parameters(recurse=False):
            if not param.requires_grad or param in seen:
                continue
            seen.add(param)

            # The normalization rule wins over the bias rule.
            if is_norm_layer:
                hyperparams = {"lr": base_lr, "weight_decay": weight_decay_norm}
            elif param_name == "bias":
                hyperparams = {
                    "lr": base_lr * bias_lr_factor,
                    "weight_decay": bias_decay,
                }
            else:
                hyperparams = {"lr": base_lr, "weight_decay": weight_decay}

            if overrides is not None and param_name in overrides:
                hyperparams.update(overrides[param_name])

            # Only LR and weight decay are forwarded to the param group.
            param_groups.append(
                {
                    "params": [param],
                    "lr": hyperparams["lr"],
                    "weight_decay": hyperparams["weight_decay"],
                }
            )
    return param_groups
def build_lr_scheduler(
    cfg: CfgNode, optimizer: torch.optim.Optimizer
) -> torch.optim.lr_scheduler._LRScheduler:
    """
    Build the LR scheduler named by `cfg.SOLVER.LR_SCHEDULER_NAME`.

    Supported names are "WarmupMultiStepLR" and "WarmupCosineLR"; both get
    the warm-up factor, iteration count and method from `cfg.SOLVER`.

    Raises:
        ValueError: if the configured scheduler name is not supported.
    """
    solver = cfg.SOLVER
    scheduler_name = solver.LR_SCHEDULER_NAME

    if scheduler_name not in ("WarmupMultiStepLR", "WarmupCosineLR"):
        raise ValueError("Unknown LR scheduler: {}".format(scheduler_name))

    def warmup_options():
        # Warm-up settings shared by both schedulers; read at the call site
        # so nothing is looked up before it is needed.
        return {
            "warmup_factor": solver.WARMUP_FACTOR,
            "warmup_iters": solver.WARMUP_ITERS,
            "warmup_method": solver.WARMUP_METHOD,
        }

    if scheduler_name == "WarmupMultiStepLR":
        return WarmupMultiStepLR(
            optimizer, solver.STEPS, solver.GAMMA, **warmup_options()
        )
    return WarmupCosineLR(optimizer, solver.MAX_ITER, **warmup_options())