from collections.abc import Callable, Sequence
from copy import deepcopy
from typing import Any
import torch.fx as fx
from torch._inductor.decomposition import select_decomp_table
from vllm.compilation.passes.fx_utils import OpOverload
from vllm.config import get_current_vllm_config
from vllm_ascend.compilation.compiler_interface import compile_fx
class TestBackend:
"""
A custom compilation backend for testing operator fusion passes.
It applies the AddRMSNormQuantFusionPass during graph compilation and
records the FX graph before and after the transformation.
"""
def __init__(self, custom_passes: list[Any] | None = None):
vllm_config = get_current_vllm_config()
compile_config = vllm_config.compilation_config
self.inductor_config = compile_config.inductor_compile_config
self.inductor_config["graph_fusion_manager"] = self.post_pass
self.custom_passes = custom_passes
self.graph_pre_pass = None
self.graph_post_pass = None
def post_pass(self, graph: fx.Graph, runtime_shape: int | None = None) -> fx.Graph:
"""
Apply custom graph transformation passes.
"""
self.graph_pre_pass = deepcopy(graph)
if self.custom_passes is not None:
for pass_ in self.custom_passes:
pass_(graph)
self.graph_post_pass = deepcopy(graph)
return graph
def compile(
self,
graph: fx.GraphModule,
example_inputs: list[Any],
compiler_config: dict[str, Any],
runtime_shape: int | None = None,
key: str | None = None,
) -> tuple[Callable | None, Any | None]:
"""
Compile the FX graph using vLLM's Ascend compiler interface.
Wraps the post-pass logic into the inner_compile callback.
"""
def compile_inner(graph, example_inputs):
current_pass_manager = compiler_config["graph_fusion_manager"]
return current_pass_manager(graph, runtime_shape)
decompositions = select_decomp_table()
compiled_fn = compile_fx(
graph=graph,
example_inputs=example_inputs,
inner_compile=compile_inner,
decompositions=decompositions,
)
return compiled_fn, None
def __call__(self, gm: fx.GraphModule, example_inputs: list[Any] | None):
"""
Make the backend callable by torch.compile().
Returns a compiled executable function.
"""
assert example_inputs is not None
compiled_fn, _ = self.compile(
gm,
example_inputs,
compiler_config={"graph_fusion_manager": self.post_pass},
runtime_shape=None,
key=None,
)
return compiled_fn
def find_nodes_by_target(self, graph: fx.GraphModule, target: OpOverload) -> list[fx.Node]:
"""Helper to find all FX nodes that call a specific operator."""
return [node for node in graph.graph.nodes if hasattr(node, "target") and node.target == target]
def op_count(self, op: OpOverload, before: bool = False) -> int:
"""Return the number of nodes that call the given operator."""
graph = self.graph_pre_pass if before else self.graph_post_pass
return len(self.find_nodes_by_target(graph, op))
def check_before_ops(self, ops: Sequence[OpOverload], fully_replaced: bool = True):
"""
Verify that the original (unfused) operators exist before the pass
and are fully removed afterward (if fully_replaced=True).
"""
for op in ops:
num_pre = len(self.find_nodes_by_target(self.graph_pre_pass, op))
num_post = len(self.find_nodes_by_target(self.graph_post_pass, op))
print(f"Op {op}: pre={num_pre}, post={num_post}")
assert num_pre > 0, f"Op {op} not found in pre-pass graph"
if fully_replaced:
assert num_post == 0, f"Unexpected op {op} in post-pass graph: {num_post} nodes remain"
def check_after_ops(self, ops: Sequence[OpOverload]):
"""Verify that the fused operator appears in the transformed graph."""
for op in ops:
num_post = len(self.find_nodes_by_target(self.graph_post_pass, op))
print(f"Op {op}: post={num_post}")
assert num_post > 0, f"Op {op} not found in post-pass graph"