import pytest
import torch
import triton
import triton.language as tl
from mindspeed.ops.triton.utils import prepare_chunk_indices, exp, assert_close, is_nvidia_hopper
from mindspeed.ops.triton.chunk_o import chunk_fwd_o
NUM_WARPS = [2, 4] if is_nvidia_hopper else [2, 4, 8]
@triton.heuristics({
'USE_G': lambda args: args['g'] is not None,
'USE_G_GAMMA': lambda args: args['g_gamma'] is not None,
'IS_VARLEN': lambda args: args['cu_seqlens'] is not None,
})
@triton.autotune(
configs=[
triton.Config({'BK': 128, 'BV': 128}, num_warps=8, num_stages=3),
triton.Config({'BK': 64, 'BV': 64}, num_warps=4, num_stages=3),
triton.Config({'BK': 32, 'BV': 32}, num_warps=2, num_stages=3),
],
key=['H', 'K', 'V', 'BT'],
)
@triton.jit(do_not_specialize=['T'])
def chunk_fwd_kernel_o(
q,
k,
v,
h,
g,
g_gamma,
o,
cu_seqlens,
chunk_indices,
scale,
T,
H: tl.constexpr,
K: tl.constexpr,
V: tl.constexpr,
BT: tl.constexpr,
BK: tl.constexpr,
BV: tl.constexpr,
USE_G: tl.constexpr,
USE_G_GAMMA: tl.constexpr,
IS_VARLEN: tl.constexpr,
):
i_v, i_t, i_bh = tl.program_id(0), tl.program_id(1), tl.program_id(2)
i_b, i_h = i_bh // H, i_bh % H
if IS_VARLEN:
i_tg = i_t
i_n, i_t = tl.load(chunk_indices + i_t * 2).to(tl.int32), tl.load(chunk_indices + i_t * 2 + 1).to(tl.int32)
bos, eos = tl.load(cu_seqlens + i_n).to(tl.int32), tl.load(cu_seqlens + i_n + 1).to(tl.int32)
T = eos - bos
NT = tl.cdiv(T, BT)
else:
NT = tl.cdiv(T, BT)
i_tg = i_b * NT + i_t
bos, eos = i_b * T, i_b * T + T
q += (bos * H + i_h) * K
k += (bos * H + i_h) * K
v += (bos * H + i_h) * V
o += (bos * H + i_h) * V
h += (i_tg * H + i_h).to(tl.int64) * K * V
b_o = tl.zeros([BT, BV], dtype=tl.float32)
b_A = tl.zeros([BT, BT], dtype=tl.float32)
for i_k in range(tl.cdiv(K, BK)):
p_q = tl.make_block_ptr(q, (T, K), (H * K, 1), (i_t * BT, i_k * BK), (BT, BK), (1, 0))
p_k = tl.make_block_ptr(k, (K, T), (1, H * K), (i_k * BK, i_t * BT), (BK, BT), (0, 1))
p_h = tl.make_block_ptr(h, (K, V), (V, 1), (i_k * BK, i_v * BV), (BK, BV), (1, 0))
b_q = tl.load(p_q, boundary_check=(0, 1))
b_k = tl.load(p_k, boundary_check=(0, 1))
b_h = tl.load(p_h, boundary_check=(0, 1))
b_o += tl.dot(b_q, b_h)
b_A += tl.dot(b_q, b_k)
if USE_G:
g += bos * H + i_h
p_g = tl.make_block_ptr(g, (T,), (H,), (i_t * BT,), (BT,), (0,))
b_g = tl.load(p_g, boundary_check=(0,))
b_o = b_o * exp(b_g)[:, None]
b_A = b_A * exp(b_g[:, None] - b_g[None, :])
if USE_G_GAMMA:
b_gamma = tl.load(g_gamma + i_h)
b_g = b_gamma * (tl.arange(0, BT) + 1)
b_o = b_o * exp(b_g)[:, None]
b_A = b_A * exp(b_g[:, None] - b_g[None, :])
o_t = i_t * BT + tl.arange(0, BT)
m_t = o_t < T
m_A = (o_t[:, None] >= o_t[None, :]) & (m_t[:, None] & m_t)
b_A = tl.where(m_A, b_A, 0)
p_v = tl.make_block_ptr(v, (T, V), (H * V, 1), (i_t * BT, i_v * BV), (BT, BV), (1, 0))
p_o = tl.make_block_ptr(o, (T, V), (H * V, 1), (i_t * BT, i_v * BV), (BT, BV), (1, 0))
b_v = tl.load(p_v, boundary_check=(0, 1))
b_o = b_o * scale + tl.dot(b_A.to(b_v.dtype), b_v) * scale
tl.store(p_o, b_o.to(p_o.dtype.element_ty), boundary_check=(0, 1))
def chunk_fwd_o_ori(
q: torch.Tensor,
k: torch.Tensor,
v: torch.Tensor,
h: torch.Tensor,
g: torch.Tensor | None = None,
g_gamma: torch.Tensor | None = None,
scale: float | None = None,
cu_seqlens: torch.LongTensor | None = None,
chunk_size: int = 64,
) -> torch.Tensor:
B, T, H, K, V = *q.shape, v.shape[-1]
BT = chunk_size
chunk_indices = prepare_chunk_indices(cu_seqlens, BT) if cu_seqlens is not None else None
NT = triton.cdiv(T, BT) if cu_seqlens is None else len(chunk_indices)
if scale is None:
scale = k.shape[-1] ** -0.5
o = torch.empty_like(v)
def grid(meta):
return (
triton.cdiv(V, meta['BV']),
NT,
B * H
)
chunk_fwd_kernel_o[grid](
q=q,
k=k,
v=v,
h=h,
g=g,
g_gamma=g_gamma,
o=o,
cu_seqlens=cu_seqlens,
chunk_indices=chunk_indices,
scale=scale,
T=T,
H=H,
K=K,
V=V,
BT=BT,
)
return o
@pytest.mark.parametrize(
('B', 'T', 'H', 'K', 'HT', 'chunk_size', 'cu_seqlens'),
[
pytest.param(*test, id="B{}-T{}-H{}-K{}-HT{}-chunk_size{}-cu_seqlens{}".format(*test))
for test in [
(1, 1024, 32, 128, 64, 16, None),
(2, 1024, 32, 128, 64, 16, None),
(1, 4096, 32, 128, 256, 16, None),
(1, 1024, 32, 128, 64, 16, [0, 10, 66, 140, 229, 351, 401, 574, 684, 819, 874, 922, 1024]),
]
]
)
def test_chunk_o(B, T, H, K, HT, chunk_size, cu_seqlens):
device = "npu:0"
device_dtype = torch.float32
torch.manual_seed(42)
torch.npu.manual_seed(42)
q = torch.randn((B, T, H, K), device=device, dtype=device_dtype)
k = torch.randn((B, T, H, K), device=device, dtype=device_dtype)
v_new = torch.randn((B, T, H, K), device=device, dtype=device_dtype)
h = torch.randn((B, HT, H, K, K), device=device, dtype=device_dtype)
g = torch.randn((B, T, H), device=device, dtype=device_dtype)
scale = 0.08838834764831845
if cu_seqlens is not None:
cu_seqlens = torch.LongTensor(cu_seqlens).to(device)
ref_o = chunk_fwd_o_ori(
q=q,
k=k,
v=v_new,
h=h,
g=g,
scale=scale,
cu_seqlens=cu_seqlens,
chunk_size=chunk_size,
)
out_o = chunk_fwd_o(
q=q,
k=k,
v=v_new,
h=h,
g=g,
scale=scale,
cu_seqlens=cu_seqlens,
chunk_size=chunk_size,
)
print("o diff:", torch.max(torch.abs(ref_o - out_o)))
assert_close('o', ref_o, out_o, 0.001)