import re
import pytest
from cmate.parser import Parser
from cmate.visitor import (
InfoCollector, _ExpressionEvaluator, ASTFormatter, RuleCollector, AssignmentProcessor, RuleCollector,
Severity,
CMateError
)
from cmate.data_source import DataSource
@pytest.fixture(scope='session')
def parser():
return Parser()
@pytest.fixture()
def info_collector():
return InfoCollector()
def test_collect_given_metadata_name_assignment_when_input_is_valid_then_extract_correct_metadata_info(parser, info_collector):
text = '''\
[metadata]
name = 'test_name'
---
'''
node = parser.parse(text)
info = info_collector.collect(node)
assert info == {'contexts': {}, 'metadata': {'name': 'test_name'}, 'targets': {}}
def test_parse_multiple_metadata_blocks_when_later_block_overrides_earlier_one(parser, info_collector):
text = '''\
[metadata]
name = 'test_1'
---
[metadata]
name = 'test_2'
---
'''
node = parser.parse(text)
info = info_collector.collect(node)
assert info == {'contexts': {}, 'metadata': {'name': 'test_2'}, 'targets': {}}
def test_collect_when_given_par_env_with_assert_in_desc_then_par_env_parsed_as_target(parser, info_collector):
text = '''\
[par env]
assert 1, ''
'''
node = parser.parse(text)
info = info_collector.collect(node)
assert info == {
'metadata': {},
'targets':
{
'env': {'desc': None, 'parse_type': None, 'required_targets': None, 'required_contexts': None}
},
'contexts': {}
}
def test_collect_when_multiple_par_blocks_present_then_all_blocks_parsed_as_targets(parser, info_collector):
text = '''\
[par env]
assert 1, ''
[par sys]
assert 1, ''
'''
node = parser.parse(text)
info = info_collector.collect(node)
assert info == {
'metadata': {},
'targets':
{
'env': {'desc': None, 'parse_type': None, 'required_targets': None, 'required_contexts': None},
'sys': {'desc': None, 'parse_type': None, 'required_targets': None, 'required_contexts': None}
},
'contexts': {}
}
def test_collect_when_target_requires_context_then_context_and_target_relation_captured(parser, info_collector):
text = '''\
[par env]
assert ${context::a} == 2, ''
'''
node = parser.parse(text)
info = info_collector.collect(node)
assert info == {
'metadata': {},
'targets': {
'env': {'desc': None, 'parse_type': None, 'required_targets': None, 'required_contexts': ['a']}
},
'contexts': {'a': {'desc': None, 'options': [2]}}
}
def test_collect_when_same_target_with_different_context_options_then_options_merged_in_context(parser, info_collector):
text = '''\
[par env]
assert ${context::a} == 2, ''
---
[par env]
assert ${context::a} == 3, ''
'''
node = parser.parse(text)
info = info_collector.collect(node)
assert info == {
'metadata': {},
'targets': {
'env': {'desc': None, 'parse_type': None, 'required_targets': None, 'required_contexts': ['a']}
},
'contexts': {'a': {'desc': None, 'options': [2, 3]}}
}
def test_collect_when_target_defined_in_dependency_block_then_parse_type_and_desc_parsed_correctly(parser, info_collector):
text = '''\
[dependency]
sys : 'System' @ 'json'
---
[par sys]
assert ${env::ABC} == ${context::test}, 'test'
'''
node = parser.parse(text)
info = info_collector.collect(node)
assert info == {
'metadata': {},
'targets':
{
'sys': {'desc': 'System', 'parse_type': 'json', 'required_targets': ['env'], 'required_contexts': ['test']}
},
'contexts': {}
}
def test_collect_complex_scenario_with_metadata_dependency_and_conditional_logic_then_info_collected_correctly(parser, info_collector):
text = '''\
[metadata]
name = 'MindIE 配置项检查'
authors = [{"name": "a", "email": "b"}, {"name": "c"}]
---
[dependency]
mies_config: 'MindIE Service 主配置文件' @ 'json'
deploy_mode: '部署模式标识,用于确定检查规则集'
---
[global]
if ${context::deploy_mode} == 'pd_mix':
dp = ${mies_config::BackendConfig.ModelDeployConfig.ModelConfig[0].dp} or 1
if ${context::model_type} == 'deepseek':
moe_ep = ${mies_config::BackendConfig.ModelDeployConfig.ModelConfig[0].moe_ep} or 1
fi
fi
---
[par mies_config]
if ${context::deploy_mode} == 'pd_mix':
assert ${pp} == 1, 'pp 取值只能等于 1', error
fi
'''
node = parser.parse(text)
info = info_collector.collect(node)
assert info == {
'metadata': {'name': 'MindIE 配置项检查', 'authors': [{'name': 'a', 'email': 'b'}, {'name': 'c'}]},
'targets': {'mies_config': {'desc': 'MindIE Service 主配置文件', 'parse_type': 'json', 'required_targets': None, 'required_contexts': ['deploy_mode']}},
'contexts': {'deploy_mode': {'desc': ('部署模式标识,用于确定检查规则集', None), 'options': ['pd_mix']}}
}
@pytest.fixture()
def data_source(mocker):
return mocker.MagicMock()
@pytest.fixture()
def evaluator(data_source):
return _ExpressionEvaluator(data_source)
def test_eval_complex_arithmetic_expression_in_global_block_then_result_matches_standard(parser, evaluator):
text = '''\
[global]
a = (1 + 2 * 3 // 4 - 5) % 2
---
'''
document_node = parser.parse(text)
global_node = document_node.body[0]
assign_node = global_node.body[0]
standard = (1 + 2 * 3 // 4 - 5) % 2
assert standard == evaluator.evaluate(assign_node.value)
def test_eval_list_concatenation_with_dicts_in_global_block_then_result_matches_standard(parser, evaluator):
text = '''\
[global]
a = [{'a': 2}] + [{'b': 3}]
---
'''
document_node = parser.parse(text)
global_node = document_node.body[0]
assign_node = global_node.body[0]
standard = [{'a': 2}] + [{'b': 3}]
assert standard == evaluator.evaluate(assign_node.value)
def test_eval_complex_comparison_expression_with_many_parentheses_in_global_block_then_result_matches_standard(parser, evaluator):
text = '''\
[global]
a = (((((1 == 2) != 3) <= 4) >= 5) < 6) > 7
---
'''
document_node = parser.parse(text)
global_node = document_node.body[0]
assign_node = global_node.body[0]
standard = (((((1 == 2) != 3) <= 4) >= 5) < 6) > 7
assert standard == evaluator.evaluate(assign_node.value)
def test_eval_assign_value_given_match_operator_when_string_matches_regex_then_return_match_object(parser, evaluator):
text = '''\
[global]
a = 'test_string' =~ 'str'
---
'''
document_node = parser.parse(text)
global_node = document_node.body[0]
assign_node = global_node.body[0]
standard = re.search('str', 'test_string')
assert standard.pos == evaluator.evaluate(assign_node.value).pos
assert standard.endpos == evaluator.evaluate(assign_node.value).endpos
def test_eval_regex_with_repetition_operator_when_string_length_exceeds_limit_then_timeout_occurs(parser, evaluator):
text = '''\
[global]
a = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!' =~ '^(a+)+$'
---
'''
document_node = parser.parse(text)
global_node = document_node.body[0]
assign_node = global_node.body[0]
with pytest.raises(TimeoutError, match='timed out after'):
evaluator.evaluate(assign_node.value)
def test_eval_contains_check_with_data_source_access_in_global_block_then_result_matches_standard(parser, evaluator, data_source):
text = '''\
[global]
a = ${a.b.c} in set([1, 2, 3])
---
'''
data_source.__getitem__.return_value = 2
document_node = parser.parse(text)
global_node = document_node.body[0]
assign_node = global_node.body[0]
standard = 2 in set([1, 2, 3])
assert standard == evaluator.evaluate(assign_node.value)
assert evaluator.history == [('global::a.b.c', 2)]
def test_eval_raises_cmate_error_when_function_undefined_in_global_scope(parser, evaluator):
text = '''\
[global]
a = unknownfunc(1, 2, 3)
---
'''
document_node = parser.parse(text)
global_node = document_node.body[0]
assign_node = global_node.body[0]
with pytest.raises(CMateError, match='Undefined function'):
evaluator.evaluate(assign_node.value)
def test_eval_reference_to_another_global_variable_then_data_source_access_called(parser, evaluator):
'''访问不带 namespace 的,默认 global'''
text = '''\
[global]
b = ${a}
---
'''
document_node = parser.parse(text)
global_node = document_node.body[0]
assign_node = global_node.body[0]
evaluator.evaluate(assign_node.value)
evaluator.data_source.__getitem__.assert_called_once_with('global::a')
@pytest.fixture(scope='session')
def pretty_formatter():
return ASTFormatter()
def test_format_assert_statement_with_string_comparison_then_output_matches_expected_format(parser, pretty_formatter):
text = '''\
[par env]
assert ${test::ABC} == 'ABC', 'test'
---
'''
document_node = parser.parse(text)
partition_node = document_node.body[0]
rule_node = partition_node.body[0]
assert "test::ABC == 'ABC'" == pretty_formatter.format(rule_node)
def test_format_assert_statement_with_nested_data_structure_then_output_matches_expected_format(parser, pretty_formatter):
text = '''\
[par env]
assert ${test::ABC} == [{int(2): str(3)}] and not false, 'test'
---
'''
document_node = parser.parse(text)
partition_node = document_node.body[0]
rule_node = partition_node.body[0]
assert "test::ABC == [{'int(2)': 'str(3)'}] and not False" == pretty_formatter.format(rule_node)
@pytest.fixture
def input_configs(mocker):
return mocker.MagicMock()
def test_process_variable_assignment_in_global_block_then_data_source_updated_with_correct_value(parser, input_configs, data_source):
text = '''\
[global]
a = 2
---
'''
document_node = parser.parse(text)
processor = AssignmentProcessor(input_configs, data_source)
processor.process(document_node)
data_source.flatten.assert_called_once()
data_source.flatten.assert_called_with('global', {'a': 2})
def test_process_conditional_assignment_in_global_block_when_flag_false_then_else_branch_executed(parser, input_configs, data_source):
text = '''\
[global]
flag = true
if ${flag}:
a = 2
elif ${flag}:
b = 2
else:
c = 2
fi
---
'''
document_node = parser.parse(text)
data_source.__getitem__.return_value = False
processor = AssignmentProcessor(input_configs, data_source)
processor.process(document_node)
data_source.__getitem__.assert_called_with('global::flag')
data_source.flatten.assert_called_with('global', {'c': 2})
def test_process_for_loop_with_conditional_assignment_then_variables_set_correctly(parser, input_configs):
text = '''\
[global]
test_arr = [1, 2, 3]
for item in ${test_arr}:
if ${item} < 2:
b = ${item}
else:
c = ${item}
fi
done
---
'''
data_source = DataSource()
document_node = parser.parse(text)
processor = AssignmentProcessor(input_configs, data_source)
processor.process(document_node)
assert data_source['global::b'] == (1, 'global::test_arr[0]')
assert data_source['global::c'] == (3, 'global::test_arr[2]')
assert 'global::__root__' not in data_source
assert 'global::item' not in data_source
def test_process_global_skips_for_loops_for_targets_not_passed_in_input_configs(parser):
"""[global] must not read or assign from -c targets the user did not supply."""
data_source = DataSource()
data_source.flatten('mindie_ms_coordinator', [10, 20])
text = '''\
[global]
for item in ${mindie_server::__root__}:
from_server = ${item}
done
for item in ${mindie_ms_coordinator::__root__}:
from_coord = ${item}
done
---
'''
document_node = parser.parse(text)
processor = AssignmentProcessor({'mindie_ms_coordinator': '/x.yaml'}, data_source)
processor.process(document_node)
assert 'global::from_server' not in data_source
assert data_source['global::from_coord'][0] == 20
def test_process_loop_with_dictionary_iteration_then_values_correctly_extracted_and_processed(parser, input_configs, data_source):
text = '''\
[global]
for item in [{'name': 'a', 'value': 'b'}, {'name': 'c', 'value': 'd'}]:
if ${item.name} == 'a':
first_value = ${item.value}
elif ${item.name} == 'c':
second_value = ${item.value}
fi
done
---
'''
data_source = DataSource()
document_node = parser.parse(text)
processor = AssignmentProcessor(input_configs, data_source)
processor.process(document_node)
assert data_source['global::first_value'] == ('b', '[{"\'name\'": "\'a\'", "\'value\'": "\'b\'"}, {"\'name\'": "\'c\'", "\'value\'": "\'d\'"}][0]')
assert data_source['global::second_value'] == ('d', '[{"\'name\'": "\'a\'", "\'value\'": "\'b\'"}, {"\'name\'": "\'c\'", "\'value\'": "\'d\'"}][1]')
assert 'global::__root__' not in data_source
assert 'global::item' not in data_source
def test_process_for_loop_with_continue_and_break_then_only_correct_variables_set(parser, input_configs, data_source):
text = '''\
[global]
for item in set([1, 3, 5]):
if ${item} < 4:
continue
fi
a = ${item}
break
b = ${item}
done
---
'''
data_source = DataSource()
document_node = parser.parse(text)
processor = AssignmentProcessor(input_configs, data_source)
processor.process(document_node)
assert 'global::__root__' not in data_source
assert 'global::item' not in data_source
assert data_source['global::a'] == (5, 'set([1, 3, 5])[2]')
def test_process_reference_to_data_source_value_when_key_not_present_then_no_variable_assignment(parser, input_configs, data_source):
text = '''\
[global]
a = ${env::ABC}
b = ${ABC}
c = ${abc}
---
'''
results = {'abc': 2}
def set_item(self, key, value):
results[key] = value
def get_item(self, key):
return results[key]
def del_item(self, key):
del results[key]
document_node = parser.parse(text)
data_source.__setitem__ = set_item
data_source.__getitem__ = get_item
data_source.__delitem__ = del_item
processor = AssignmentProcessor(input_configs, data_source)
processor.process(document_node)
assert results == {'abc': 2}
def test_process_break_outside_loop_then_cmate_error_raised(parser, input_configs, data_source):
text = '''\
[global]
a = ${env::ABC}
break
---
'''
document_node = parser.parse(text)
processor = AssignmentProcessor(input_configs, data_source)
with pytest.raises(CMateError, match="'break' outside loop"):
processor.process(document_node)
def test_process_continue_outside_loop_then_cmate_error_raised(parser, input_configs, data_source):
text = '''\
[global]
a = ${env::ABC}
continue
---
'''
document_node = parser.parse(text)
processor = AssignmentProcessor(input_configs, data_source)
with pytest.raises(CMateError, match="'continue' not properly in loop"):
processor.process(document_node)
def test_process_variable_assignment_in_global_block_then_flatten_called_with_correct_syntax(parser, input_configs, data_source):
text = '''\
[global]
a = 2
b = a
---
'''
document_node = parser.parse(text)
processor = AssignmentProcessor(input_configs, data_source)
processor.process(document_node)
data_source.flatten.assert_called_once_with('global', {'a': 2})
def test_visit_assert_statement_when_input_configs_not_contains_target_then_ruleset_empty(parser, input_configs, data_source):
text = '''\
[par env]
assert ${test::ABC} == [{int(2): str(3)}] and not false, 'test'
---
'''
document_node = parser.parse(text)
input_configs.__contains__.return_value = False
rule_collector = RuleCollector(input_configs, data_source, 'info')
ruleset = rule_collector.collect(document_node)
assert not ruleset
def test_visit_assert_statement_when_input_configs_contains_target_then_ruleset_has_one_rule(parser, input_configs, data_source):
text = '''\
[par env]
assert ${test::ABC} == [{int(2): str(3)}] and not false, 'test'
---
'''
document_node = parser.parse(text)
input_configs.__contains__.return_value = True
rule_collector = RuleCollector(input_configs, data_source, 'info')
ruleset = rule_collector.collect(document_node)
assert len(ruleset) == 1
def test_visit_assert_statement_when_severity_mismatch_with_visitor_config_then_ruleset_empty(parser, input_configs, data_source):
text = '''\
[par env]
assert ${test::ABC} == [{int(2): str(3)}] and not false, 'test', info
---
'''
document_node = parser.parse(text)
input_configs.__contains__.return_value = True
rule_collector = RuleCollector(input_configs, data_source, 'error')
ruleset = rule_collector.collect(document_node)
assert not ruleset
def test_collect_conditional_assert_when_condition_true_then_correct_rule_added_to_ruleset(parser, input_configs, data_source):
text = '''\
[par env]
if True:
assert True, 'true'
else:
assert False, 'false'
fi
'''
document_node = parser.parse(text)
input_configs.__contains__.return_value = True
rule_collector = RuleCollector(input_configs, data_source, 'error')
ruleset = rule_collector.collect(document_node)
assert len(ruleset) == 1
assert list(list(ruleset.values())[0])[0].msg == 'true'
def test_collect_assert_inside_for_loop_when_condition_true_then_multiple_rules_generated(parser, input_configs):
text = '''\
[par env]
for item in [1, 2, 3]:
assert ${item} > 0, 'test'
done
'''
document_node = parser.parse(text)
input_configs.__contains__.return_value = True
data_source = DataSource()
rule_collector = RuleCollector(input_configs, data_source, 'error')
ruleset = rule_collector.collect(document_node)
assert len(ruleset) == 1
assert len(ruleset['env']) == 3
assert 'loopvar' in list(ruleset['env'])[0].test.left.path
assert 'loopvar' in list(ruleset['env'])[0].test.left.path
assert 'loopvar' in list(ruleset['env'])[0].test.left.path
def test_collect_assert_inside_for_loop_with_referenced_array_then_path_reflects_source_data(parser, input_configs):
text = '''\
[global]
test_arr = [1, 2, 3]
---
[par env]
for item in ${test_arr}:
assert ${item} > 0, 'test'
done
'''
document_node = parser.parse(text)
input_configs.__contains__.return_value = True
data_source = DataSource()
data_source.flatten('global', {'test_arr': [1, 2, 3]})
rule_collector = RuleCollector(input_configs, data_source, 'error')
ruleset = rule_collector.collect(document_node)
assert len(ruleset) == 1
assert len(ruleset['env']) == 3
assert 'test_arr[' in list(ruleset['env'])[0].test.left.path
assert 'test_arr[' in list(ruleset['env'])[0].test.left.path
assert 'test_arr[' in list(ruleset['env'])[0].test.left.path
def test_collect_assert_with_break_outside_loop_then_cmate_error_raised(parser, input_configs):
text = '''\
[par env]
assert ${item} > 0, 'test'
break
---
'''
document_node = parser.parse(text)
input_configs.__contains__.return_value = True
data_source = DataSource()
rule_collector = RuleCollector(input_configs, data_source, 'error')
with pytest.raises(CMateError, match="'break' outside loop"):
rule_collector.collect(document_node)
def test_collect_assert_with_break_outside_loop_then_cmate_error_raised(parser, input_configs):
text = '''\
[par env]
assert false, 'test'
continue
---
'''
document_node = parser.parse(text)
input_configs.__contains__.return_value = True
data_source = DataSource()
rule_collector = RuleCollector(input_configs, data_source, 'error')
with pytest.raises(CMateError, match="'continue' not properly in loop"):
rule_collector.collect(document_node)
def test_collect_assert_in_loop_with_continue_and_break_then_only_relevant_assert_captured(parser, input_configs, data_source):
text = '''\
[par env]
for item in set([1, 3, 5]):
if ${item} < 4:
continue
fi
assert false, 'test'
break
assert true, 'test'
done
---
'''
document_node = parser.parse(text)
input_configs.__contains__.return_value = True
data_source = DataSource()
rule_collector = RuleCollector(input_configs, data_source, 'error')
ruleset = rule_collector.collect(document_node)
assert len(ruleset) == 1
assert len(ruleset['env']) == 1
assert list(ruleset['env'])[0].test.value == False