import os
import sys
from argparse import Namespace
import torch
import torch_npu
import pytest
from pytest_mock import MockFixture
from mindspeed.megatron_adaptor import repatch
from megatron.core.transformer import TransformerConfig
from megatron.core.transformer.enums import ModelType
from megatron.core.parallel_state import destroy_model_parallel
from megatron.core import mpu
from megatron.core import parallel_state
from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_local_spec
from megatron.core.transformer.transformer_block import TransformerBlock
from megatron.core.transformer.moe.moe_utils import (
save_to_aux_losses_tracker,
clear_aux_losses_tracker,
)
from megatron.training import get_args
from megatron.training.arguments import parse_args, validate_args
from megatron.training.global_vars import set_args
from megatron.training.training import num_floating_point_operations
from megatron.training.initialize import (
_initialize_distributed,
_set_random_seed,
)
from mindspeed.core.pipeline_parallel.noop_layers.adaptor import (
origin_flop_calculator,
mindspeed_calc_flop,
)
from mindspeed.core.pipeline_parallel.noop_layers.moe_metrics_tracker import (
get_mean,
track_moe_metrics_impl,
)
from mindspeed.core.pipeline_parallel.noop_layers.transformer import (
build_layers_impl,
)
from mindspeed.core.pipeline_parallel.noop_layers.calc_flop import calc_flop
from mindspeed.core.transformer.transformer_block import _get_layer_offset
from mindspeed.core.pipeline_parallel.noop_layers.adaptor import (
NoopTransformerLayer,
)
from tests_extend.unit_tests.common import DistributedTest
# Set at import time so the variable is in place before any test runs.
# NOTE(review): presumably required by Megatron's communication-overlap
# checks — confirm against the training-argument validation.
os.environ["CUDA_DEVICE_MAX_CONNECTIONS"] = "1"
# Cache of the args namespace built by TestNoopLayer.set_args: None until the
# first call finishes, then reused (instead of re-parsing) by later calls.
_ARGS = None
def track_moe_metrics_mock(loss_scale, writer, total_loss_dict=None):
    """Accumulate noop-aware mean auxiliary losses into ``total_loss_dict``.

    Reads the layer-wise MoE logging tracker, scales every tracked value
    tensor by ``loss_scale`` and reduces it with ``get_mean`` using the
    ``num_layers`` / ``noop_layers`` of the current global args.

    Args:
        loss_scale: Factor applied to every tracked loss tensor.
        writer: Nothing is accumulated when this is ``None``; otherwise it
            is only used as an on/off switch.
        total_loss_dict: Optional dict updated in place; a name's mean is
            stored on first sight and added on subsequent ones.

    The tracker is cleared on every call, whether or not a writer is given.
    """
    layer_tracker = parallel_state.get_moe_layer_wise_logging_tracker()
    run_args = get_args()
    if writer is not None:
        for loss_name, entry in layer_tracker.items():
            scaled_values = entry["values"].float() * loss_scale
            mean_loss = get_mean(
                scaled_values, run_args.num_layers, run_args.noop_layers
            )
            if total_loss_dict is None:
                continue
            if loss_name in total_loss_dict:
                total_loss_dict[loss_name] += mean_loss
            else:
                total_loss_dict[loss_name] = mean_loss
    clear_aux_losses_tracker()
def track_moe_metrics_megatron_original_mock(
    loss_scale,
    writer,
    total_loss_dict=None,
):
    """Accumulate plain per-name mean auxiliary losses into ``total_loss_dict``.

    Unlike ``track_moe_metrics_mock`` this variant reduces each scaled value
    tensor with ``Tensor.mean()`` and never consults ``noop_layers``.

    Args:
        loss_scale: Factor applied to every tracked loss tensor.
        writer: Nothing is accumulated when this is ``None``; otherwise it
            is only used as an on/off switch.
        total_loss_dict: Optional dict updated in place; a name's mean is
            stored on first sight and added on subsequent ones.

    The tracker is cleared on every call, whether or not a writer is given.
    """
    layer_tracker = parallel_state.get_moe_layer_wise_logging_tracker()
    if writer is not None:
        for loss_name, entry in layer_tracker.items():
            scaled_values = entry["values"].float() * loss_scale
            if total_loss_dict is None:
                continue
            if loss_name in total_loss_dict:
                total_loss_dict[loss_name] += scaled_values.mean()
            else:
                total_loss_dict[loss_name] = scaled_values.mean()
    clear_aux_losses_tracker()
