import unittest
import pandas as pd
import numpy as np
from unittest.mock import patch
from ms_service_profiler.plugins.plugin_kvcache import PluginKVCacheMetrics
class TestPluginKVCacheMetrics(unittest.TestCase):
def test_parse_empty_data(self):
"""测试空数据输入"""
data = {'tx_data_df': pd.DataFrame()}
result = PluginKVCacheMetrics.parse(data)
self.assertEqual(result, data)
self.assertTrue(result['tx_data_df'].empty)
def test_parse_no_tx_data_df(self):
"""测试没有tx_data_df键的数据"""
data = {}
result = PluginKVCacheMetrics.parse(data)
self.assertEqual(result, data)
def test_parse_no_kvcache_events(self):
"""测试没有KVCache相关事件的数据"""
df = pd.DataFrame({
'domain': ['OtherDomain', 'AnotherDomain'],
'name': ['Event1', 'Event2']
})
data = {'tx_data_df': df}
result = PluginKVCacheMetrics.parse(data)
self.assertEqual(result, data)
def test_parse_kvcache_events_no_existing_columns(self):
"""测试KVCache事件,但DataFrame中没有所需列"""
df = pd.DataFrame({
'domain': ['KVCache', 'Schedule.KVCache', 'OtherDomain'],
'name': ['Event1', 'Event2', 'Event3'],
'some_other_col': [1, 2, 3]
})
data = {'tx_data_df': df}
result = PluginKVCacheMetrics.parse(data)
expected_columns = ['total_blocks', 'used_blocks', 'free_blocks',
'blocks_allocated', 'blocks_freed', 'kvcache_usage_rate']
for col in expected_columns:
self.assertIn(col, result['tx_data_df'].columns)
kvcache_rows = result['tx_data_df']['domain'].isin(['KVCache', 'Schedule.KVCache'])
self.assertTrue((result['tx_data_df'].loc[kvcache_rows, 'kvcache_usage_rate'] == 0.0).all())
self.assertTrue((result['tx_data_df'].loc[kvcache_rows, 'total_blocks'] == 0).all())
def test_parse_with_total_blocks_field(self):
"""测试包含TotalBlocks=字段的数据"""
df = pd.DataFrame({
'domain': ['KVCache', 'Schedule.KVCache', 'KVCache'],
'name': ['Event1', 'Event2', 'Event3'],
'TotalBlocks=': [100, 200, 0]
})
data = {'tx_data_df': df}
result = PluginKVCacheMetrics.parse(data)
expected_total_blocks = [100, 200, 0]
pd.testing.assert_series_equal(
result['tx_data_df']['total_blocks'],
pd.Series(expected_total_blocks, name=result['tx_data_df']['total_blocks'].name),
check_names=False
)
def test_parse_with_free_blocks_after_field(self):
"""测试包含FreeBlocksAfter=字段的数据"""
df = pd.DataFrame({
'domain': ['KVCache', 'Schedule.KVCache', 'KVCache'],
'name': ['Event1', 'Event2', 'Event3'],
'total_blocks': [100, 200, 300],
'FreeBlocksAfter=': [30, 50, 25],
'free_blocks': [0, 0, 0]
})
data = {'tx_data_df': df}
result = PluginKVCacheMetrics.parse(data)
expected_free_blocks = [30, 50, 25]
pd.testing.assert_series_equal(
result['tx_data_df']['free_blocks'],
pd.Series(expected_free_blocks, name=result['tx_data_df']['free_blocks'].name),
check_names=False
)
def test_parse_with_free_blocks_field(self):
"""测试包含FreeBlocks=字段的数据(当FreeBlocksAfter=为0时)"""
df = pd.DataFrame({
'domain': ['KVCache', 'Schedule.KVCache', 'KVCache'],
'name': ['Event1', 'Event2', 'Event3'],
'total_blocks': [100, 200, 300],
'free_blocks': [0, 0, 0],
'FreeBlocks=': [40, 60, 80]
})
data = {'tx_data_df': df}
result = PluginKVCacheMetrics.parse(data)
expected_free_blocks = [40, 60, 80]
pd.testing.assert_series_equal(
result['tx_data_df']['free_blocks'],
pd.Series(expected_free_blocks, name=result['tx_data_df']['free_blocks'].name),
check_names=False
)
def test_parse_with_usage_percent_field_high_threshold(self):
"""测试包含UsagePercent=字段的数据(高于阈值,需要转换)"""
df = pd.DataFrame({
'domain': ['KVCache', 'Schedule.KVCache'],
'name': ['Event1', 'Event2'],
'UsagePercent=': [85.0, 95.0]
})
data = {'tx_data_df': df}
result = PluginKVCacheMetrics.parse(data)
expected_usage_rate = [0.85, 0.95]
pd.testing.assert_series_equal(
result['tx_data_df']['kvcache_usage_rate'],
pd.Series(expected_usage_rate, name=result['tx_data_df']['kvcache_usage_rate'].name),
check_names=False
)
def test_parse_with_usage_percent_field_low_threshold(self):
"""测试包含UsagePercent=字段的数据(低于阈值,不需要转换)"""
df = pd.DataFrame({
'domain': ['KVCache', 'Schedule.KVCache'],
'name': ['Event1', 'Event2'],
'UsagePercent=': [0.8, 0.9]
})
data = {'tx_data_df': df}
result = PluginKVCacheMetrics.parse(data)
expected_usage_rate = [0.8, 0.9]
pd.testing.assert_series_equal(
result['tx_data_df']['kvcache_usage_rate'],
pd.Series(expected_usage_rate, name=result['tx_data_df']['kvcache_usage_rate'].name),
check_names=False
)
def test_parse_with_allocated_blocks_field_positive_negative(self):
"""测试包含AllocatedBlocks=字段的数据(正值和负值)"""
df = pd.DataFrame({
'domain': ['KVCache', 'Schedule.KVCache', 'KVCache', 'KVCache'],
'name': ['Event1', 'Event2', 'Event3', 'Event4'],
'AllocatedBlocks=': [10, -5, 0, 15],
'blocks_allocated': [0, 0, 0, 0],
'blocks_freed': [0, 0, 0, 0]
})
data = {'tx_data_df': df}
result = PluginKVCacheMetrics.parse(data)
expected_allocated = [10, 0, 0, 15]
expected_freed = [0, 5, 0, 0]
pd.testing.assert_series_equal(
result['tx_data_df']['blocks_allocated'],
pd.Series(expected_allocated, name=result['tx_data_df']['blocks_allocated'].name),
check_names=False
)
pd.testing.assert_series_equal(
result['tx_data_df']['blocks_freed'],
pd.Series(expected_freed, name=result['tx_data_df']['blocks_freed'].name),
check_names=False
)
def test_parse_with_free_blocks_before_after_fields(self):
"""测试包含FreeBlocksBefore=和FreeBlocksAfter=字段的数据"""
df = pd.DataFrame({
'domain': ['KVCache', 'Schedule.KVCache', 'KVCache'],
'name': ['Event1', 'Event2', 'Event3'],
'FreeBlocksBefore=': [100, 150, 200],
'FreeBlocksAfter=': [80, 120, 100],
'blocks_allocated': [0, 0, 0],
'blocks_freed': [0, 0, 0]
})
data = {'tx_data_df': df}
result = PluginKVCacheMetrics.parse(data)
expected_allocated = [20, 30, 100]
expected_freed = [0, 0, 0]
pd.testing.assert_series_equal(
result['tx_data_df']['blocks_allocated'],
pd.Series(expected_allocated, name=result['tx_data_df']['blocks_allocated'].name),
check_names=False
)
pd.testing.assert_series_equal(
result['tx_data_df']['blocks_freed'],
pd.Series(expected_freed, name=result['tx_data_df']['blocks_freed'].name),
check_names=False
)
@patch('ms_service_profiler.plugins.plugin_kvcache.logger')
def test_parse_exception_handling(self, mock_logger):
"""测试异常处理"""
df = pd.DataFrame({
'domain': ['KVCache'],
'name': ['Event1'],
'FreeBlocksAfter=': [float('inf')]
})
data = {'tx_data_df': df}
result = PluginKVCacheMetrics.parse(data)
self.assertEqual(result, data)
mock_logger.error.assert_called()
def test_calculate_and_update_metrics_empty_indices(self):
"""测试空索引列表的情况"""
df = pd.DataFrame({
'domain': ['OtherDomain'],
'name': ['Event1']
})
PluginKVCacheMetrics._ensure_required_columns_exist(df)
empty_indices = pd.Index([])
PluginKVCacheMetrics._calculate_and_update_metrics(df, empty_indices)
def test_calculate_metrics_vectorized_empty_input(self):
"""测试空输入数据的向量化计算"""
empty_df = pd.DataFrame(columns=['domain', 'name'])
result_df = PluginKVCacheMetrics._calculate_metrics_vectorized(empty_df)
self.assertEqual(len(result_df), 0)
required_columns = ['total_blocks', 'used_blocks', 'free_blocks',
'blocks_allocated', 'blocks_freed', 'kvcache_usage_rate']
for col in required_columns:
self.assertIn(col, result_df.columns)
def test_ensure_required_columns_exist_already_present(self):
"""测试所需列已经存在的情况"""
df = pd.DataFrame({
'domain': ['KVCache'],
'name': ['Event1'],
'total_blocks': [100],
'used_blocks': [50],
'free_blocks': [50],
'blocks_allocated': [10],
'blocks_freed': [5],
'kvcache_usage_rate': [0.5]
})
original_values = df.copy()
PluginKVCacheMetrics._ensure_required_columns_exist(df)
pd.testing.assert_frame_equal(df, original_values)
if __name__ == '__main__':
unittest.main()