import pandas as pd
from ms_service_profiler.plugins.base import PluginBase
from ms_service_profiler.utils.timer import timer
from ms_service_profiler.utils.log import logger
class PluginKVCacheMetrics(PluginBase):
    """Derive KVCache block and usage metrics from profiler events.

    Rows of ``tx_data_df`` whose ``domain`` is ``KVCache`` or
    ``Schedule.KVCache`` are inspected for the raw attribute columns
    ``TotalBlocks=``, ``FreeBlocks=``, ``FreeBlocksBefore=``,
    ``FreeBlocksAfter=``, ``AllocatedBlocks=`` and ``UsagePercent=``.
    From these the plugin fills the metric columns ``total_blocks``,
    ``used_blocks``, ``free_blocks``, ``blocks_allocated``, ``blocks_freed``
    and ``kvcache_usage_rate`` in place on the same DataFrame.
    """

    # NOTE(review): ``name`` / ``depends`` are presumably consumed by the
    # plugin framework behind ``PluginBase`` -- confirm there.
    name = "plugin_kvcache_metrics"
    depends = ["plugin_timestamp"]

    # A raw ``UsagePercent=`` value strictly greater than the threshold is
    # divided by the conversion factor; a value at or below the threshold is
    # stored unchanged.
    PERCENTAGE_THRESHOLD = 1.0
    PERCENTAGE_CONVERSION_FACTOR = 100.0

    # Metric columns produced by this plugin, mapped to the value used for
    # rows that carry no information. Insertion order is the column order.
    _METRIC_DEFAULTS = {
        'total_blocks': 0,
        'used_blocks': 0,
        'free_blocks': 0,
        'blocks_allocated': 0,
        'blocks_freed': 0,
        'kvcache_usage_rate': 0.0,
    }

    @classmethod
    @timer(logger.debug)
    def parse(cls, data):
        """Compute KVCache metrics and write them into ``data['tx_data_df']``.

        Args:
            data: Object exposing ``get('tx_data_df')``; the returned value is
                treated as a pandas DataFrame and is modified in place.

        Returns:
            The same ``data`` object that was passed in, in every case.
        """
        tx_data_df = data.get('tx_data_df')
        if tx_data_df is None:
            logger.debug("Skip KVCache metrics calculation because tx_data_df is unavailable for current input")
            return data
        if tx_data_df.empty:
            logger.debug("Skip KVCache metrics calculation because tx_data_df is empty")
            return data
        # Without a 'domain' column no row can be classified, so skip instead
        # of letting a KeyError escape from the plugin.
        if 'domain' not in tx_data_df.columns:
            logger.debug("Skip KVCache metrics calculation because tx_data_df has no 'domain' column")
            return data

        kvcache_domains = ['KVCache', 'Schedule.KVCache']
        kvcache_mask = tx_data_df['domain'].isin(kvcache_domains)
        if not kvcache_mask.any():
            logger.debug("No KVCache related events found")
            return data

        # Best effort: a failure here is logged and ``data`` is still
        # returned, so metric calculation never aborts the caller.
        try:
            cls._ensure_required_columns_exist(tx_data_df)
            kvcache_indices = tx_data_df[kvcache_mask].index
            cls._calculate_and_update_metrics(tx_data_df, kvcache_indices)
            logger.debug(f"KVCache metrics calculated for {len(kvcache_indices)} rows")
            logger.debug(f"Available columns after plugin: {tx_data_df.columns.tolist()}")
        except Exception as e:
            logger.error(f"Error calculating KVCache metrics: {e}")
        return data

    @classmethod
    def _ensure_required_columns_exist(cls, tx_data_df: pd.DataFrame) -> None:
        """Add any missing metric column to ``tx_data_df`` with its default.

        Existing columns are left untouched.
        """
        for column, default in cls._METRIC_DEFAULTS.items():
            if column not in tx_data_df.columns:
                tx_data_df[column] = default

    @classmethod
    def _calculate_and_update_metrics(cls, tx_data_df: pd.DataFrame, kvcache_indices) -> None:
        """Compute metrics for the given rows and write them back in place.

        Args:
            tx_data_df: Frame that receives the metric values.
            kvcache_indices: Index labels of the KVCache rows in ``tx_data_df``.
        """
        # ``len`` is used on purpose: the truth value of a pandas Index is
        # ambiguous, so ``if not kvcache_indices`` would raise.
        if len(kvcache_indices) == 0:
            return
        metrics_df = cls._calculate_metrics_vectorized(tx_data_df.loc[kvcache_indices])
        for column in metrics_df.columns:
            tx_data_df.loc[kvcache_indices, column] = metrics_df[column]

    @classmethod
    def _calculate_metrics_vectorized(cls, kvcache_data: pd.DataFrame) -> pd.DataFrame:
        """Return a frame of all metric columns, indexed like ``kvcache_data``.

        The individual steps run in a fixed order because later steps read
        columns written by earlier ones (used/free needs the total, the usage
        rate fallback needs total and used).
        """
        metrics_df = pd.DataFrame(cls._METRIC_DEFAULTS, index=kvcache_data.index)
        cls._calculate_total_blocks_vectorized(kvcache_data, metrics_df)
        cls._calculate_used_and_free_blocks_vectorized(kvcache_data, metrics_df)
        cls._calculate_block_changes_vectorized(kvcache_data, metrics_df)
        cls._calculate_usage_rate_vectorized(kvcache_data, metrics_df)
        return metrics_df

    @classmethod
    def _calculate_total_blocks_vectorized(cls, kvcache_data: pd.DataFrame, metrics_df: pd.DataFrame) -> None:
        """Fill ``total_blocks`` from ``TotalBlocks=``; non-numeric values become 0."""
        if 'TotalBlocks=' not in kvcache_data.columns:
            return
        totals = pd.to_numeric(kvcache_data['TotalBlocks='], errors='coerce')
        metrics_df['total_blocks'] = totals.fillna(0).astype(int)

    @classmethod
    def _calculate_used_and_free_blocks_vectorized(cls, kvcache_data: pd.DataFrame, metrics_df: pd.DataFrame) -> None:
        """Fill ``free_blocks`` and ``used_blocks``.

        ``FreeBlocksAfter=`` is preferred; ``FreeBlocks=`` is used only for
        rows where ``FreeBlocksAfter=`` held no numeric value. ``used_blocks``
        is ``total_blocks - free_blocks`` clamped at 0, so a row whose total
        is unknown (0) never reports a negative number of used blocks.
        """
        # Explicit "already filled" mask: a genuine free-block count of 0 must
        # not be mistaken for "not set" and overwritten by the fallback column.
        resolved = pd.Series(False, index=metrics_df.index)
        for source_column in ('FreeBlocksAfter=', 'FreeBlocks='):
            if source_column not in kvcache_data.columns:
                continue
            free_values = pd.to_numeric(kvcache_data[source_column], errors='coerce')
            usable = free_values.notna() & ~resolved
            if not usable.any():
                continue
            # Cast only the valid rows: casting a Series that still contains
            # NaN to int raises and would discard every metric of the batch.
            free_int = free_values[usable].astype(int)
            metrics_df.loc[usable, 'free_blocks'] = free_int
            metrics_df.loc[usable, 'used_blocks'] = (
                metrics_df.loc[usable, 'total_blocks'] - free_int
            ).clip(lower=0)
            resolved |= usable

    @classmethod
    def _calculate_block_changes_vectorized(cls, kvcache_data: pd.DataFrame, metrics_df: pd.DataFrame) -> None:
        """Fill ``blocks_allocated`` and ``blocks_freed``.

        First the difference ``FreeBlocksBefore= - FreeBlocksAfter=`` is used:
        a positive difference counts as allocated, a negative one as freed.
        Afterwards a non-zero ``AllocatedBlocks=`` overrides the result:
        positive values overwrite ``blocks_allocated``, negative values
        overwrite ``blocks_freed`` with their absolute value.
        """
        columns = kvcache_data.columns

        if 'FreeBlocksBefore=' in columns and 'FreeBlocksAfter=' in columns:
            free_before = pd.to_numeric(kvcache_data['FreeBlocksBefore='], errors='coerce')
            free_after = pd.to_numeric(kvcache_data['FreeBlocksAfter='], errors='coerce')
            both_valid = free_before.notna() & free_after.notna()
            if both_valid.any():
                # Restrict to valid rows before the int cast (NaN cannot be
                # converted to int).
                consumed = free_before[both_valid].astype(int) - free_after[both_valid].astype(int)
                metrics_df.loc[both_valid, 'blocks_allocated'] = consumed.clip(lower=0)
                metrics_df.loc[both_valid, 'blocks_freed'] = (-consumed).clip(lower=0)

        if 'AllocatedBlocks=' in columns:
            allocated = pd.to_numeric(kvcache_data['AllocatedBlocks='], errors='coerce')
            # Comparisons against NaN are False, so invalid rows drop out of
            # both masks without a separate notna() check.
            grew = allocated > 0
            if grew.any():
                metrics_df.loc[grew, 'blocks_allocated'] = allocated[grew].astype(int)
            shrank = allocated < 0
            if shrank.any():
                metrics_df.loc[shrank, 'blocks_freed'] = (-allocated[shrank]).astype(int)

    @classmethod
    def _calculate_usage_rate_vectorized(cls, kvcache_data: pd.DataFrame, metrics_df: pd.DataFrame) -> None:
        """Fill ``kvcache_usage_rate``.

        Rows with a numeric ``UsagePercent=`` take that value, divided by
        ``PERCENTAGE_CONVERSION_FACTOR`` when it exceeds
        ``PERCENTAGE_THRESHOLD``. All other rows (missing or non-numeric
        value, or no such column) fall back to ``used_blocks / total_blocks``,
        or 0.0 when ``total_blocks`` is not positive.
        """
        if 'UsagePercent=' in kvcache_data.columns:
            reported = pd.to_numeric(kvcache_data['UsagePercent='], errors='coerce')
        else:
            reported = pd.Series(float('nan'), index=metrics_df.index)

        has_reported = reported.notna()
        if has_reported.any():
            is_percentage = reported > cls.PERCENTAGE_THRESHOLD
            rate = reported.where(~is_percentage, reported / cls.PERCENTAGE_CONVERSION_FACTOR)
            metrics_df.loc[has_reported, 'kvcache_usage_rate'] = rate[has_reported]

        needs_fallback = ~has_reported
        if needs_fallback.any():
            total_blocks = metrics_df.loc[needs_fallback, 'total_blocks']
            used_blocks = metrics_df.loc[needs_fallback, 'used_blocks']
            # Non-positive totals become NaN in the divisor; the resulting NaN
            # rate is then reported as 0.0 instead of a meaningless ratio.
            computed = (used_blocks / total_blocks.where(total_blocks > 0)).fillna(0.0)
            metrics_df.loc[needs_fallback, 'kvcache_usage_rate'] = computed