"""test class ModelRunner."""
from unittest.mock import MagicMock, Mock, patch
from unittest import mock
import os
import sys
import json
import tempfile
import shutil
import pytest
import numpy as np
import mindspore as ms
from mindspore import Tensor
from mindformers import MindFormerConfig
from mindformers.model_runner import (
register_auto_class,
get_model,
ModelRunner,
MindIEModelRunner,
InputBuilder,
_get_model_config,
_load_distributed_safetensors,
_load_safetensors,
_check_valid_safetensors_path
)
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
def _create_mock_config_with_dict_access(auto_map):
"""
Create a MagicMock config that supports both attribute and dictionary access.
This is needed because register_auto_class uses both:
- config.model.model_config.auto_map (attribute access)
- config["model"]["model_config"]["auto_map"] (dictionary access)
Args:
auto_map (dict): The auto_map dictionary
Returns:
MagicMock: A config mock that supports both access patterns
"""
model_config_mock = MagicMock()
model_config_mock.auto_map = auto_map
def model_config_getitem(*args, **kwargs):
"""__getitem__ accepts variable arguments"""
key = args[-1] if args else None
if key == "auto_map":
return auto_map
return MagicMock()
model_config_mock.__getitem__ = model_config_getitem
model_mock = MagicMock()
model_mock.model_config = model_config_mock
def model_getitem(*args, **kwargs):
"""__getitem__ accepts variable arguments"""
key = args[-1] if args else None
if key == "model_config":
return model_config_mock
return MagicMock()
model_mock.__getitem__ = model_getitem
config = MagicMock()
config.model = model_mock
def config_getitem(*args, **kwargs):
"""__getitem__ accepts variable arguments"""
key = args[-1] if args else None
if key == "model":
return model_mock
return MagicMock()
config.__getitem__ = config_getitem
return config
class MockRegistryFactory:
"""Factory for creating mock registries and module types"""
@staticmethod
def create_module_type_mock():
"""Create a mock MindFormerModuleType with standard enum values"""
mock_module_type = MagicMock()
mock_module_type.CONFIG = 'CONFIG'
mock_module_type.TOKENIZER = 'TOKENIZER'
mock_module_type.MODELS = 'MODELS'
mock_module_type.PROCESSOR = 'PROCESSOR'
return mock_module_type
@staticmethod
def create_empty_registry():
"""Create an empty registry dictionary"""
return {
'CONFIG': {},
'TOKENIZER': {},
'MODELS': {},
'PROCESSOR': {}
}
@staticmethod
def create_registry_with_config(config_name='test_config'):
"""Create a registry with a pre-registered config"""
registry = MockRegistryFactory.create_empty_registry()
registry['CONFIG'][config_name] = MagicMock()
return registry
class MockConfigFactory:
"""Factory for creating mock configurations"""
@staticmethod
def create_auto_map_config():
"""Create a standard auto_map configuration"""
return {
'AutoConfig': 'module.ConfigClass',
'AutoTokenizer': ['module.TokenizerSlow', 'module.TokenizerFast'],
'AutoModel': 'module.ModelClass',
'AutoProcessor': 'module.ProcessorClass'
}
@staticmethod
def create_model_config(
num_layers=2,
num_heads=4,
n_kv_heads=None,
hidden_size=128,
compute_dtype='float32',
batch_size=1,
seq_length=512,
is_dynamic=False
):
"""Create a mock model configuration"""
mock_config = MagicMock()
mock_config.num_layers = num_layers
mock_config.num_heads = num_heads
mock_config.n_kv_heads = n_kv_heads
mock_config.hidden_size = hidden_size
mock_config.compute_dtype = compute_dtype
mock_config.batch_size = batch_size
mock_config.seq_length = seq_length
mock_config.is_dynamic = is_dynamic
mock_config.num_hidden_layers = num_layers
mock_config.num_attention_heads = num_heads
mock_config.num_key_value_heads = n_kv_heads
return mock_config
@staticmethod
def create_mindformer_config(
arch_type='llama',
use_parallel=False,
moe_config=None,
load_checkpoint=None
):
"""Create a mock MindFormerConfig"""
mock_config = MindFormerConfig()
mock_config.output_dir = './output'
mock_config.model = MagicMock()
mock_config.use_parallel = use_parallel
mock_config.model.arch.type = arch_type
mock_config.model.model_config = {'type': arch_type}
mock_config.moe_config = moe_config
mock_config.load_checkpoint = load_checkpoint
mock_config.context = MagicMock()
mock_config.parallel_config = MagicMock()
mock_config.callbacks = []
callback = MagicMock()
callback.type = 'CheckpointMonitor'
callback.keep_checkpoint_max = 5
mock_config.callbacks.append(callback)
return mock_config
class MockRunnerFactory:
"""Factory for creating mock runners"""
@staticmethod
def create_runner_mock(
num_layers=2,
warmup_step=2,
is_multi_modal=False,
use_legacy=True
):
"""Create a mock MindIEModelRunner instance"""
runner = MagicMock(spec=MindIEModelRunner)
runner.num_layers = num_layers
runner.model = MagicMock()
runner.use_legacy = use_legacy
runner.warmup_step = warmup_step
runner.is_multi_modal_model = is_multi_modal
runner.model_config = MagicMock()
runner.processor = MagicMock()
runner.config = MindFormerConfig()
runner.config.output_dir = './output'
return runner
class TempDirFixture:
"""Context manager for temporary directories"""
def __init__(self):
self.test_dir = None
def __enter__(self):
self.test_dir = tempfile.mkdtemp()
return self.test_dir
def __exit__(self, exc_type, exc_val, exc_tb):
if self.test_dir and os.path.exists(self.test_dir):
shutil.rmtree(self.test_dir)
class TestRegisterAutoClass:
"""Test register_auto_class function"""
@pytest.fixture(autouse=True)
def setup(self):
"""Set up test fixtures"""
auto_map = MockConfigFactory.create_auto_map_config()
self.config = _create_mock_config_with_dict_access(auto_map)
self.config.model.model_config.type = 'test_config'
self.config.model.arch.type = 'test_model'
self.config.processor.tokenizer.type = 'test_tokenizer'
self.model_path = '/test/path'
def _setup_mocks(self, mock_get_class, mock_register, mock_module_type, registry=None):
"""Helper method to setup common mocks"""
mock_module_type.CONFIG = 'CONFIG'
mock_module_type.TOKENIZER = 'TOKENIZER'
mock_module_type.MODELS = 'MODELS'
mock_module_type.PROCESSOR = 'PROCESSOR'
if registry is None:
registry = MockRegistryFactory.create_empty_registry()
mock_register.registry = registry
if mock_get_class:
mock_get_class.return_value = MagicMock()
@patch('mindformers.model_runner.MindFormerModuleType')
@patch('mindformers.model_runner.MindFormerRegister')
@patch('mindformers.model_runner.get_class_from_dynamic_module')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_register_auto_config(self, mock_get_class, mock_register, mock_module_type):
"""Test registering AutoConfig"""
self._setup_mocks(mock_get_class, mock_register, mock_module_type)
register_auto_class(self.config, self.model_path, 'AutoConfig')
mock_get_class.assert_called_once_with('module.ConfigClass', self.model_path)
mock_register.register_cls.assert_called_once()
@patch('mindformers.model_runner.MindFormerModuleType')
@patch('mindformers.model_runner.MindFormerRegister')
@patch('mindformers.model_runner.get_class_from_dynamic_module')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_register_auto_tokenizer_fast(self, mock_get_class, mock_register, mock_module_type):
"""Test registering AutoTokenizer with use_fast=True"""
self._setup_mocks(mock_get_class, mock_register, mock_module_type)
register_auto_class(self.config, self.model_path, 'AutoTokenizer', use_fast=True)
mock_get_class.assert_called_once_with('module.TokenizerFast', self.model_path)
@patch('mindformers.model_runner.MindFormerModuleType')
@patch('mindformers.model_runner.MindFormerRegister')
@patch('mindformers.model_runner.get_class_from_dynamic_module')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_register_auto_tokenizer_slow(self, mock_get_class, mock_register, mock_module_type):
"""Test registering AutoTokenizer with use_fast=False"""
self._setup_mocks(mock_get_class, mock_register, mock_module_type)
register_auto_class(self.config, self.model_path, 'AutoTokenizer', use_fast=False)
mock_get_class.assert_called_once_with('module.TokenizerSlow', self.model_path)
@patch('mindformers.model_runner.MindFormerModuleType')
@patch('mindformers.model_runner.MindFormerRegister')
@patch('mindformers.model_runner.get_class_from_dynamic_module')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_register_auto_tokenizer_no_fast(self, mock_get_class, mock_register, mock_module_type):
"""Test registering AutoTokenizer when fast tokenizer is None"""
self.config.model.model_config.auto_map['AutoTokenizer'] = ['module.TokenizerSlow', None]
self._setup_mocks(mock_get_class, mock_register, mock_module_type)
register_auto_class(self.config, self.model_path, 'AutoTokenizer', use_fast=True)
mock_get_class.assert_called_once_with('module.TokenizerSlow', self.model_path)
@patch('mindformers.model_runner.MindFormerModuleType')
@patch('mindformers.model_runner.MindFormerRegister')
@patch('mindformers.model_runner.get_class_from_dynamic_module')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_register_auto_model(self, mock_get_class, mock_register, mock_module_type):
"""Test registering AutoModel"""
self._setup_mocks(mock_get_class, mock_register, mock_module_type)
register_auto_class(self.config, self.model_path, 'AutoModel')
mock_get_class.assert_called_once_with('module.ModelClass', self.model_path)
@patch('mindformers.model_runner.MindFormerModuleType')
@patch('mindformers.model_runner.MindFormerRegister')
@patch('mindformers.model_runner.get_class_from_dynamic_module')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_register_auto_processor(self, mock_get_class, mock_register, mock_module_type):
"""Test registering AutoProcessor"""
self._setup_mocks(mock_get_class, mock_register, mock_module_type)
register_auto_class(self.config, self.model_path, 'AutoProcessor')
mock_get_class.assert_called_once_with('module.ProcessorClass', self.model_path)
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_register_no_auto_map(self):
"""Test when config has no auto_map"""
self.config.model.model_config.auto_map = None
register_auto_class(self.config, self.model_path, 'AutoConfig')
@patch('mindformers.model_runner.MindFormerModuleType')
@patch('mindformers.model_runner.MindFormerRegister')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_register_already_registered(self, mock_register, mock_module_type):
"""Test when class is already registered"""
registry = MockRegistryFactory.create_registry_with_config('test_config')
self._setup_mocks(None, mock_register, mock_module_type, registry=registry)
register_auto_class(self.config, self.model_path, 'AutoConfig')
mock_register.register_cls.assert_not_called()
class TestGetModel:
"""Test get_model function"""
@pytest.fixture(autouse=True)
def setup_teardown(self):
"""Set up and tear down test fixtures"""
self.temp_dir = TempDirFixture()
self.test_dir = self.temp_dir.__enter__()
self.yaml_file = os.path.join(self.test_dir, 'config.yaml')
with open(self.yaml_file, 'w', encoding='utf-8') as f:
f.write('model:\n arch:\n type: llama\n model_config:\n type: llama_config\n')
yield
self.temp_dir.__exit__(None, None, None)
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_invalid_path(self):
"""Test with invalid path"""
with pytest.raises(ValueError) as exc_info:
get_model('/nonexistent/path')
assert 'does not exist' in str(exc_info.value)
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_not_directory(self):
"""Test with file instead of directory"""
with pytest.raises(ValueError) as exc_info:
get_model(self.yaml_file)
assert 'does not exist or is not a directory' in str(exc_info.value)
@patch('mindformers.model_runner.AutoTokenizer')
@patch('mindformers.model_runner.MindFormerConfig')
@patch('mindformers.model_runner.register_auto_class')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_get_model_single_modal(self, mock_register, mock_config_cls, mock_tokenizer):
"""Test getting single modal model"""
mock_config = MockConfigFactory.create_mindformer_config()
mock_config.processor = MagicMock()
mock_config_cls.return_value = mock_config
mock_tok = MagicMock()
mock_tokenizer.from_pretrained.return_value = mock_tok
tokenizer, input_builder = get_model(self.test_dir)
assert tokenizer == mock_tok
assert isinstance(input_builder, InputBuilder)
mock_register.assert_called_once()
@patch('mindformers.model_runner.AutoTokenizer')
@patch('mindformers.model_runner.MindFormerConfig')
@patch('mindformers.model_runner.register_auto_class')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_get_model_with_kwargs(self, mock_register, mock_config_cls, mock_tokenizer):
"""Test get_model with additional kwargs"""
mock_config = MockConfigFactory.create_mindformer_config()
mock_config_cls.return_value = mock_config
mock_tok = MagicMock()
mock_tokenizer.from_pretrained.return_value = mock_tok
get_model(self.test_dir, revision='main', trust_remote_code=True, use_fast=False)
mock_tokenizer.from_pretrained.assert_called_once_with(
self.test_dir,
revision='main',
trust_remote_code=True,
use_fast=False
)
class TestGetModelConfig:
"""Test _get_model_config function"""
@pytest.fixture(autouse=True)
def setup_teardown(self):
"""Set up and tear down test fixtures"""
self.temp_dir = TempDirFixture()
self.test_dir = self.temp_dir.__enter__()
yield
self.temp_dir.__exit__(None, None, None)
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_get_config_success(self):
"""Test successfully getting config file"""
yaml_file = os.path.join(self.test_dir, 'model_config.yaml')
with open(yaml_file, 'w', encoding='utf-8') as f:
f.write('test: config')
result = _get_model_config(self.test_dir)
assert result == yaml_file
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_no_yaml_file(self):
"""Test when directory has no yaml file"""
with pytest.raises(FileNotFoundError) as exc_info:
_get_model_config(self.test_dir)
assert 'no yaml file' in str(exc_info.value)
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_path_not_exist(self):
"""Test when path doesn't exist"""
with pytest.raises(ValueError) as exc_info:
_get_model_config('/nonexistent/path')
assert 'not exist' in str(exc_info.value)
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_multiple_yaml_files(self):
"""Test when directory has multiple yaml files (should return first one)"""
yaml1 = os.path.join(self.test_dir, 'config1.yaml')
yaml2 = os.path.join(self.test_dir, 'config2.yaml')
with open(yaml1, 'w', encoding='utf-8') as f:
f.write('test: 1')
with open(yaml2, 'w', encoding='utf-8') as f:
f.write('test: 2')
result = _get_model_config(self.test_dir)
assert result.endswith('.yaml')
class TestModel:
"""
Test Model.
"""
def forward(self, input_ids, valid_length_each_example, block_tables, slot_mapping, prefill, use_past,
position_ids=None, spec_mask=None, q_seq_lens=None, adapter_ids=None, prefill_head_indices=None,
mindie_warm_up=False, key_cache=None, value_cache=None):
"""
Check the info of inputs
Args:
input_ids (np.ndarray): rank is 2, and data type is int32.
valid_length_each_example (Union[np.ndarray, list]): rank is 1, and data type is int32.
block_tables (np.ndarray): rank is 2, and data type is int32.
slot_mapping (np.ndarray): rank is 1, and data type is int32.
prefill (bool).
use_past (bool).
position_ids (Union[np.ndarray, list]): rank is 1, and data type is int32.
spec_mask (np.ndarray): rank is 2 or 3, and data type is float16.
q_seq_lens (Union[np.ndarray, list]): rank is 1, and data type is int32.
adapter_ids (list): rank is 1, and data type is string
prefill_head_indices (Union[np.ndarray, list]): rank is 1, and data type is int32.
mindie_warm_up (bool).
Return:
res: (Tensor): given that shape is (2, 16000).
current_index (list): given that length is 1.
"""
assert isinstance(input_ids, np.ndarray) and input_ids.ndim == 2 and input_ids.dtype == np.int32
assert isinstance(valid_length_each_example, (np.ndarray, list))
if isinstance(valid_length_each_example, np.ndarray):
assert valid_length_each_example.ndim == 1 and valid_length_each_example.dtype == np.int32
else:
assert isinstance(valid_length_each_example[0], int)
assert isinstance(block_tables, np.ndarray) and block_tables.ndim == 2 and block_tables.dtype == np.int32
assert isinstance(slot_mapping, np.ndarray) and slot_mapping.ndim == 1 and slot_mapping.dtype == np.int32
assert isinstance(prefill, bool)
assert isinstance(use_past, bool)
assert isinstance(mindie_warm_up, bool)
assert isinstance(key_cache, type(None))
assert isinstance(value_cache, type(None))
if position_ids is not None:
if isinstance(position_ids, np.ndarray):
assert position_ids.ndim == 1 and position_ids.dtype == np.int32
else:
assert isinstance(position_ids[0], int)
if spec_mask is not None:
assert isinstance(spec_mask, np.ndarray) and (2 <= spec_mask.ndim <= 3) and spec_mask.dtype == np.float16
if q_seq_lens is not None:
if isinstance(q_seq_lens, np.ndarray):
assert q_seq_lens.ndim == 1 and q_seq_lens.dtype == np.int32
else:
assert isinstance(q_seq_lens[0], int)
if adapter_ids is not None:
assert isinstance(adapter_ids, list) and adapter_ids.dtype == np.str
if prefill_head_indices is not None:
if isinstance(prefill_head_indices, np.ndarray):
assert prefill_head_indices.ndim == 1 and prefill_head_indices.dtype == np.int32
else:
assert isinstance(prefill_head_indices[0], int)
res = np.arange(32000).reshape(2, -1)
current_index = [1]
return Tensor.from_numpy(res), current_index
class TestModelRunner:
"""Test ModelRunner factory class"""
@pytest.fixture(autouse=True)
def setup_teardown(self):
"""Set up and tear down test fixtures"""
self.temp_dir = TempDirFixture()
self.test_dir = self.temp_dir.__enter__()
self.yaml_file = os.path.join(self.test_dir, 'config.yaml')
with open(self.yaml_file, 'w', encoding='utf-8') as f:
f.write('model:\n arch:\n type: custom_model\n model_config:\n ')
yield
self.temp_dir.__exit__(None, None, None)
@patch('mindformers.model_runner.MindIEModelRunner')
@patch('mindformers.model_runner.MindFormerConfig')
@patch('mindformers.model_runner.models')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_model_runner_default(self, mock_models, mock_config_cls, mock_runner_cls):
"""Test ModelRunner with default model type"""
mock_config = MockConfigFactory.create_mindformer_config()
mock_config_cls.return_value = mock_config
mock_models.__all__ = ['llama']
mock_runner = MagicMock()
mock_runner_cls.return_value = mock_runner
result = ModelRunner(
model_path=self.test_dir,
npu_mem_size=1,
cpu_mem_size=1,
block_size=128
)
assert result == mock_runner
mock_runner_cls.assert_called_once()
@patch('mindformers.model_runner.MindIEModelRunner')
@patch('mindformers.model_runner.MindFormerConfig')
@patch('mindformers.model_runner.models')
@patch('importlib.import_module')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_model_runner_custom_type_import_success(self, mock_import, mock_models,
mock_config_cls, mock_default_runner):
"""Test ModelRunner with custom model type that imports successfully"""
mock_config = MockConfigFactory.create_mindformer_config(arch_type='custom_model')
mock_config_cls.return_value = mock_config
mock_models.__all__ = ['llama', 'gpt']
mock_custom_runner_cls = MagicMock()
mock_custom_module = MagicMock()
mock_custom_module.MindIEModelRunner = mock_custom_runner_cls
mock_import.return_value = mock_custom_module
mock_runner = MagicMock()
mock_custom_runner_cls.return_value = mock_runner
result = ModelRunner(
model_path=self.test_dir,
npu_mem_size=1,
cpu_mem_size=1,
block_size=128
)
assert result == mock_runner
mock_import.assert_called_with('custom_model', ['MindIEModelRunner'])
@patch('mindformers.model_runner.MindIEModelRunner')
@patch('mindformers.model_runner.MindFormerConfig')
@patch('mindformers.model_runner.models')
@patch('importlib.import_module')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_model_runner_custom_type_import_fail(self, mock_import, mock_models,
mock_config_cls, mock_default_runner):
"""Test ModelRunner with custom model type that fails to import"""
mock_config = MockConfigFactory.create_mindformer_config(arch_type='custom_model')
mock_config_cls.return_value = mock_config
mock_models.__all__ = ['llama', 'gpt']
mock_runner = MagicMock()
mock_default_runner.return_value = mock_runner
result = ModelRunner(
model_path=self.test_dir,
npu_mem_size=1,
cpu_mem_size=1,
block_size=128
)
assert result == mock_runner
mock_default_runner.assert_called_once()
class TestMindIEModelRunnerInit:
"""Test MindIEModelRunner initialization"""
@pytest.fixture(autouse=True)
def setup_teardown(self):
"""Set up and tear down test fixtures"""
self.temp_dir = TempDirFixture()
self.test_dir = self.temp_dir.__enter__()
self.yaml_file = os.path.join(self.test_dir, 'config.yaml')
config_content = """
model:
arch:
type: llama
model_config:
type: llama_config
num_hidden_layers: 2
hidden_size: 128
num_attention_heads: 4
compute_dtype: float32
seq_length: 512
batch_size: 1
processor:
tokenizer:
type: llama_tokenizer
context:
device_target: CPU
mode: 0
"""
with open(self.yaml_file, 'w', encoding='utf-8') as f:
f.write(config_content)
yield
self.temp_dir.__exit__(None, None, None)
def _setup_base_mocks(self):
"""Setup common mocks for initialization tests"""
patches = {
'ms_model': patch('mindspore.Model'),
'auto_model': patch('mindformers.model_runner.AutoModel'),
'auto_tokenizer': patch('mindformers.model_runner.AutoTokenizer'),
'auto_config': patch('mindformers.model_runner.AutoConfig'),
'build_context': patch('mindformers.model_runner.build_context'),
'is_legacy': patch('mindformers.model_runner.is_legacy_model'),
'config_cls': patch('mindformers.model_runner.MindFormerConfig'),
'gen_config': patch('mindformers.model_runner.GenerationConfig'),
'get_load_path': patch('mindformers.model_runner.get_load_path_after_hf_convert'),
'no_init': patch('mindformers.model_runner.no_init_parameters'),
'register': patch('mindformers.model_runner.register_auto_class'),
}
return patches
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_init_invalid_plugin_params(self):
"""Test initialization with invalid plugin params"""
with patch('mindformers.model_runner.MindFormerConfig'):
with pytest.raises(ValueError) as exc_info:
MindIEModelRunner(
model_path=self.test_dir,
config_path=self.yaml_file,
npu_mem_size=1,
cpu_mem_size=1,
block_size=128,
plugin_params={'invalid': 'dict'}
)
assert 'plugin params should be str type' in str(exc_info.value)
@patch('mindspore.Model')
@patch('mindformers.model_runner.AutoModel')
@patch('mindformers.model_runner.AutoTokenizer')
@patch('mindformers.model_runner.AutoConfig')
@patch('mindformers.model_runner.build_context')
@patch('mindformers.model_runner.is_legacy_model')
@patch('mindformers.model_runner.MindFormerConfig')
@patch('mindformers.model_runner.GenerationConfig')
@patch('mindformers.model_runner.get_load_path_after_hf_convert')
@patch('mindformers.model_runner.no_init_parameters')
@patch('mindformers.model_runner.register_auto_class')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_init_single_modal_legacy(self, mock_register, mock_no_init, mock_get_load_path,
mock_gen_config, mock_config_cls, mock_is_legacy,
mock_build_ctx, mock_auto_config, mock_tokenizer,
mock_auto_model, mock_ms_model):
"""Test initialization for single modal model with legacy mode"""
mock_config = MockConfigFactory.create_mindformer_config()
mock_config_cls.return_value = mock_config
mock_model_config = MockConfigFactory.create_model_config()
mock_auto_config.from_pretrained.return_value = mock_model_config
mock_is_legacy.return_value = True
mock_model = MagicMock()
mock_model.__class__.__name__ = 'TestModel'
mock_model.kvcache.return_value = (MagicMock(), MagicMock())
mock_auto_model.from_config.return_value = mock_model
mock_tok = MagicMock()
mock_tokenizer.from_pretrained.return_value = mock_tok
mock_get_load_path.return_value = None
mock_gen_config.from_model_config.return_value = MagicMock()
ctx_mgr = MagicMock()
ctx_mgr.__enter__ = Mock(return_value=None)
ctx_mgr.__exit__ = Mock(return_value=False)
mock_no_init.return_value = ctx_mgr
runner = MindIEModelRunner(
model_path=self.test_dir,
config_path=self.yaml_file,
npu_mem_size=1,
cpu_mem_size=1,
block_size=128
)
assert runner.num_layers == 2
assert runner.warmup_step == 2
assert runner.is_multi_modal_model is False
mock_register.assert_called()
assert mock_register.call_count == 3
@patch('mindspore.Model')
@patch('mindformers.model_runner.AutoModel')
@patch('mindformers.model_runner.AutoTokenizer')
@patch('mindformers.model_runner.AutoConfig')
@patch('mindformers.model_runner.build_context')
@patch('mindformers.model_runner.is_legacy_model')
@patch('mindformers.model_runner.MindFormerConfig')
@patch('mindformers.model_runner.GenerationConfig')
@patch('mindformers.model_runner.get_load_path_after_hf_convert')
@patch('mindformers.model_runner.no_init_parameters')
@patch('mindformers.model_runner.register_auto_class')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_init_with_npu_device_ids_error(self, mock_register, mock_no_init, mock_get_load_path,
mock_gen_config, mock_config_cls, mock_is_legacy,
mock_build_ctx, mock_auto_config, mock_tokenizer,
mock_auto_model, mock_ms_model):
"""Test initialization with multiple npu_device_ids in non-parallel mode"""
mock_config = MockConfigFactory.create_mindformer_config()
mock_config_cls.return_value = mock_config
with pytest.raises(ValueError) as exc_info:
MindIEModelRunner(
model_path=self.test_dir,
config_path=self.yaml_file,
npu_mem_size=1,
cpu_mem_size=1,
block_size=128,
npu_device_ids=[0, 1]
)
assert 'should only contain one device_id' in str(exc_info.value)
@patch('mindspore.Model')
@patch('mindformers.model_runner.AutoModel')
@patch('mindformers.model_runner.AutoTokenizer')
@patch('mindformers.model_runner.AutoConfig')
@patch('mindformers.model_runner.build_context')
@patch('mindformers.model_runner.is_legacy_model')
@patch('mindformers.model_runner.MindFormerConfig')
@patch('mindformers.model_runner.GenerationConfig')
@patch('mindformers.model_runner.get_load_path_after_hf_convert')
@patch('mindformers.model_runner.no_init_parameters')
@patch('mindformers.model_runner.register_auto_class')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_init_dynamic_kv_cache_error(self, mock_register, mock_no_init, mock_get_load_path,
mock_gen_config, mock_config_cls, mock_is_legacy,
mock_build_ctx, mock_auto_config, mock_tokenizer,
mock_auto_model, mock_ms_model):
"""Test initialization with npu_mem_size=-1 for unsupported model"""
mock_config = MockConfigFactory.create_mindformer_config()
mock_config_cls.return_value = mock_config
mock_model_config = MockConfigFactory.create_model_config()
mock_auto_config.from_pretrained.return_value = mock_model_config
mock_is_legacy.return_value = True
mock_model = MagicMock()
mock_model.__class__.__name__ = 'UnsupportedModel'
mock_auto_model.from_config.return_value = mock_model
mock_get_load_path.return_value = None
mock_gen_config.from_model_config.return_value = MagicMock()
ctx_mgr = MagicMock()
ctx_mgr.__enter__ = Mock(return_value=None)
ctx_mgr.__exit__ = Mock(return_value=False)
mock_no_init.return_value = ctx_mgr
with pytest.raises(ValueError) as exc_info:
MindIEModelRunner(
model_path=self.test_dir,
config_path=self.yaml_file,
npu_mem_size=-1,
cpu_mem_size=1,
block_size=128
)
assert 'npu_mem_size=-1 only support in parallel mode' in str(exc_info.value)
@patch('mindspore.Model')
@patch('mindformers.model_runner.AutoModel')
@patch('mindformers.model_runner.AutoTokenizer')
@patch('mindformers.model_runner.AutoConfig')
@patch('mindformers.model_runner.build_context')
@patch('mindformers.model_runner.is_legacy_model')
@patch('mindformers.model_runner.MindFormerConfig')
@patch('mindformers.model_runner.GenerationConfig')
@patch('mindformers.model_runner.get_load_path_after_hf_convert')
@patch('mindformers.model_runner.no_init_parameters')
@patch('mindformers.model_runner.register_auto_class')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_update_model_config_with_plugin_params(self, mock_register, mock_no_init,
mock_get_load_path, mock_gen_config,
mock_config_cls, mock_is_legacy,
mock_build_ctx, mock_auto_config,
mock_tokenizer, mock_auto_model, mock_ms_model):
"""Test update_model_config with plugin parameters"""
mock_config = MockConfigFactory.create_mindformer_config()
mock_config_cls.return_value = mock_config
mock_model_config = MockConfigFactory.create_model_config()
mock_auto_config.from_pretrained.return_value = mock_model_config
mock_is_legacy.return_value = True
mock_model = MagicMock()
mock_model.__class__.__name__ = 'TestModel'
mock_auto_model.from_config.return_value = mock_model
mock_get_load_path.return_value = None
mock_gen_config.from_model_config.return_value = MagicMock()
ctx_mgr = MagicMock()
ctx_mgr.__enter__ = Mock(return_value=None)
ctx_mgr.__exit__ = Mock(return_value=False)
mock_no_init.return_value = ctx_mgr
plugin_params = json.dumps({'plugin_type': 'speculative_decoding'})
runner = MindIEModelRunner(
model_path=self.test_dir,
config_path=self.yaml_file,
npu_mem_size=1,
cpu_mem_size=1,
block_size=128,
plugin_params=plugin_params
)
assert runner.model_config.parallel_decoding_params is not None
@patch('mindspore.Model')
@patch('mindformers.model_runner.AutoModel')
@patch('mindformers.model_runner.AutoTokenizer')
@patch('mindformers.model_runner.AutoConfig')
@patch('mindformers.model_runner.build_context')
@patch('mindformers.model_runner.build_parallel_config')
@patch('mindformers.model_runner.is_legacy_model')
@patch('mindformers.model_runner.MindFormerConfig')
@patch('mindformers.model_runner.GenerationConfig')
@patch('mindformers.model_runner.get_load_path_after_hf_convert')
@patch('mindformers.model_runner.no_init_parameters')
@patch('mindformers.model_runner.register_auto_class')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_init_with_moe_config(self, mock_register, mock_no_init, mock_get_load_path,
mock_gen_config, mock_config_cls, mock_is_legacy,
mock_build_parallel, mock_build_ctx, mock_auto_config,
mock_tokenizer, mock_auto_model, mock_ms_model):
"""Test initialization with MoE config"""
mock_config = MockConfigFactory.create_mindformer_config(
arch_type='llama_moe',
use_parallel=True,
moe_config={'num_experts': 8}
)
mock_config_cls.return_value = mock_config
mock_model_config = MockConfigFactory.create_model_config()
mock_auto_config.from_pretrained.return_value = mock_model_config
mock_is_legacy.return_value = False
mock_model = MagicMock()
mock_model.__class__.__name__ = 'TestModel'
mock_auto_model.from_config.return_value = mock_model
mock_get_load_path.return_value = None
mock_gen_config.from_model_config.return_value = MagicMock()
ctx_mgr = MagicMock()
ctx_mgr.__enter__ = Mock(return_value=None)
ctx_mgr.__exit__ = Mock(return_value=False)
mock_no_init.return_value = ctx_mgr
runner = MindIEModelRunner(
model_path=self.test_dir,
config_path=self.yaml_file,
npu_mem_size=1,
cpu_mem_size=1,
block_size=128
)
assert runner.model_config.moe_config == {'num_experts': 8}
class TestMindIEModelRunnerMethods:
"""Test MindIEModelRunner methods"""
@pytest.fixture(autouse=True)
def setup(self):
"""Set up test fixtures"""
self.runner = MockRunnerFactory.create_runner_mock()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_forward_legacy_prefill(self):
"""Test forward method in legacy mode with prefill"""
input_ids = [[1, 2, 3, 4]]
valid_length = [4]
mock_logits = Tensor(np.random.randn(1, 4, 1000).astype(np.float32))
current_idx = [0]
self.runner.model.forward.return_value = (mock_logits, current_idx)
result = MindIEModelRunner.forward(
self.runner,
input_ids=input_ids,
valid_length_each_example=valid_length,
prefill=True
)
self.runner.model.forward.assert_called_once()
assert self.runner.warmup_step == 1
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_forward_non_legacy_decode(self):
"""Test forward method in non-legacy mode with decode"""
self.runner.use_legacy = False
self.runner.warmup_step = 0
input_ids = [[1]]
valid_length = [1]
mock_logits = Tensor(np.random.randn(1, 1, 1000).astype(np.float32))
current_idx = []
self.runner.model.forward_mcore.return_value = (mock_logits, current_idx)
result = MindIEModelRunner.forward(
self.runner,
input_ids=input_ids,
valid_length_each_example=valid_length,
prefill=False
)
self.runner.model.forward_mcore.assert_called_once()
@patch('mindformers.model_runner.parallel_decoding_control')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_forward_parallel_decoding(self, mock_parallel_ctrl):
"""Test forward with parallel decoding enabled"""
mock_parallel_ctrl.return_value = True
input_ids = [[1, 2]]
valid_length = [2]
mock_logits = Tensor(np.random.randn(1, 2, 1000).astype(np.float32))
self.runner.model.forward.return_value = (mock_logits, [])
result = MindIEModelRunner.forward(
self.runner,
input_ids=input_ids,
valid_length_each_example=valid_length
)
mock_parallel_ctrl.assert_called_once()
@patch('mindformers.model_runner.swap_cache')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_swap(self, mock_swap_cache):
"""Test swap method"""
self.runner.key_host = [MagicMock(), MagicMock()]
self.runner.value_host = [MagicMock(), MagicMock()]
self.runner.model.kvcache.side_effect = [
(MagicMock(), MagicMock()),
(MagicMock(), MagicMock())
]
block_tables = [[0, 1], [2, 3]]
swap_type = True
MindIEModelRunner.swap(self.runner, block_tables, swap_type)
assert mock_swap_cache.call_count == 4
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_generate_position_ids_single_modal(self):
"""Test generate_position_ids for single modal model"""
input_ids = [1, 2, 3, 4, 5]
result = MindIEModelRunner.generate_position_ids(self.runner, input_ids)
assert list(result) == [0, 1, 2, 3, 4]
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_generate_position_ids_multi_modal_warmup(self):
"""Test generate_position_ids for multi-modal during warmup"""
self.runner.is_multi_modal_model = True
self.runner.warmup_step = 1
input_ids = [1, 2, 3]
result = MindIEModelRunner.generate_position_ids(self.runner, input_ids)
assert list(result) == [0, 1, 2]
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_generate_position_ids_multi_modal_no_warmup(self):
"""Test generate_position_ids for multi-modal after warmup"""
self.runner.is_multi_modal_model = True
self.runner.warmup_step = 0
self.runner.processor.decode_position_ids_from_input_ids.return_value = [0, 5, 10]
input_ids = [1, 2, 3]
result = MindIEModelRunner.generate_position_ids(self.runner, input_ids)
self.runner.processor.decode_position_ids_from_input_ids.assert_called_once_with(input_ids)
assert result == [0, 5, 10]
@patch('mindformers.model_runner.transform_and_load_checkpoint')
@patch('mindformers.model_runner.get_load_path_after_hf_convert')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_load_checkpoint_with_checkpoint(self, mock_get_load, mock_transform_load):
"""Test load_checkpoint when checkpoint exists"""
self.runner.model_config.batch_size = 1
self.runner.model_config.seq_length = 512
mock_get_load.return_value = '/path/to/checkpoint'
with patch('mindspore.Model') as mock_ms_model:
MindIEModelRunner.load_checkpoint(self.runner)
mock_transform_load.assert_called_once()
self.runner.model.init_parameters_data.assert_called_once()
@patch('mindformers.model_runner.get_load_path_after_hf_convert')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_load_checkpoint_no_checkpoint(self, mock_get_load):
"""Test load_checkpoint when no checkpoint exists"""
self.runner.model_config.batch_size = 1
self.runner.model_config.seq_length = 512
mock_get_load.return_value = None
with patch('mindspore.Model'):
MindIEModelRunner.load_checkpoint(self.runner)
self.runner.model.init_parameters_data.assert_called_once()
@patch('mindformers.model_runner.get_load_path_after_hf_convert')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_load_checkpoint_legacy(self, mock_get_load):
"""Test load_checkpoint in legacy mode"""
self.runner.model_config.batch_size = 1
self.runner.model_config.seq_length = 512
self.runner.use_legacy = True
self.runner.model.prepare_inputs_for_predict_layout.return_value = MagicMock()
mock_get_load.return_value = None
with patch('mindspore.Model'):
MindIEModelRunner.load_checkpoint(self.runner)
self.runner.model.prepare_inputs_for_predict_layout.assert_called_once()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_update_model_config_default_plugin(self):
"""Test update_model_config with default plugin config"""
self.runner.config.load_checkpoint = '/path/to/checkpoint'
MindIEModelRunner.update_model_config(
self.runner,
plugin_params={'plugin_type': None}
)
assert self.runner.model_config.parallel_decoding_params is None
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_update_model_config_json_string(self):
"""Test update_model_config with JSON string"""
self.runner.config.load_checkpoint = '/path/to/checkpoint'
plugin_params = json.dumps({'plugin_type': 'medusa'})
MindIEModelRunner.update_model_config(self.runner, plugin_params)
assert self.runner.model_config.parallel_decoding_params is not None
assert self.runner.model_config.parallel_decoding_params['parallel_decoding'] == 'medusa'
@patch('mindformers.model_runner.need_nz')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_update_llm_config_legacy_with_nz(self, mock_need_nz):
"""Test update_llm_config in legacy mode with NZ"""
mock_need_nz.return_value = True
self.runner.use_legacy = True
config = MockConfigFactory.create_model_config(
num_layers=4,
num_heads=8,
hidden_size=256,
compute_dtype='float16',
seq_length=1024
)
MindIEModelRunner.update_llm_config(
self.runner,
config=config,
world_size=1,
npu_mem_size=2,
cpu_mem_size=1,
block_size=64
)
assert self.runner.num_layers == 4
assert self.runner.num_kv_heads == 8
assert config.block_size == 64
assert config.num_blocks is not None
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_update_llm_config_non_legacy(self):
"""Test update_llm_config in non-legacy mode"""
self.runner.use_legacy = False
self.runner.model_config.quantization_config = None
config = MockConfigFactory.create_model_config(
num_layers=6,
num_heads=12,
n_kv_heads=4,
hidden_size=384,
compute_dtype='float32',
seq_length=2048
)
with patch('mindformers.model_runner.need_nz', return_value=False):
MindIEModelRunner.update_llm_config(
self.runner,
config=config,
world_size=2,
npu_mem_size=4,
cpu_mem_size=2,
block_size=128
)
assert self.runner.num_layers == 6
assert self.runner.num_kv_heads == 2
assert self.runner.head_size == 32
@patch('mindformers.model_runner.str_to_ms_type', {'int8': ms.int8})
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_update_llm_config_with_quantization(self):
"""Test update_llm_config with quantization config"""
self.runner.use_legacy = False
self.runner.model_config.quantization_config = MagicMock()
self.runner.model_config.quantization_config.kvcache_dtype = 'int8'
config = MockConfigFactory.create_model_config()
with patch('mindformers.model_runner.need_nz', return_value=False):
MindIEModelRunner.update_llm_config(
self.runner,
config=config,
world_size=1,
npu_mem_size=1,
cpu_mem_size=1,
block_size=64
)
assert self.runner.dtype is not None
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_update_llm_config_no_max_position_embedding(self):
"""Test update_llm_config when max_position_embedding is not set"""
self.runner.use_legacy = False
config = MockConfigFactory.create_model_config()
config.max_position_embedding = None
with patch('mindformers.model_runner.need_nz', return_value=False):
MindIEModelRunner.update_llm_config(
self.runner,
config=config,
world_size=1,
npu_mem_size=1,
cpu_mem_size=1,
block_size=64
)
assert config.max_position_embedding == 512
class TestInputBuilder:
"""Test InputBuilder class"""
@pytest.fixture(autouse=True)
def setup(self):
"""Set up test fixtures"""
self.tokenizer = MagicMock()
self.tokenizer.chat_template = "{% for message in messages %}{{ message.content }}{% endfor %}"
self.tokenizer.apply_chat_template = MagicMock(return_value=[1, 2, 3, 4, 5])
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_init_default(self):
"""Test InputBuilder initialization with defaults"""
builder = InputBuilder(self.tokenizer)
assert builder.tokenizer == self.tokenizer
assert builder.system_role_name == 'system'
assert builder.user_role_name == 'user'
assert builder.max_length == 2048
assert builder.rank == 0
assert builder.adapt_to_max_length is False
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_init_with_params(self):
"""Test InputBuilder initialization with custom parameters"""
builder = InputBuilder(
self.tokenizer,
chat_template="custom template",
system_role_name="sys",
user_role_name="usr",
max_length=4096
)
assert builder.tokenizer.chat_template == "custom template"
assert builder.system_role_name == 'sys'
assert builder.user_role_name == 'usr'
assert builder.max_length == 4096
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_make_context_default(self):
"""Test make_context with default parameters"""
builder = InputBuilder(self.tokenizer)
conversation = [
{'role': 'user', 'content': 'Hello'},
{'role': 'assistant', 'content': 'Hi there!'}
]
result = builder.make_context(rank=0, conversation=conversation)
assert result == [1, 2, 3, 4, 5]
assert builder.rank == 0
assert builder.adapt_to_max_length is False
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_make_context_with_params(self):
"""Test make_context with custom parameters"""
builder = InputBuilder(self.tokenizer)
conversation = [{'role': 'user', 'content': 'Test'}]
result = builder.make_context(
rank=1,
conversation=conversation,
add_generation_prompt=False,
adapt_to_max_length=True
)
assert builder.rank == 1
assert builder.adapt_to_max_length is True
self.tokenizer.apply_chat_template.assert_called_once()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_make_context_with_kwargs(self):
"""Test make_context with additional kwargs"""
builder = InputBuilder(self.tokenizer)
conversation = [{'role': 'user', 'content': 'Test'}]
result = builder.make_context(
rank=0,
conversation=conversation,
max_length=512,
padding=True
)
call_kwargs = self.tokenizer.apply_chat_template.call_args[1]
assert 'max_length' in call_kwargs
assert 'padding' in call_kwargs
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_apply_chat_template_no_method(self):
"""Test _apply_chat_template when tokenizer doesn't have the method"""
tokenizer = MagicMock(spec=[])
builder = InputBuilder(tokenizer)
with pytest.raises(RuntimeError) as exc_info:
builder._apply_chat_template([])
assert 'does not implement apply_chat_template' in str(exc_info.value)
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_apply_chat_template_no_template(self):
"""Test _apply_chat_template when tokenizer has no chat_template"""
tokenizer = MagicMock()
tokenizer.chat_template = None
builder = InputBuilder(tokenizer)
with pytest.raises(RuntimeError) as exc_info:
builder._apply_chat_template([])
assert 'not configured with a `chat_template`' in str(exc_info.value)
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_apply_chat_template_success(self):
"""Test _apply_chat_template successful execution"""
builder = InputBuilder(self.tokenizer)
conversation = [
{'role': 'system', 'content': 'You are helpful'},
{'role': 'user', 'content': 'Hello'}
]
result = builder._apply_chat_template(conversation)
assert result == [1, 2, 3, 4, 5]
self.tokenizer.apply_chat_template.assert_called_once_with(conversation)
class TestSafetensorsHelpers:
"""Test safetensors helper functions"""
@pytest.fixture(autouse=True)
def setup_teardown(self):
"""Set up and tear down test fixtures"""
self.temp_dir = TempDirFixture()
self.test_dir = self.temp_dir.__enter__()
yield
self.temp_dir.__exit__(None, None, None)
@patch('mindspore.load_distributed_checkpoint')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_load_distributed_safetensors(self, mock_load_dist):
"""Test _load_distributed_safetensors"""
model = MagicMock()
strategy_path = '/path/to/strategy'
load_path = '/path/to/safetensors'
_load_distributed_safetensors(model, strategy_path, load_path)
mock_load_dist.assert_called_once_with(
network=model,
predict_strategy=strategy_path,
unified_safetensors_dir=load_path,
format='safetensors'
)
@patch('mindspore.load_checkpoint')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_load_safetensors_single(self, mock_load_ckpt):
"""Test _load_safetensors with single file"""
sf_file = os.path.join(self.test_dir, 'model.safetensors')
with open(sf_file, 'wb') as f:
f.write(b'dummy content')
model = MagicMock()
_load_safetensors(model, self.test_dir)
mock_load_ckpt.assert_called_once()
call_args = mock_load_ckpt.call_args[1]
assert call_args['ckpt_file_name'].endswith('.safetensors')
assert call_args['net'] == model
assert call_args['format'] == 'safetensors'
@patch('mindspore.load_checkpoint')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_load_safetensors_multiple(self, mock_load_ckpt):
"""Test _load_safetensors with multiple files"""
for i in range(3):
sf_file = os.path.join(self.test_dir, f'model_{i}.safetensors')
with open(sf_file, 'wb') as f:
f.write(f'dummy content {i}'.encode())
model = MagicMock()
_load_safetensors(model, self.test_dir)
assert mock_load_ckpt.call_count == 3
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_load_safetensors_no_files(self):
"""Test _load_safetensors when no safetensors files exist"""
model = MagicMock()
with pytest.raises(FileNotFoundError) as exc_info:
_load_safetensors(model, self.test_dir)
assert 'no safetensors files' in str(exc_info.value)
@patch('mindformers.model_runner.contains_safetensors_files')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_check_valid_safetensors_path_valid(self, mock_contains):
"""Test _check_valid_safetensors_path with valid path"""
mock_contains.return_value = True
_check_valid_safetensors_path(self.test_dir)
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_check_valid_safetensors_path_invalid_type(self):
"""Test _check_valid_safetensors_path with invalid type"""
with pytest.raises(ValueError) as exc_info:
_check_valid_safetensors_path(123)
assert 'must be a str' in str(exc_info.value)
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_check_valid_safetensors_path_not_exist(self):
"""Test _check_valid_safetensors_path with non-existent path"""
with pytest.raises(ValueError) as exc_info:
_check_valid_safetensors_path('/nonexistent/path')
assert 'does not exist' in str(exc_info.value)
@patch('mindformers.model_runner.contains_safetensors_files')
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_check_valid_safetensors_path_no_safetensors(self, mock_contains):
"""Test _check_valid_safetensors_path when directory has no safetensors"""
mock_contains.return_value = False
with pytest.raises(ValueError) as exc_info:
_check_valid_safetensors_path(self.test_dir)
assert 'not a valid path for safetensors' in str(exc_info.value)
class TestMindIEModelRunner:
"""
Test MindIEModelRunner API.
1. Check the type of `__init__` attributes.
2. Check the type of `forward` inputs.
3. Check the dimension and data type of `forward` inputs if they are `np.ndarray` or `Tensor`.
4. Check the consistency between the `res` in model and the `logits` in model_runner which should be same
if the `res` is `Tensor`, otherwise, `res` would be a list[Tensor], the shape of `res[0]` is compared.
"""
def __init__(self, model_path, config_path, npu_mem_size, cpu_mem_size, block_size, rank_id=0, world_size=1,
npu_device_ids=None, plugin_params=None):
"""Test __init__ api"""
self.warmup_step = 2
self.is_multi_modal_model = False
self.model = TestModel()
assert isinstance(model_path, str)
assert isinstance(config_path, str)
assert isinstance(npu_mem_size, int)
assert isinstance(cpu_mem_size, int)
assert isinstance(block_size, int)
assert isinstance(rank_id, int)
assert isinstance(world_size, int)
assert isinstance(npu_device_ids, list) and isinstance(npu_device_ids[0], int)
if plugin_params:
assert isinstance(plugin_params, str)
assert config_path == os.path.join(model_path, 'test_config.yaml')
@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
@mock.patch('mindformers.model_runner.MindIEModelRunner.__init__', TestMindIEModelRunner.__init__)
def test_model_runner():
"""
Feature: Test ModelRunner API.
Description: Test ModelRunner API.
Expectation: Success.
"""
current_file_path = os.path.dirname(__file__)
model_path = os.path.join(current_file_path, "test_files")
npu_mem_size = 3
cpu_mem_size = 1
block_size = 128
rank_id = 0
world_size = 1
npu_device_ids = [0]
model_runner = ModelRunner(model_path=model_path, npu_mem_size=npu_mem_size, cpu_mem_size=cpu_mem_size,
block_size=block_size, rank_id=rank_id, world_size=world_size,
npu_device_ids=npu_device_ids)
model_runner.use_legacy = True
input_ids = np.arange(32 * 256).reshape(32, 256).astype(np.int32)
valid_length_each_example = np.arange(32).astype(np.int32)
block_tables = np.arange(32 * 256).reshape(32, 256).astype(np.int32)
slot_mapping = np.arange(32 * 256).astype(np.int32)
prefill = False
logits = model_runner.forward(input_ids=input_ids, valid_length_each_example=valid_length_each_example,
block_tables=block_tables, slot_mapping=slot_mapping, prefill=prefill)
assert logits.shape == (2, 16000)
prefill = True
logits = model_runner.forward(input_ids=input_ids, valid_length_each_example=valid_length_each_example,
block_tables=block_tables, slot_mapping=slot_mapping, prefill=prefill)
assert logits.shape == (1, 16000)
if __name__ == '__main__':
pytest.main([__file__, '-v'])