* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
* \file mte_comm.h
* \brief quant_all_reduce 与quant_reduce_scatter MTE通信相关 kernel代码公共部分
*/
#ifndef MTE_COMMON_H
#define MTE_COMMON_H
#include "adv_api/hccl/hccl.h"
#include "adv_api/reduce/sum.h"
#include "../../common/op_kernel/moe_distribute_base.h"
namespace QuantMTECommImpl {
using namespace AscendC;
constexpr static uint32_t UB_ALIGN_BYTES = 32U;
constexpr uint32_t FLOAT_UB_ALIGN_NUM = 8U;
constexpr uint32_t BUFFER_NUM = 2U;
constexpr static uint32_t SCALE_BLCOK_BYTES = 32U;
constexpr static uint32_t X_BLOCK_BYTES = 1024U;
constexpr static uint64_t STATE_WIN_SIZE = 1024UL * 1024UL;
constexpr static uint64_t SINGLE_STATE_REGION_SIZE = 462UL * 1024UL;
constexpr static uint32_t ZERONE_STATE_POS = 0U;
constexpr static uint64_t WIN_ADDR_ALIGN = 512UL;
#define TemplateTypeClass typename XType, typename ScalesType, typename OutputType
#define TemplateType XType, ScalesType, OutputType
template<TemplateTypeClass>
class MTECommunication {
public:
__aicore__ inline MTECommunication() {};
__aicore__ inline void InitHcclContext();
__aicore__ inline void InitParams();
__aicore__ inline void InitGMTensor(GM_ADDR x, GM_ADDR scales, GM_ADDR output, uint64_t alignedXSize, uint64_t dataSpaceGmSize);
__aicore__ inline void InitBuffer(TPipe *tPipe);
__aicore__ inline void SetBlockSize(uint32_t elementsPerBlock, uint64_t aivNum, uint64_t lastBlockNum,
uint64_t scaleNumsPerBlock = 0);
template <bool isReduceScatter = false>
__aicore__ inline void CopyDataToWin(uint64_t xSliceSizeNums = 0, uint64_t scaleSliceNums = 0);
__aicore__ inline void WriteStatusToWin();
__aicore__ inline void ReadStatus();
__aicore__ inline void CopyResultToOutput(uint64_t outOffsetGM, LocalTensor<float>& localResultTensor, uint32_t count);
__aicore__ inline void ComputeTailAivId(uint64_t totalAivCount);
__aicore__ inline GM_ADDR GetWinAddrGm(uint32_t rankId, uint64_t offset = 0);
__aicore__ inline GM_ADDR GetWinDataAddrGm(uint32_t rankId, uint32_t winFlag);
__aicore__ inline GM_ADDR GetWinStatusAddrGm(uint32_t rankId, uint32_t winFlag);
__gm__ Mc2Kernel::HcclOpParam *hcclContext_;
uint32_t aivId_{0};
uint64_t aivNum_{0};
uint32_t round_{0};
uint32_t tailBlockNums_{0};
uint32_t assignedBlockNums_{0};
uint64_t scaleNumsPerBlock_{0};
uint64_t xOffset_{0};
uint64_t scaleOffset_{0};
uint64_t lastAivId_{0};
uint32_t winBufferFlags_{0};
private:
uint32_t xNumPerBlock_{0};
uint64_t tailXNums_{0};
__aicore__ inline void CopyDataBlock(uint64_t curXOffset, uint64_t curScaleOffset, uint32_t count);
GlobalTensor<XType> xGMTensor_;
GlobalTensor<ScalesType> scalesGMTensor_;
GlobalTensor<XType> localWinXGMTensor_;
GlobalTensor<ScalesType> localWinScaleGMTensor_;
GlobalTensor<OutputType> outputTensor_;
GlobalTensor<uint32_t> selfWinFlagGMTensor_;
LocalTensor<XType> xTmpTensor_;
LocalTensor<ScalesType> scaleTmpTensor_;
LocalTensor<float> stateResetTensor_;
LocalTensor<OutputType> xOutTensor_;
TQueBind<QuePosition::VECIN, QuePosition::VECOUT, 1> xQueue_, scaleQueue_;
TQue<QuePosition::VECOUT, 1> xOutQueue_;
TBuf<> winFlagsBuf_;
TBuf<> writeStateBuf_;
TBuf<> readStateBuf_;
TBuf<> stateResetBuf_;
};
template <TemplateTypeClass>
__aicore__ inline void MTECommunication<TemplateType>::InitHcclContext()
{
hcclContext_ = (__gm__ Mc2Kernel::HcclOpParam*)GetHcclContext<HCCL_GROUP_ID_0>();
}
template <TemplateTypeClass>
__aicore__ inline void MTECommunication<TemplateType>::InitParams()
{
aivId_ = GetBlockIdx();
assignedBlockNums_ = aivId_ < tailBlockNums_ ? round_ + 1 : round_;
uint64_t blockIdx = aivId_ * round_ + (aivId_ < tailBlockNums_ ? aivId_ : tailBlockNums_);
xOffset_ = blockIdx * xNumPerBlock_;
scaleOffset_ = blockIdx * scaleNumsPerBlock_;
}
template <TemplateTypeClass>
__aicore__ inline void MTECommunication<TemplateType>::InitGMTensor(GM_ADDR x, GM_ADDR scales, GM_ADDR output, uint64_t xSize, uint64_t winSpaceGmSize)
{
xGMTensor_.SetGlobalBuffer((__gm__ XType*)x);
scalesGMTensor_.SetGlobalBuffer((__gm__ ScalesType*)scales);
outputTensor_.SetGlobalBuffer((__gm__ OutputType*)output);
#if defined(ASCENDC_OOM) && ASCENDC_OOM == 1
for(uint64_t curRank = 0; curRank < hcclContext_->rankDim; ++curRank) {
OOMCheckAddrRange(GetWinAddrGm(curRank), winSpaceGmSize);
}
#endif
uint64_t currCoreFlagOffset = 2UL * SINGLE_STATE_REGION_SIZE + aivId_ * WIN_ADDR_ALIGN;
selfWinFlagGMTensor_.SetGlobalBuffer((__gm__ uint32_t*)GetWinAddrGm(hcclContext_->rankId, currCoreFlagOffset));
LocalTensor<uint32_t> winFlagLocalTensor = winFlagsBuf_.Get<uint32_t>();
DataCopy(winFlagLocalTensor, selfWinFlagGMTensor_, UB_ALIGN_BYTES / sizeof(uint32_t));
SyncFunc<AscendC::HardEvent::MTE2_S>();
winBufferFlags_ = winFlagLocalTensor.GetValue(ZERONE_STATE_POS);
winFlagLocalTensor.SetValue(ZERONE_STATE_POS, 1 - winBufferFlags_);
SyncFunc<AscendC::HardEvent::S_MTE3>();
DataCopy(selfWinFlagGMTensor_, winFlagLocalTensor, UB_ALIGN_BYTES / sizeof(uint32_t));
GM_ADDR localDataSpaceGm = GetWinDataAddrGm(hcclContext_->rankId, winBufferFlags_);
localWinXGMTensor_.SetGlobalBuffer((__gm__ XType*)localDataSpaceGm);
localWinScaleGMTensor_.SetGlobalBuffer((__gm__ ScalesType*)(localDataSpaceGm + xSize));
}
template <TemplateTypeClass>
__aicore__ inline void MTECommunication<TemplateType>::InitBuffer(TPipe *tPipe)
{
tPipe->InitBuffer(xQueue_, BUFFER_NUM, xNumPerBlock_ * sizeof(XType));
uint64_t scaleQueueBytes = scaleNumsPerBlock_ * sizeof(ScalesType);
if (scaleQueueBytes < UB_ALIGN_BYTES) {
scaleQueueBytes = UB_ALIGN_BYTES;
}
tPipe->InitBuffer(scaleQueue_, BUFFER_NUM, scaleQueueBytes);
tPipe->InitBuffer(xOutQueue_, BUFFER_NUM, xNumPerBlock_ * sizeof(OutputType));
tPipe->InitBuffer(winFlagsBuf_, UB_ALIGN_BYTES);
tPipe->InitBuffer(writeStateBuf_, UB_ALIGN_BYTES);
tPipe->InitBuffer(readStateBuf_, hcclContext_->rankDim * UB_ALIGN_BYTES);
tPipe->InitBuffer(stateResetBuf_, hcclContext_->rankDim * UB_ALIGN_BYTES);
stateResetTensor_ = stateResetBuf_.Get<float>();
Duplicate<float>(stateResetTensor_, (float)0.0, static_cast<uint32_t>(hcclContext_->rankDim * FLOAT_UB_ALIGN_NUM));
}
* @brief 配置数据块划分参数,用于多核并行计算
*
* @param elementsPerBlock 每个标准数据块包含的X元素数量
* @param aivNum AIV核的总数,用于数据分发和负载均衡
* @param lastBlockNum 最后一个核处理的尾部数据块元素数量
*/
template <TemplateTypeClass>
__aicore__ inline void MTECommunication<TemplateType>::SetBlockSize(uint32_t elementsPerBlock, uint64_t aivNum,
uint64_t lastBlockNum, uint64_t scaleNumsPerBlock)
{
xNumPerBlock_ = elementsPerBlock;
aivNum_ = aivNum;
tailXNums_ = lastBlockNum;
scaleNumsPerBlock_ = (scaleNumsPerBlock != 0) ? scaleNumsPerBlock : (SCALE_BLCOK_BYTES / sizeof(ScalesType));
}
* @brief 计算负责处理尾部数据的AI核ID
*
* @param totalAivCount AIV核的总数量
*
* @note 当数据量较小时,此时计算仅仅只有一轮(round_ == 0),此时处理尾块的aiv并非最后一个,
* 需根据当前数据块数量计算。
*/
template <TemplateTypeClass>
__aicore__ inline void MTECommunication<TemplateType>::ComputeTailAivId(uint64_t totalAivCount)
{
if (round_ == 0) {
lastAivId_ = tailBlockNums_ - 1;
} else {
lastAivId_ = totalAivCount - 1;
}
}
* @brief 复制数据块从GM到Win区
*
* 复制流程:
* 1. 量化数据(x)复制:GM → UB(队列分配缓冲区) → Win区
* 2. 缩放因子(scale)复制:GM → UB(队列分配缓冲区) → Win区
*
* @param curXOffset 量化数据x在全局内存(GM)中的偏移量
* @param curScaleOffset 缩放因子scale在全局内存(GM)中的偏移量
* @param count 当前x数据块dataCopy的元素数量
*/
template <TemplateTypeClass>
__aicore__ inline void MTECommunication<TemplateType>::CopyDataBlock(uint64_t curXOffset, uint64_t curScaleOffset, uint32_t count)
{
xTmpTensor_ = xQueue_.AllocTensor<XType>();
DataCopy(xTmpTensor_, xGMTensor_[curXOffset], count);
xQueue_.EnQue(xTmpTensor_);
xTmpTensor_ = xQueue_.DeQue<XType>();
DataCopy(localWinXGMTensor_[curXOffset], xTmpTensor_, count);
xQueue_.FreeTensor<XType>(xTmpTensor_);
scaleTmpTensor_ = scaleQueue_.AllocTensor<ScalesType>();
DataCopyParams scaleGmToUbParams;
scaleGmToUbParams.blockLen = scaleNumsPerBlock_ * sizeof(ScalesType);
scaleGmToUbParams.blockCount = 1;
DataCopyPadParams scaleGmToUbPadParams;
DataCopyPad(scaleTmpTensor_, scalesGMTensor_[curScaleOffset], scaleGmToUbParams, scaleGmToUbPadParams);
scaleQueue_.EnQue(scaleTmpTensor_);
scaleTmpTensor_ = scaleQueue_.DeQue<ScalesType>();
DataCopyParams scaleUbToWinParams;
scaleUbToWinParams.blockLen = scaleNumsPerBlock_ * sizeof(ScalesType);
scaleUbToWinParams.blockCount = 1;
DataCopyPad(localWinScaleGMTensor_[curScaleOffset], scaleTmpTensor_, scaleUbToWinParams);
scaleQueue_.FreeTensor<ScalesType>(scaleTmpTensor_);
}
* @brief 将数据复制到Win区(支持ReduceScatter和AllReduce两种通信模式)
*
* 该函数根据通信模式将数据从全局内存GM全部复制到本卡Win区,支持两种分布式通信模式:
* 1. ReduceScatter模式:由于数据按Rank(设备)分片,需按rankDim遍历以搬运所有数据
* 2. AllReduce模式:数据全局复制,每个核直接将数据复制到Win区
*
* @tparam isReduceScatter 通信模式标识,true表示ReduceScatter模式,false表示AllReduce模式
*
* @param[in] xSliceSizeNums 每个Rank的数据分片大小(元素数量),ReduceScatter模式使用
* @param[in] scaleSliceNums 每个Rank的缩放因子分片大小(元素数量),ReduceScatter模式使用
*/
template <TemplateTypeClass>
template <bool isReduceScatter>
__aicore__ inline void MTECommunication<TemplateType>::CopyDataToWin(uint64_t xSliceSizeNums, uint64_t scaleSliceNums)
{
for (uint64_t curBlock = 0; curBlock < assignedBlockNums_; ++curBlock) {
uint64_t curXOffset = xOffset_ + curBlock * xNumPerBlock_;
uint64_t curScaleOffset = scaleOffset_ + curBlock * scaleNumsPerBlock_;
uint32_t copyBlockNum = xNumPerBlock_;
if ((aivId_ == lastAivId_) && (curBlock == assignedBlockNums_ - 1)) {
copyBlockNum = tailXNums_;
}
if constexpr (isReduceScatter) {
for(uint64_t curRank = 0; curRank < hcclContext_->rankDim; ++curRank) {
uint64_t curRankXOffset = curXOffset + curRank * xSliceSizeNums;
uint64_t curRankScaleOffset = curScaleOffset + curRank * scaleSliceNums;
CopyDataBlock(curRankXOffset, curRankScaleOffset, copyBlockNum);
}
} else {
CopyDataBlock(curXOffset, curScaleOffset, copyBlockNum);
}
}
PipeBarrier<PIPE_ALL>();
}
* @brief 向Win区状态区写入本核完成数据搬运状态
*
* 该函数负责将当前AI Core(核)的数据搬运完成状态写入到所有Rank的状态Win区中,
* 通知其他设备当前核的数据准备已完成。这是集群通信软同步机制的一部分,
* 每个核需要向所有Rank(包括本机和其他设备)的状态窗口写入标识,
* 确保所有设备都能感知到当前核的完成状态。
*/
template <TemplateTypeClass>
__aicore__ inline void MTECommunication<TemplateType>::WriteStatusToWin()
{
uint32_t coreOffset = aivId_ * hcclContext_->rankDim;
for (uint32_t curRank = 0; curRank < hcclContext_->rankDim; ++curRank) {
LocalTensor<float> statusTensor = writeStateBuf_.Get<float>();
DataCopy<float>(statusTensor, stateResetTensor_, FLOAT_UB_ALIGN_NUM);
SyncFunc<AscendC::HardEvent::MTE2_S>();
statusTensor(0) = (float)1.0;
GM_ADDR remoteWinStateGM = GetWinStatusAddrGm(curRank, winBufferFlags_);
GlobalTensor<float> stateGMTensor;
stateGMTensor.SetGlobalBuffer((__gm__ float*)remoteWinStateGM);
uint64_t curOffset = (coreOffset + hcclContext_->rankId) * FLOAT_UB_ALIGN_NUM;
SyncFunc<AscendC::HardEvent::S_MTE3>();
DataCopy(stateGMTensor[curOffset], statusTensor, FLOAT_UB_ALIGN_NUM);
SyncFunc<AscendC::HardEvent::MTE3_S>();
}
}
* @brief 读取Win区状态区并等待同步状态
*
* 该函数从本Rank的状态Win区读取同步标识,并在本地进行忙等待,直到所有参与通信的Rank
* 都已完成状态设置。这种软同步机制确保所有Rank上当前核(AI Core)所需的数据都已
* 准备就绪,从而避免跨设备数据不一致性问题。
*/
template <TemplateTypeClass>
__aicore__ inline void MTECommunication<TemplateType>::ReadStatus()
{
GM_ADDR stateGM = GetWinStatusAddrGm(hcclContext_->rankId, winBufferFlags_);
GlobalTensor<float> selfStatusWinTensor;
uint32_t offset = aivId_ * hcclContext_->rankDim * FLOAT_UB_ALIGN_NUM;
selfStatusWinTensor.SetGlobalBuffer((__gm__ float*)(stateGM));
LocalTensor<float> statusTensor = readStateBuf_.Get<float>();
float flag = 0;
uint32_t statusCnt = hcclContext_->rankDim * FLOAT_UB_ALIGN_NUM;
SumParams sumParams{1, statusCnt, statusCnt};
float minTarget = hcclContext_->rankDim - (float)0.5;
float maxTarget = hcclContext_->rankDim + (float)0.5;
while ((flag < minTarget) || (flag > maxTarget)) {
SyncFunc<AscendC::HardEvent::S_MTE2>();
DataCopy<float>(statusTensor, selfStatusWinTensor[offset], statusCnt);
SyncFunc<AscendC::HardEvent::MTE2_V>();
Sum(statusTensor, statusTensor, sumParams);
SyncFunc<AscendC::HardEvent::V_S>();
flag = statusTensor(0);
}
SyncFunc<AscendC::HardEvent::S_MTE3>();
DataCopy<float>(selfStatusWinTensor[offset], stateResetTensor_, statusCnt);
}
* @brief 将计算得到的UB上的结果Tensor的数据复制到GM上的OutPutTensor,并进行类型转换(如果需要)。
*
* @param outputOffset 输出OutputTensor的GM偏移量,用于指定目标位置。
* @param sourceTensor 本地UB上计算结果Tensor,包含计算完成的数据。
* @param count 当前每次处理数据块的元素数量
*/
template <TemplateTypeClass>
__aicore__ inline void MTECommunication<TemplateType>::CopyResultToOutput(uint64_t outOffsetGM, LocalTensor<float>& localResultTensor, uint32_t count)
{
xOutTensor_ = xOutQueue_.AllocTensor<OutputType>();
if constexpr (AscendC::IsSameType<OutputType, float>::value) {
DataCopy(xOutTensor_, localResultTensor, count);
} else {
Cast(xOutTensor_, localResultTensor, RoundMode::CAST_RINT, count);
}
xOutQueue_.EnQue(xOutTensor_);
xOutTensor_ = xOutQueue_.DeQue<OutputType>();
DataCopy(outputTensor_[outOffsetGM], xOutTensor_, count);
xOutQueue_.FreeTensor(xOutTensor_);
}
template <TemplateTypeClass>
__aicore__ inline GM_ADDR MTECommunication<TemplateType>::GetWinAddrGm(uint32_t rankId, uint64_t offset)
{
return (GM_ADDR)(hcclContext_->windowsIn[rankId] + offset);
}
template <TemplateTypeClass>
__aicore__ inline GM_ADDR MTECommunication<TemplateType>::GetWinDataAddrGm(uint32_t rankId, uint32_t winFlag)
{
if (winFlag == 0U) {
return GetWinAddrGm(rankId, STATE_WIN_SIZE);
}
else {
return (GM_ADDR)(hcclContext_->windowsOut[rankId]);
}
}
template <TemplateTypeClass>
__aicore__ inline GM_ADDR MTECommunication<TemplateType>::GetWinStatusAddrGm(uint32_t rankId, uint32_t winFlag)
{
if (winFlag == 0U) {
return GetWinAddrGm(rankId);
}
else {
return GetWinAddrGm(rankId, SINGLE_STATE_REGION_SIZE);
}
}
}
#endif