class TestNoopLayer(DistributedTest):
    """Distributed tests for no-op transformer layers (``args.noop_layers``).

    Covers transformer-block construction, argument validation, MoE
    auxiliary-loss averaging and FLOP accounting.
    """

    world_size = 8

    # Each entry is (tensor-parallel size, pipeline-parallel size,
    # num_layers_per_virtual_pipeline_stage), as unpacked by ``set_args``.
    _TP_PP_VP_STAGES = [
        (2, 2, 1),
        (2, 2, None),
        (2, 1, None),
        (1, 2, None),
        (1, 1, None),
    ]
    # Key under which the MoE metric tests record their auxiliary loss.
    _AUX_LOSS_NAME = "load_balancing_loss"

    def init_transformer_block(self):
        """Build ``self.transformer_config`` and ``self.parallel_transformer``.

        Both are derived from the current global args; the block is created
        without pre/post processing and without a final layer norm.
        """
        args = get_args()
        self.transformer_config = TransformerConfig(
            num_layers=args.num_layers,
            hidden_size=args.hidden_size,
            num_attention_heads=args.num_attention_heads,
            ffn_hidden_size=args.hidden_size,
            use_cpu_initialization=args.use_cpu_initialization,
            fp16=False,
            sequence_parallel=args.sequence_parallel,
            tensor_model_parallel_size=args.tensor_model_parallel_size,
            expert_model_parallel_size=args.expert_model_parallel_size,
        )
        transformer_layer_spec = get_gpt_layer_local_spec(
            args.num_experts,
            args.moe_grouped_gemm,
            args.qk_layernorm,
            args.multi_latent_attention,
        )
        self.parallel_transformer = TransformerBlock(
            config=self.transformer_config,
            spec=transformer_layer_spec,
            post_layer_norm=False,
            pre_process=False,
            post_process=False,
        )

    def set_args(self, tp_pp_vp_stage, num_layers, noop_layers):
        """Populate, validate and publish the global training args.

        Args:
            tp_pp_vp_stage: Tuple of (tensor-parallel size, pipeline-parallel
                size, layers per virtual pipeline stage).
            num_layers: Total number of transformer layers.
            noop_layers: Raw ``noop_layers`` value handed to ``validate_args``
                (a comma-separated string or ``None`` in these tests).

        Raises:
            AssertionError: Propagated from ``validate_args`` when the
                arguments are rejected.

        The namespace is parsed only once per process; later calls reuse the
        cached module-level ``_ARGS``. ``repatch`` is likewise invoked only
        on that first call.
        """
        global _ARGS
        first_call = _ARGS is None
        args = parse_args(ignore_unknown_args=True) if first_call else _ARGS

        (tp, pp, vp_stage) = tp_pp_vp_stage
        # Parallel layout and the feature under test.
        args.tensor_model_parallel_size = tp
        args.pipeline_model_parallel_size = pp
        args.pipeline_dtype = torch.float32
        args.num_layers_per_virtual_pipeline_stage = vp_stage
        args.model_type = ModelType.encoder_or_decoder
        args.noop_layers = noop_layers
        args.num_layers = num_layers
        # Deliberately tiny model dimensions.
        args.hidden_size = 8
        args.ffn_hidden_size = 8
        args.num_attention_heads = 8
        args.tokenizer_type = "Llama2Tokenizer"
        args.tokenizer_model = "/home/dataset/model/llama-2-7b-hf/tokenizer.model"
        args.seq_length = 128
        args.max_position_embeddings = 128
        args.micro_batch_size = 1
        args.global_batch_size = 8
        args.lr_warmup_fraction = 0.01
        args.bf16 = True
        args.overlap_p2p_comm = True
        args.data_path = "/home/dataset/llama2/alpaca_text_document"
        args.seed = 1234
        # Reset on every call because the cached namespace is reused after a
        # previous validate_args() pass.
        # NOTE(review): presumably validate_args mutates or rejects some of
        # these when they are already populated — confirm.
        args.batch_size = None
        args.warmup = None
        args.model_parallel_size = None
        args.checkpoint_activations = False
        args.recompute_activations = False
        args.encoder_num_layers = None
        args.sequence_parallel = None
        args.encoder_seq_length = None
        args.start_weight_decay = None
        args.end_weight_decay = None
        args.num_query_groups = None

        validate_args(args)
        # Module-level megatron ``set_args``, not this method.
        set_args(args)
        if first_call:
            repatch(vars(args))
            _ARGS = args

    def initialize_distributed(self):
        """Reset model-parallel state, re-initialise it and seed the RNGs."""
        args = get_args()
        destroy_model_parallel()
        _initialize_distributed(None, None)
        _set_random_seed(args.seed, args.data_parallel_random_init)

    def _record_aux_losses(self, num_layers):
        """Save a loss of 1 to the aux-loss tracker for the selected layers.

        A layer is selected when ``args.noop_layers`` is ``None`` or when it
        is a set containing the layer's zero-based index.
        """
        args = get_args()
        loss = torch.tensor(1).npu()
        for layer_number in range(1, num_layers + 1):
            if (
                isinstance(args.noop_layers, set)
                and layer_number - 1 in args.noop_layers
            ) or args.noop_layers is None:
                save_to_aux_losses_tracker(
                    self._AUX_LOSS_NAME,
                    loss,
                    layer_number,
                    num_layers,
                )

    def _collect_moe_metrics(
        self, tp_pp_vp_stage, num_layers, noop_layers, track_fn
    ):
        """Set up the environment, record losses and return tracked totals.

        ``track_fn`` is called as ``track_fn(1, True, total_loss_dict)`` and
        the dict it filled in is returned.
        """
        self.set_args(tp_pp_vp_stage, num_layers, noop_layers)
        self.initialize_distributed()
        self.init_transformer_block()
        self._record_aux_losses(num_layers)
        total_loss_dict = {}
        track_fn(1, True, total_loss_dict)
        return total_loss_dict

    @pytest.mark.parametrize("tp_pp_vp_stage", _TP_PP_VP_STAGES)
    @pytest.mark.parametrize("num_layers", [24, 10])
    @pytest.mark.parametrize("noop_layers", ["0,4,7,8,9"])
    def test_valid_noop_layers(self, tp_pp_vp_stage, num_layers, noop_layers):
        """Exactly the layers listed in ``noop_layers`` are no-op layers."""
        self.set_args(tp_pp_vp_stage, num_layers, noop_layers)
        self.initialize_distributed()
        self.init_transformer_block()
        args = get_args()
        assert num_layers == args.num_layers
        assert num_layers == self.transformer_config.num_layers
        assert args.noop_layers == {0, 4, 7, 8, 9}
        # The offset does not change while iterating, so compute it once.
        offset = _get_layer_offset(args)
        for i, layer in enumerate(self.parallel_transformer.layers):
            global_layer_index = i + offset
            if (
                isinstance(args.noop_layers, set)
                and global_layer_index in args.noop_layers
            ):
                assert isinstance(layer, NoopTransformerLayer)
            else:
                assert not isinstance(layer, NoopTransformerLayer)

    @pytest.mark.parametrize("tp_pp_vp_stage", _TP_PP_VP_STAGES)
    @pytest.mark.parametrize("num_layers", [10])
    @pytest.mark.parametrize("noop_layers", ["0,2,15,6,8"])
    def test_invalid_noop_layers_out_of_range(
        self, tp_pp_vp_stage, num_layers, noop_layers
    ):
        """A noop index >= ``num_layers`` is rejected with an AssertionError."""
        with pytest.raises(AssertionError) as context:
            self.set_args(tp_pp_vp_stage, num_layers, noop_layers)
        # Checked after the ``with`` block: statements following the raising
        # call inside it would never execute.
        assert (
            "each element in args.noop_layers(0,2,15,6,8) "
            "should bigger or equal to 0 and smaller than "
            "args.num_layers(10)"
        ) in str(context.value)

    @pytest.mark.parametrize("tp_pp_vp_stage", _TP_PP_VP_STAGES)
    @pytest.mark.parametrize("num_layers", [4])
    @pytest.mark.parametrize("noop_layers", ["0,1,1,3"])
    def test_moe_metrics_with_noop_layers(
        self, tp_pp_vp_stage, num_layers, noop_layers
    ):
        """Noop-aware tracking yields a total of 3 for ``"0,1,1,3"``."""
        totals = self._collect_moe_metrics(
            tp_pp_vp_stage, num_layers, noop_layers, track_moe_metrics_mock
        )
        assert totals.get(self._AUX_LOSS_NAME) == 3
        del parallel_state._MOE_LAYER_WISE_LOGGING_TRACKER[self._AUX_LOSS_NAME]

    @pytest.mark.parametrize("tp_pp_vp_stage", _TP_PP_VP_STAGES)
    @pytest.mark.parametrize("num_layers", [4])
    @pytest.mark.parametrize("noop_layers", [None])
    def test_moe_metrics_without_noop_layers(
        self, tp_pp_vp_stage, num_layers, noop_layers
    ):
        """Noop-aware tracking yields 1 when no noop layers are configured."""
        totals = self._collect_moe_metrics(
            tp_pp_vp_stage, num_layers, noop_layers, track_moe_metrics_mock
        )
        assert totals.get(self._AUX_LOSS_NAME) == 1
        del parallel_state._MOE_LAYER_WISE_LOGGING_TRACKER[self._AUX_LOSS_NAME]

    @pytest.mark.parametrize("tp_pp_vp_stage", _TP_PP_VP_STAGES)
    @pytest.mark.parametrize("num_layers", [4])
    @pytest.mark.parametrize("noop_layers", ["0,3"])
    def test_moe_metrics_megatron_original_with_noop_layers(
        self, tp_pp_vp_stage, num_layers, noop_layers
    ):
        """Plain-mean tracking yields 0.5 when two of four layers record 1."""
        totals = self._collect_moe_metrics(
            tp_pp_vp_stage,
            num_layers,
            noop_layers,
            track_moe_metrics_megatron_original_mock,
        )
        assert totals.get(self._AUX_LOSS_NAME) == 0.5
        del parallel_state._MOE_LAYER_WISE_LOGGING_TRACKER[self._AUX_LOSS_NAME]

    @pytest.mark.parametrize("tp_pp_vp_stage", _TP_PP_VP_STAGES)
    @pytest.mark.parametrize("num_layers", [4])
    @pytest.mark.parametrize("noop_layers", [None])
    def test_moe_metrics_megatron_original_without_noop_layers(
        self, tp_pp_vp_stage, num_layers, noop_layers
    ):
        """Plain-mean tracking yields 1 when every layer records a loss of 1."""
        # Uses the megatron-original tracker, as the test name states; with
        # the noop-aware tracker this test would merely duplicate
        # test_moe_metrics_without_noop_layers.
        totals = self._collect_moe_metrics(
            tp_pp_vp_stage,
            num_layers,
            noop_layers,
            track_moe_metrics_megatron_original_mock,
        )
        assert totals.get(self._AUX_LOSS_NAME) == 1
        del parallel_state._MOE_LAYER_WISE_LOGGING_TRACKER[self._AUX_LOSS_NAME]

    @pytest.mark.parametrize(
        "num_layers, noop_layers, megatron_expected, mindspeed_expected",
        [
            (4, None, 184817811456.0, 184817811456.0),
            (4, {1, 2}, 184817811456.0, 114152177664.0),
        ],
    )
    def test_cacl_flop_adaptor(
        self, num_layers, noop_layers, megatron_expected, mindspeed_expected
    ):
        """FLOP counts from the original and MindSpeed calculators match expectations.

        NOTE(review): ``num_floating_point_operations`` is expected to agree
        with ``mindspeed_calc_flop``, which presumably relies on the MindSpeed
        patch being applied — confirm.
        """
        args = Namespace()
        args.num_layers = num_layers
        args.noop_layers = noop_layers
        args.kv_channels = 128
        args.num_attention_heads = 6
        args.hidden_size = 768
        args.seq_length = 768
        args.ffn_hidden_size = 3072
        args.padded_vocab_size = 12288
        args.group_query_attention = False
        args.swiglu = False
        args.num_experts = None
        args.is_hybrid_model = None
        args.mtp_num_layers = None
        args.moe_ffn_hidden_size = 0
        args.multi_latent_attention = None
        args.moe_shared_expert_intermediate_size = None
        batch_size = 1
        megatron_flop = origin_flop_calculator(args, batch_size)
        mindspeed_flop = num_floating_point_operations(args, batch_size)
        assert mindspeed_calc_flop(args, batch_size) == mindspeed_flop
        assert megatron_flop == megatron_expected
        assert mindspeed_flop == mindspeed_expected

    @pytest.mark.parametrize(
        " args, batch_size, expected",
        [
            (
                Namespace(num_layers=2, noop_layers={1, 2}),
                1,
                2,
            ),
            (
                Namespace(num_layers=2, noop_layers=[1, 2]),
                2,
                2,
            ),
        ],
    )
    def test_cacl_flop(self, args, batch_size, expected):
        """``calc_flop`` returns an object whose ``num_layers`` equals ``expected``."""

        def func(x, y):
            # Hand the first argument straight back so it can be inspected.
            return x

        ret = calc_flop(func, args, batch_size)
        assert ret.num_layers == expected

    @pytest.mark.parametrize(
        "values, num_layers, noop_layers, expected",
        [
            (
                torch.tensor([1.0, 2.0, 3.0, 4.0]),
                4,
                {1, 2},
                torch.tensor(5.0),
            ),
            (
                torch.tensor([1.0, 2.0, 3.0, 4.0]),
                4,
                [1, 2],
                torch.tensor(2.5),
            ),
            (
                torch.tensor([1.0, 2.0, 3.0, 4.0]),
                4,
                {1, 2, 3, 4},
                torch.tensor([torch.inf, torch.inf, torch.inf, torch.inf]),
            ),
        ],
    )
    def test_get_mean(self, values, num_layers, noop_layers, expected):
        """``get_mean`` matches ``expected`` in every compared element."""
        matches = get_mean(values, num_layers, noop_layers) == expected
        # Reduce with all() so every element is checked and no result shape
        # can slip through without an assertion.
        assert bool(matches.all())
