import logging
import pandas as pd
from serving_cast.utils import rank_pd_ratio_rows
logger = logging.getLogger(__name__)
class PDRatioThroughputOptimizer:
    """Combine Prefill (P) and Decode (D) results into ranked PD-ratio rows.

    The optimizer takes two independently produced result tables, derives a
    request throughput (QPS) for every row, pairs every P row with every D
    row, and hands the paired table to ``rank_pd_ratio_rows`` for ordering.

    QPS formulas (the ``* 1000`` factor suggests latencies in milliseconds --
    NOTE(review): confirm the unit with the producers of the input tables):
        P QPS = p_concurrency / ttft * 1000                               (req/s)
        D QPS = d_concurrency / (tpot * max(output_length - 1, 1)) * 1000 (req/s)

    PD ratio:
        PD ratio = D_QPS / P_QPS

    Required input columns:
        P table: ``ttft``, ``parallel``, ``num_devices``, ``batch_size``,
                 ``concurrency``
        D table: ``tpot``, ``parallel``, ``num_devices``, ``batch_size``,
                 ``concurrency``
    """

    # Descriptive columns copied from both inputs into the result; they are
    # suffixed with "_p" / "_d" so the two sides stay distinguishable.
    _SHARED_FIELDS = ("parallel", "num_devices", "batch_size", "concurrency")

    # Column order of the frame passed to ``rank_pd_ratio_rows``.
    _RESULT_COLUMNS = (
        "pd_ratio",
        "p_qps",
        "d_qps",
        "balanced_qps",
        "ttft_p",
        "tpot_d",
        "parallel_p",
        "parallel_d",
        "num_devices_p",
        "num_devices_d",
        "batch_size_p",
        "batch_size_d",
        "concurrency_p",
        "concurrency_d",
    )

    def __init__(self, output_length: int):
        """Initialize the PD ratio optimizer.

        Args:
            output_length: The expected output length for D QPS calculation.
                Values of 1 or less are treated as a single decode step.
        """
        self.output_length = output_length
        # String annotations: the attributes start as None and are filled in
        # by the setters / optimize(); quoting avoids evaluating ``X | None``
        # at runtime on interpreters that do not support it.
        self._p_df: "pd.DataFrame | None" = None
        self._d_df: "pd.DataFrame | None" = None
        self._result_df: "pd.DataFrame | None" = None

    def set_p_results(self, df: pd.DataFrame) -> None:
        """Store the Prefill result table (kept by reference, never mutated)."""
        self._p_df = df

    def set_d_results(self, df: pd.DataFrame) -> None:
        """Store the Decode result table (kept by reference, never mutated)."""
        self._d_df = df

    def optimize(self) -> pd.DataFrame:
        """Run PD ratio optimization.

        Rows with a non-positive latency (``ttft`` / ``tpot``) or a
        non-positive derived QPS are discarded, every remaining P row is
        paired with every remaining D row, and ``pd_ratio`` and
        ``balanced_qps`` (the smaller of the two QPS values) are computed for
        each pair.

        Returns:
            The frame returned by ``rank_pd_ratio_rows`` with a fresh
            0..n-1 index. Ordering (and any Top-N truncation) is decided by
            that helper, which is not defined in this module. A column-less
            empty DataFrame is returned when either input is unset/empty or
            no row survives filtering. The result is also cached on the
            instance.

        Raises:
            KeyError: If a required column is missing from an input table.
        """
        if self._is_missing(self._p_df) or self._is_missing(self._d_df):
            return self._store_empty()

        # Filter first, then derive QPS with ``assign`` so the caller's
        # frames are never written to and no full up-front copy is needed.
        p_rows = self._p_df[self._p_df["ttft"] > 0]
        p_rows = p_rows.assign(p_qps=p_rows["concurrency"] / p_rows["ttft"] * 1000)
        p_rows = p_rows[p_rows["p_qps"] > 0]

        # The first token comes from prefill, so decode produces
        # output_length - 1 tokens; clamp to 1 to avoid a zero divisor.
        decode_steps = max(self.output_length - 1, 1)
        d_rows = self._d_df[self._d_df["tpot"] > 0]
        d_rows = d_rows.assign(
            d_qps=d_rows["concurrency"] / (d_rows["tpot"] * decode_steps) * 1000
        )
        d_rows = d_rows[d_rows["d_qps"] > 0]

        if p_rows.empty or d_rows.empty:
            return self._store_empty()

        # Project and rename BEFORE the cross join: the join has
        # len(P) * len(D) rows, so carrying unused columns through it is
        # wasted memory, and explicit suffixes do not depend on which column
        # names the two inputs happen to share.
        p_side = self._project(p_rows, "p_qps", "ttft", "_p")
        d_side = self._project(d_rows, "d_qps", "tpot", "_d")
        merged = p_side.merge(d_side, how="cross")

        merged["pd_ratio"] = merged["d_qps"] / merged["p_qps"]
        merged["balanced_qps"] = merged[["p_qps", "d_qps"]].min(axis=1)

        ranked = rank_pd_ratio_rows(merged[list(self._RESULT_COLUMNS)])
        self._result_df = ranked.reset_index(drop=True)
        return self._result_df

    @staticmethod
    def _is_missing(df) -> bool:
        """Return True when *df* was never set or holds no rows."""
        return df is None or df.empty

    def _store_empty(self) -> pd.DataFrame:
        """Cache and return the column-less empty result."""
        self._result_df = pd.DataFrame()
        return self._result_df

    @classmethod
    def _project(cls, rows: pd.DataFrame, qps_col: str, latency_col: str,
                 suffix: str) -> pd.DataFrame:
        """Keep only the result-relevant columns of one side, suffixing all but QPS."""
        fields = [latency_col, *cls._SHARED_FIELDS]
        side = rows[[qps_col, *fields]]
        return side.rename(columns={name: name + suffix for name in fields})