import unittest
from unittest.mock import patch, MagicMock
from msprof_analyze.advisor.analyzer.cluster.slow_rank_analyzer import SlowRankAnalyzer
from msprof_analyze.advisor.dataset.cluster.cluster_dataset import ClusterStepTraceTimeDataset
class TestSlowRankAnalyzer(unittest.TestCase):
    """Unit tests for ``SlowRankAnalyzer`` driven by a mocked step-trace dataset.

    The analyzer's dataset loading is patched out in ``setUp`` so that every
    test works on the in-memory fixtures created in ``setUpClass``.
    """

    @classmethod
    def setUpClass(cls):
        """Create the fixtures shared by all tests: step-trace data, stages and mock dataset."""
        cls.collection_path = "test_collection_path"
        # Keys are "<step>_<rank>". The assertions below treat the second value
        # as communication time and the third as free time (both in us); the
        # first is presumably compute time, matching the detail headers.
        # Rank 1 is the outlier: far more communication and far less free time.
        cls.mock_step_trace_dict = {
            '-1_0': [59862.52, 115034.42, 1252571.94],
            '-1_1': [60029.1, 971241.44, 396442.14],
            '-1_2': [60123.45, 116789.32, 1256789.12],
            '-1_3': [59987.32, 115678.90, 1253456.78]
        }
        # A single stage containing all four ranks.
        cls.mock_stages = [[0, 1, 2, 3]]
        cls.mock_dataset = MagicMock()
        cls.mock_dataset.get_key.return_value = ClusterStepTraceTimeDataset.get_key()
        cls.mock_dataset.get_data.return_value = cls.mock_step_trace_dict
        cls.mock_dataset.get_stages.return_value = cls.mock_stages

    @staticmethod
    def _uniform_step_trace():
        """Return a fresh step-trace dict in which all four ranks report identical timings.

        A new dict (with new lists) is built on every call so that a test
        assigning it to the analyzer cannot leak state into another test.
        """
        return {
            '-1_0': [100, 100, 100],
            '-1_1': [100, 100, 100],
            '-1_2': [100, 100, 100],
            '-1_3': [100, 100, 100]
        }

    def setUp(self):
        """Build a fresh analyzer per test with dataset initialisation patched out."""
        # The patches only need to be active while the constructor runs; the
        # analyzer keeps whatever it read from the mock dataset afterwards.
        with patch('msprof_analyze.advisor.analyzer.base_analyzer.BaseAnalyzer.init_dataset_list'), \
                patch('msprof_analyze.advisor.analyzer.base_analyzer.BaseAnalyzer.get_first_data_by_key',
                      return_value=self.mock_dataset):
            self.analyzer = SlowRankAnalyzer(self.collection_path)

    def test_init_with_valid_data_then_initialize_correctly(self):
        """The constructor stores the dataset's data and stages and starts with empty findings."""
        self.assertEqual(self.analyzer.step_trace_dict, self.mock_step_trace_dict)
        self.assertEqual(self.analyzer.stages, self.mock_stages)
        self.assertEqual(self.analyzer.bottelneck, '')
        self.assertEqual(self.analyzer.suggestion, '')
        # Every fixture key belongs to step "-1".
        self.assertEqual(self.analyzer._steps, {'-1'})

    def test_compute_max_gap_ratio_with_non_zero_mean_then_return_correct_ratio(self):
        """With a non-zero mean the ratio equals (max - min) / mean."""
        data = [14242056.739999993, 14311412.460000006]
        mean = (14242056.739999993 + 14311412.460000006) / 2
        expected_ratio = (14311412.460000006 - 14242056.739999993) / mean
        self.assertAlmostEqual(self.analyzer.compute_max_gap_ratio(data, mean), expected_ratio)

    def test_compute_max_gap_ratio_with_zero_mean_then_return_zero(self):
        """A zero mean yields a ratio of 0."""
        data = [0, 0, 0, 0]
        mean = 0
        self.assertEqual(self.analyzer.compute_max_gap_ratio(data, mean), 0)

    def test_format_details_with_valid_data_then_return_formatted_details(self):
        """``format_details`` returns the expected headers and one five-column row per rank."""
        details = self.analyzer.format_details()
        expected_headers = ["step", "rank_id", "compute(us)", "communication(us)", "free(us)"]
        self.assertEqual(details["headers"], expected_headers)
        # Four fixture entries -> four rows, each with one value per header.
        self.assertEqual(len(details["data"]), 4)
        self.assertEqual(len(details["data"][0]), 5)
        self.assertEqual(self.analyzer._steps, {'-1'})

    def test_get_step_duration_with_valid_rank_then_return_correct_duration(self):
        """The duration of rank 0 at step -1 is the sum of its three fixture timings."""
        duration = self.analyzer.get_step_duration(0, -1)
        expected_duration = 59862.52 + 115034.42 + 1252571.94
        self.assertAlmostEqual(duration, expected_duration)

    def test_get_step_duration_with_invalid_rank_then_return_zero(self):
        """A rank that is absent from the step-trace data has a duration of 0.0."""
        duration = self.analyzer.get_step_duration(999)
        self.assertEqual(duration, 0.0)

    def test_get_global_step_rank_with_free_dimension_then_return_rank_info(self):
        """For free time the global maximum is rank 2 and the global minimum is rank 1."""
        result = self.analyzer.get_global_step_rank("free(us)")
        self.assertIn("maximum", result)
        self.assertIn("minimum", result)
        # Fixture free times: rank 2 is largest (1256789.12), rank 1 smallest (396442.14).
        self.assertEqual(result["maximum"]["rank_id"], 2)
        self.assertEqual(result["minimum"]["rank_id"], 1)

    def test_process_with_significant_differences_then_identify_bottlenecks(self):
        """``process`` reports communication and free-time bottlenecks for the skewed fixture."""
        self.analyzer.process()
        # "通信" = communication, "空闲" = free/idle.
        self.assertIn("通信", self.analyzer.bottelneck)
        self.assertIn("空闲", self.analyzer.bottelneck)
        # "Communication in the cluster has a problem ... because the maximum
        # gap in communication time has reached ...".
        self.assertIn("集群中的通信有问题", self.analyzer.bottelneck)
        self.assertIn("因为通信时间的最大差距已经达到", self.analyzer.bottelneck)
        # Corresponds to 971241.44us (rank 1) - 115034.42us (rank 0) in the fixture.
        self.assertIn("856.207ms", self.analyzer.bottelneck)
        # Same message pair for free time.
        self.assertIn("集群中的空闲有问题", self.analyzer.bottelneck)
        self.assertIn("因为空闲时间的最大差距已经达到", self.analyzer.bottelneck)
        # Corresponds to 1256789.12us (rank 2) - 396442.14us (rank 1) in the fixture.
        self.assertIn("860.347ms", self.analyzer.bottelneck)

    def test_process_with_no_significant_differences_then_report_no_issues(self):
        """With identical timings on every rank, ``process`` reports no slow-rank problem."""
        self.analyzer.step_trace_dict = self._uniform_step_trace()
        self.analyzer.bottelneck = ''
        self.analyzer.process()
        # "没有慢节点问题" = "there is no slow-node problem".
        self.assertIn("没有慢节点问题", self.analyzer.bottelneck)

    def test_optimize_with_valid_data_then_return_optimize_result(self):
        """``optimize`` returns a problem overview and a detail table with the expected headers."""
        expected_problem_header = ['category', 'description', 'suggestion', 'problem count', 'total_time(us)',
                                   'time ratio', 'income(us)', 'income ratio']
        expected_details_header = ['step', 'rank_id', 'compute(us)', 'communication(us)', 'free(us)']
        result = self.analyzer.optimize(template_key="overall")
        slow_rank_res = dict(result.data)
        # "问题综述" = problem overview.
        problems = slow_rank_res.get("问题综述", {})
        self.assertEqual(len(problems), 2)
        self.assertEqual(problems.get("headers"), expected_problem_header)
        # "慢卡分析" = slow-rank analysis.
        details = slow_rank_res.get("慢卡分析", {})
        self.assertEqual(len(details), 2)
        self.assertEqual(details.get("headers"), expected_details_header)

    def test_get_stage_step_rank_with_free_dimension_then_return_stage_rank_info(self):
        """For free time, stage 0 reports rank 2 as maximum and rank 1 as minimum at step -1."""
        # The return value is not needed by this test.
        # NOTE(review): the call is kept in case format_details() has side
        # effects the assertions below depend on - confirm before removing.
        self.analyzer.format_details()
        result = self.analyzer.get_stage_step_rank("free(us)")
        self.assertIn("stage-0", result)
        stage_result = result["stage-0"]
        self.assertIn("maximum", stage_result)
        self.assertIn("minimum", stage_result)
        self.assertIn("rank_id", stage_result["maximum"])
        self.assertIn("step", stage_result["maximum"])
        self.assertIn("rank_id", stage_result["minimum"])
        self.assertIn("step", stage_result["minimum"])
        self.assertEqual(stage_result["maximum"]["rank_id"], 2)
        self.assertEqual(stage_result["maximum"]["step"], -1)
        self.assertEqual(stage_result["minimum"]["rank_id"], 1)
        self.assertEqual(stage_result["minimum"]["step"], -1)

    def test_get_stage_step_rank_with_invalid_dimension_then_return_empty_dict(self):
        """An unknown dimension name produces an empty result."""
        result = self.analyzer.get_stage_step_rank("invalid_dimension")
        self.assertEqual(result, {})

    def test_get_stage_step_rank_with_empty_format_datas_then_return_empty_dict(self):
        """With no formatted data available the stage lookup returns an empty dict."""
        self.analyzer.format_datas = {}
        result = self.analyzer.get_stage_step_rank("compute(us)")
        self.assertEqual(result, {})

    def test_get_stage_step_rank_no_significant_difference(self):
        """Identical timings on every rank yield no per-stage result."""
        self.analyzer.step_trace_dict = self._uniform_step_trace()
        # Re-derive the formatted data from the replaced step-trace dict.
        self.analyzer.format_datas = self.analyzer.format_details()
        result = self.analyzer.get_stage_step_rank("compute(us)")
        self.assertEqual(result, {})