def test_track_mode_metrics_impl(mocker: MockFixture):
    """``track_moe_metrics_impl`` stores the expected mean in ``total_loss_dict``.

    All collaborators are injected: the reduce/clear hooks and the writer
    methods are no-ops, and the tracker getter returns a single entry
    ``"test"`` with values ``[1, 2, 3, 4]``. The test expects
    ``{"test": 2.5}`` afterwards.
    """

    def noop(*args, **kwargs):
        return None

    def fake_tracker(*args, **kwargs):
        return {"test": {"values": torch.tensor([1.0, 2.0, 3.0, 4.0])}}

    writer = mocker.MagicMock()
    writer.add_scalar = noop
    wandb_writer = mocker.MagicMock()
    wandb_writer.log = noop
    total_loss_dict = {}

    # NOTE(review): no args namespace is passed in here; the function under
    # test presumably obtains num_layers / noop_layers on its own — confirm.
    track_moe_metrics_impl(
        reduce_aux_losses_tracker_across_ranks=noop,
        get_moe_layer_wise_logging_tracker=fake_tracker,
        clear_aux_losses_tracker=noop,
        loss_scale=torch.tensor(1.0),
        iteration=1,
        writer=writer,
        wandb_writer=wandb_writer,
        total_loss_dict=total_loss_dict,
        per_layer_logging=True,
    )
    assert total_loss_dict == {"test": 2.5}
def test_build_layers_impl(mocker: MockFixture):
    """``build_layers_impl`` puts no-op layers at the configured indices.

    A minimal stand-in transformer with three layer specs and
    ``noop_layers={1, 2}`` is built with the layer offset patched to 0;
    positions 1 and 2 of the resulting ``layers`` must be
    ``NoopTransformerLayer`` instances.
    """

    class MockTransformer:
        """Bare attribute holder set up for ``build_layers_impl``."""

        def __init__(self):
            self.config = Namespace(noop_layers={1, 2})
            self.submodules = Namespace(
                layer_specs=[None, None, None],
                layer_norm=True,
            )
            self.layers = None
            self.post_process = False
            self.post_layer_norm = False
            self.final_layernorm = None

    def build_module(x, **kwargs):
        # Identity builder: hands the spec back and ignores keyword options.
        return x

    # Pin the layer offset to zero for this call.
    mocker.patch(
        "mindspeed.core.pipeline_parallel.noop_layers.transformer._get_layer_offset",
        return_value=0,
    )
    transformer = MockTransformer()
    build_layers_impl(transformer, NoopTransformerLayer, build_module)

    for noop_index in (1, 2):
        assert isinstance(transformer.layers[noop_index], NoopTransformerLayer)