* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* MindIE is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "communicator.h"
#include <algorithm>
#include <chrono>
#include <functional>
#include <future>
#include <stdexcept>
#include <thread>
#include "config_manager.h"
#include "log.h"
#include "msServiceProfiler/msServiceProfiler.h"
#include "string_utils.h"
namespace mindie_llm {
/**
 * @brief Builds the communicator for one DP rank and derives its master/slave role and remote slave address.
 *
 * @param config            Key/value settings. Reads "layerwiseDisaggregated", "layerwiseDisaggregatedRoleType",
 *                          "layerwiseDisaggregatedMultiNodesInferEnabled", "isMaster", "slaveIPs" and "dp"
 *                          (each only on the paths that need it).
 * @param isMultiNodesInfer Multi-node inference flag; together with "isMaster" == "0" it makes this node a slave.
 * @param dpRankIdx         Local DP rank index; on the master it selects the slave IP this rank talks to.
 * @param remoteDPRankIdx   DP rank index on the remote side, stored for later gRPC calls.
 * @param intraNodeTP       When set, the master expects one sync response per slave IP and never picks a
 *                          dedicated remote slave IP.
 * @throws std::out_of_range      when a required config key is missing or a slave index is out of range.
 * @throws std::invalid_argument  when "dp" cannot be parsed, or when "dp" is smaller than the number of nodes
 *                                (master + slaves), which previously caused an integer division by zero.
 */
Communicator::Communicator(std::unordered_map<std::string, std::string> &config, bool isMultiNodesInfer, int dpRankIdx,
                           int remoteDPRankIdx, bool intraNodeTP)
    : isMultiNodesInfer_(isMultiNodesInfer),
      dpRankIdx_(dpRankIdx),
      remoteDPRankIdx_(remoteDPRankIdx),
      intraNodeTP_(intraNodeTP) {
    std::string lwdRoletype = "";
    auto lwdIt = config.find("layerwiseDisaggregated");
    if (lwdIt != config.end() && lwdIt->second == "true") {
        layerwiseDisaggregated_ = true;
        lwdRoletype = config.at("layerwiseDisaggregatedRoleType");
    }
    auto lwdMultiIt = config.find("layerwiseDisaggregatedMultiNodesInferEnabled");
    if (lwdMultiIt != config.end() && lwdMultiIt->second == "true") {
        isLwdMultiNodesInfer_ = true;
    }

    msRole_ = MasterSlaveRole::MASTER;
    if ((isMultiNodesInfer && config.at("isMaster") == "0") || (lwdRoletype == "slave")) {
        msRole_ = MasterSlaveRole::SLAVE;
    }

    if (isMultiNodesInfer_ && msRole_ == MasterSlaveRole::MASTER) {
        std::vector<std::string> slaveIPs;
        mindie_llm::Split(config.at("slaveIPs"), ",", slaveIPs);
        const size_t slaveCount = slaveIPs.size();
        numExpectedResponses_ = intraNodeTP ? slaveCount : 1;
        // Ranks [0, dpNumPerNode) stay on the master; rank r otherwise maps to slaveIPs[r / dpNumPerNode - 1].
        // "dp" is only read when intraNodeTP is false.
        const size_t dpNumPerNode = intraNodeTP ? 1 : std::stoul(config.at("dp")) / (slaveCount + 1);
        if (dpNumPerNode == 0) {
            // dp < number of nodes: the mapping below would divide by zero (undefined behavior).
            MINDIE_LLM_LOG_ERROR("Invalid config: dp (" << config.at("dp") << ") is smaller than the number of nodes ("
                                                        << (slaveCount + 1) << "), cannot map DP ranks to nodes.");
            throw std::invalid_argument("dp must be at least the number of nodes (master + slaves)");
        }
        if (intraNodeTP || static_cast<std::size_t>(dpRankIdx_) < dpNumPerNode) {
            remoteSlaveIP_ = "";
        } else {
            remoteSlaveIP_ = slaveIPs.at(static_cast<std::size_t>(dpRankIdx_) / dpNumPerNode - 1);
        }
    }

    if (isLwdMultiNodesInfer_ && msRole_ == MasterSlaveRole::MASTER && std::stoul(config.at("dp")) > 1) {
        std::vector<std::string> slaveIPs;
        mindie_llm::Split(config.at("slaveIPs"), ",", slaveIPs);
        // Layerwise multi-node mode: master DP rank r talks to slaveIPs[r].
        remoteSlaveIP_ = slaveIPs.at(static_cast<std::size_t>(dpRankIdx_));
    }
    PROF(INFO, AddMetaInfo("msRole", static_cast<int>(msRole_)));
}
bool Communicator::InitIPCCommunicators(const std::string &sharedMemPrefix, uint32_t localWorldSize) {
ShmSizeConfig executeShmConfig{SHARED_MEMORY_256MB, DEFAULT_SHARED_MEMORY_SIZE};
SemaphoreConfig semConfig{localWorldSize, localWorldSize};
ipcCommunicatorExecute_ = InitSingleIPCCommunicator(sharedMemPrefix + "_execute", semConfig, executeShmConfig);
if (ipcCommunicatorExecute_ == nullptr) {
MINDIE_LLM_LOG_ERROR("Failed to initialize IPC Communicator for Execute channel.");
return false;
}
ShmSizeConfig sharedSyncShmConfig{DEFAULT_SHARED_MEMORY_SIZE, DEFAULT_SHARED_MEMORY_SIZE};
ipcCommunicatorSharedSync_ =
InitSingleIPCCommunicator(sharedMemPrefix + "_shared_sync_link", semConfig, sharedSyncShmConfig);
if (ipcCommunicatorSharedSync_ == nullptr) {
MINDIE_LLM_LOG_ERROR("Failed to initialize IPC Communicator for Shared Link channel.");
return false;
}
ShmSizeConfig recoverCommandShmConfig{RECOVER_SHARED_MEMORY_SIZE, RECOVER_SHARED_MEMORY_SIZE};
ipcCommunicatorRecoverCommand_ =
InitSingleIPCCommunicator(sharedMemPrefix + "_recover_command", semConfig, recoverCommandShmConfig);
if (ipcCommunicatorRecoverCommand_ == nullptr) {
MINDIE_LLM_LOG_ERROR("Failed to initialize IPC Communicator for Recover Command channel.");
return false;
}
ShmSizeConfig kvTransferShmConfig{SHARED_MEMORY_256MB, DEFAULT_SHARED_MEMORY_SIZE};
ipcCommunicatorKVTransfer_ =
InitSingleIPCCommunicator(sharedMemPrefix + "_transfer", semConfig, kvTransferShmConfig, true);
if (ipcCommunicatorKVTransfer_ == nullptr) {
MINDIE_LLM_LOG_ERROR("Failed to initialize IPC Communicator for KV Transfer channel.");
return false;
}
ShmSizeConfig executeErrorShmConfig{0, ERROR_SHARED_MEMORY_SIZE};
SemaphoreConfig executeErrorSemConfig{0, 1};
ipcCommunicatorExecuteError_ =
InitSingleIPCCommunicator(sharedMemPrefix + "_execute_error", executeErrorSemConfig, executeErrorShmConfig);
if (ipcCommunicatorExecuteError_ == nullptr) {
MINDIE_LLM_LOG_ERROR("Failed to initialize IPC Communicator for Execute Error channel.");
return false;
}
return true;
}
bool Communicator::LwdGRPCCommunicatorInit(std::unordered_map<std::string, std::string> &config,
uint32_t grpcCommunicatorNum) {
auto itrFindDp = config.find("dp");
uint32_t dpNum = std::stol(itrFindDp->second);
if (dpNum == 0) {
MINDIE_LLM_LOG_ERROR("Lwd The value of dp_num must be greater than 0.");
return false;
}
if (dpNum == grpcCommunicatorNum) {
if (!grpcCommunicator_->Init(grpcCommunicatorNum)) {
MINDIE_LLM_LOG_ERROR("Lwd Failed to initialize GRPC Communicator.");
return false;
}
return true;
}
auto itrFindSlaveIPs = config.find("slaveIPs");
std::string slaveIPsStr = std::string(itrFindSlaveIPs->second);
uint32_t slaveNum = std::count(slaveIPsStr.begin(), slaveIPsStr.end(), ',') + 1;
uint32_t multiGrpcNumPerExcutor = slaveNum / dpNum;
std::vector<std::future<bool>> futures;
for (uint32_t i = 0; i < multiGrpcNumPerExcutor; i++) {
futures.push_back(std::async(std::launch::async, [&, i, grpcCommunicatorNum]() {
if (!grpcCommunicator_->Init(grpcCommunicatorNum)) {
MINDIE_LLM_LOG_ERROR("Lwd Failed to initialize GRPC Communicator:" << i << ".");
return false;
}
return true;
}));
}
for (auto &fut : futures) {
if (!fut.get()) {
MINDIE_LLM_LOG_ERROR("Lwd Failed to initialize GRPC Communicator, one of Communicator failed.");
return false;
}
}
return true;
}
bool Communicator::InitGRPCCommunicator(std::unordered_map<std::string, std::string> &config,
ResponseHandler responseFromSlaveHandler, uint32_t grpcCommunicatorNum) {
grpcCommunicator_ = GRPCCommunicator::GetInstance(config);
if (msRole_ == MasterSlaveRole::MASTER) {
if (!grpcCommunicator_->RegisterResponseHandler(responseFromSlaveHandler, dpRankIdx_)) {
MINDIE_LLM_LOG_ERROR("Failed to register response handler for master node.");
return false;
}
} else if (msRole_ == MasterSlaveRole::SLAVE) {
RequestHandler requestFromMasterHandler =
std::bind(&Communicator::SlaveNodeGRPCRequestHandler, this, std::placeholders::_1);
if (!grpcCommunicator_->RegisterRequestHandler(requestFromMasterHandler, dpRankIdx_)) {
MINDIE_LLM_LOG_ERROR("Failed to register request handler for slave node.");
return false;
}
RequestHandler recoverRequestFromMasterHandler =
std::bind(&Communicator::SlaveNodeGRPCRecoverRequestHandler, this, std::placeholders::_1);
if (!grpcCommunicator_->RegisterRecoverRequestHandler(recoverRequestFromMasterHandler, dpRankIdx_)) {
MINDIE_LLM_LOG_ERROR("Failed to register recover request handler for slave node.");
return false;
}
}
auto itrFindLwdMultiNodesEn = config.find("lwd_multi_nodes_enable");
if (itrFindLwdMultiNodesEn != config.end() && itrFindLwdMultiNodesEn->second == "true") {
return LwdGRPCCommunicatorInit(config, grpcCommunicatorNum);
}
if (!grpcCommunicator_->Init(grpcCommunicatorNum)) {
MINDIE_LLM_LOG_ERROR("Failed to initialize GRPC Communicator.");
return false;
}
return true;
}
/**
 * @brief Stores the callback invoked on a slave when the master sends a REMOTE_MODEL_INIT request.
 *
 * @param handler Callback receiving the request's pd_info entries; replaces any previously registered one.
 */
void Communicator::RegisterModelInitReqHandler(SlaveModelInitReqHandler handler) {
    // The parameter is a by-value sink: move it into place instead of making a second copy of the callable.
    slaveModelInitReqHandler_ = std::move(handler);
}
/**
 * @brief Sends a MODEL_INIT request over the execute IPC channel and collects the init responses.
 *
 * @param request   Request written to the execute channel's shared memory.
 * @param responses Filled by IPCCommunicator::ReceiveInitResponses.
 * @return false when the execute channel has not been created, or when sending/receiving fails.
 */
bool Communicator::SendModelInitRequestAndReceive(ExecuteRequest &request, std::vector<ExecuteResponse> &responses) {
    // Same guard the other send paths use; the channel is null before InitIPCCommunicators or after CleanUp.
    if (ipcCommunicatorExecute_ == nullptr) {
        MINDIE_LLM_LOG_ERROR("IPC Communicator for Execute channel is not initialized, cannot send MODEL_INIT request.");
        return false;
    }
    if (!ipcCommunicatorExecute_->SendMessageViaSM(request)) {
        MINDIE_LLM_LOG_ERROR("Failed to send MODEL_INIT request to local executors.");
        return false;
    }
    if (!ipcCommunicatorExecute_->ReceiveInitResponses(responses)) {
        MINDIE_LLM_LOG_ERROR("Failed to receive MODEL_INIT responses from local executors.");
        return false;
    }
    return true;
}
/**
 * @brief Broadcasts a sync request to local workers (shared sync channel) and then to the remote slave (gRPC).
 *
 * Each leg is skipped when its communicator does not exist.
 *
 * @return false as soon as one of the existing legs fails to send, true otherwise.
 */
bool Communicator::SendSharedSyncRequest(ExecuteRequest &request) {
    const bool localSendFailed =
        ipcCommunicatorSharedSync_ != nullptr && !ipcCommunicatorSharedSync_->SendMessageViaSM(request);
    if (localSendFailed) {
        MINDIE_LLM_LOG_ERROR("Failed to send a sync request to local workers.");
        return false;
    }

    const bool remoteSendFailed =
        grpcCommunicator_ != nullptr &&
        !grpcCommunicator_->SendRequest(request, dpRankIdx_, remoteDPRankIdx_, remoteSlaveIP_);
    if (remoteSendFailed) {
        MINDIE_LLM_LOG_ERROR("Failed to send a sync request to remote slave node.");
        return false;
    }
    return true;
}
/**
 * @brief Sends a sync request (see SendSharedSyncRequest) and gathers the answers.
 *
 * Appends one response from the local shared sync channel (if it exists), then the remote responses
 * collected by ReceiveSyncResponsesFromRemote (if a gRPC communicator exists).
 *
 * @return false when sending or any receive step fails.
 */
bool Communicator::SendSharedSyncRequestAndReceive(ExecuteRequest &request, std::vector<ExecuteResponse> &responses) {
    if (!SendSharedSyncRequest(request)) {
        return false;
    }

    if (ipcCommunicatorSharedSync_) {
        ExecuteResponse localResponse;
        if (!ipcCommunicatorSharedSync_->ReceiveResponse(localResponse)) {
            MINDIE_LLM_LOG_ERROR("Failed to receive a sync response from local workers.");
            return false;
        }
        responses.push_back(std::move(localResponse));
    }

    return grpcCommunicator_ == nullptr || ReceiveSyncResponsesFromRemote(responses);
}
bool Communicator::ReceiveSyncResponsesFromRemote(std::vector<ExecuteResponse> &responses) {
for (uint32_t i = 0; i < numExpectedResponses_; i++) {
ExecuteResponse grpcResponse;
if (!grpcCommunicator_->GetSyncResponse(grpcResponse, dpRankIdx_)) {
MINDIE_LLM_LOG_ERROR("Failed to receive a sync response from remote slave node.");
return false;
}
responses.emplace_back(std::move(grpcResponse));
}
return true;
}
/**
 * @brief Broadcasts a recover command and gathers the answers.
 *
 * Send order: local recover-command channel, then the remote slave over gRPC. Receive order is the same:
 * all local rank responses, then the remote sync responses. Legs whose communicator does not exist are skipped.
 *
 * @return false at the first failing send or receive step, true otherwise.
 */
bool Communicator::SendRecoverCommandRequestAndReceive(ExecuteRequest &request,
                                                       std::vector<ExecuteResponse> &responses) {
    // Phase 1: broadcast.
    if (ipcCommunicatorRecoverCommand_ && !ipcCommunicatorRecoverCommand_->SendMessageViaSM(request)) {
        MINDIE_LLM_LOG_ERROR("Failed to send a sync recover command request to local workers.");
        return false;
    }
    if (grpcCommunicator_ &&
        !grpcCommunicator_->SendRequest(request, dpRankIdx_, remoteDPRankIdx_, remoteSlaveIP_)) {
        MINDIE_LLM_LOG_ERROR("Failed to send a sync recover command request to remote slave node.");
        return false;
    }

    // Phase 2: gather, in the same order.
    if (ipcCommunicatorRecoverCommand_ && !ipcCommunicatorRecoverCommand_->ReceiveAllRankResponses(responses)) {
        MINDIE_LLM_LOG_ERROR("Failed to receive a sync recover command responses from local executors.");
        return false;
    }
    return grpcCommunicator_ == nullptr || ReceiveSyncResponsesFromRemote(responses);
}
/**
 * @brief Sends a response from this node to the remote DP rank over gRPC.
 *
 * @return false when no gRPC communicator exists or SendResponse fails, true otherwise.
 */
bool Communicator::SendAsyncReponseToRemote(ExecuteResponse &response) {
    if (!grpcCommunicator_) {
        MINDIE_LLM_LOG_ERROR("grpcCommunicator_ is null, cannot send response to master.");
        return false;
    }
    const bool sent = grpcCommunicator_->SendResponse(response, dpRankIdx_, remoteDPRankIdx_);
    if (!sent) {
        MINDIE_LLM_LOG_ERROR("Slave Node: failed to send response to remote master node.");
    }
    return sent;
}
/**
 * @brief Registers a response handler on the IPC channels and starts their response-handling threads.
 *
 * On slaves (multi-node or layerwise-disaggregated) the handler is SlaveNodeIPCResponseHandler; otherwise it
 * is @p handler. The execute, execute-error and KV transfer channels are always started; the shared sync
 * channel only on slaves.
 *
 * @return false at the first channel that fails to register or start, true otherwise.
 */
bool Communicator::LaunchIPCHandleResponseThreads(ResponseHandler handler) {
    const bool useSlaveHandler =
        (isMultiNodesInfer_ || layerwiseDisaggregated_) && msRole_ == MasterSlaveRole::SLAVE;
    ResponseHandler responseHandler =
        useSlaveHandler
            ? ResponseHandler([this](ExecuteResponse &rsp) { return SlaveNodeIPCResponseHandler(rsp); })
            : handler;

    if (!RegisterAndStartIPCHandler(ipcCommunicatorExecute_, responseHandler)) {
        MINDIE_LLM_LOG_ERROR("Failed to register and start handler for Execute channel.");
        return false;
    }
    if (!RegisterAndStartIPCHandler(ipcCommunicatorExecuteError_, responseHandler)) {
        MINDIE_LLM_LOG_ERROR("Failed to register and start handler for Execute Error channel.");
        return false;
    }
    if (!RegisterAndStartIPCHandler(ipcCommunicatorKVTransfer_, responseHandler)) {
        MINDIE_LLM_LOG_ERROR("Failed to register and start handler for KV Transfer channel.");
        return false;
    }
    // Only slaves listen for responses on the shared sync channel.
    if (msRole_ == MasterSlaveRole::SLAVE && !RegisterAndStartIPCHandler(ipcCommunicatorSharedSync_, responseHandler)) {
        MINDIE_LLM_LOG_ERROR("Failed to register and start handler for Shared Sync channel.");
        return false;
    }
    return true;
}
bool Communicator::RegisterAndStartIPCHandler(std::shared_ptr<IPCCommunicator> ipcCommunicator,
ResponseHandler handler) const {
if (!ipcCommunicator->RegisterResponseHandler(handler)) {
MINDIE_LLM_LOG_ERROR("Failed to register response handler for IPC Communicator.");
return false;
}
if (!ipcCommunicator->StartHandleResponseThread()) {
MINDIE_LLM_LOG_ERROR("Failed to start handle response thread for IPC Communicator.");
return false;
}
return true;
}
/**
 * @brief Handles a request pushed by the master to this slave over gRPC.
 *
 * REMOTE_MODEL_INIT requests are turned into a std::map of their pd_info entries and passed to the handler
 * registered with RegisterModelInitReqHandler. Every other request is forwarded to local workers via
 * SendAsyncRequestToLocal.
 *
 * @return false when no model-init handler is registered, the handler fails, or local forwarding fails.
 */
bool Communicator::SlaveNodeGRPCRequestHandler(ExecuteRequest &request) {
    if (request.execute_type() != REMOTE_MODEL_INIT) {
        if (SendAsyncRequestToLocal(request)) {
            return true;
        }
        MINDIE_LLM_LOG_ERROR("Slave Node: failed to send asynchronous request to local workers.");
        return false;
    }

    std::map<std::string, std::string> pdInfo;
    for (const auto &entry : request.remote_model_init_request().pd_info()) {
        pdInfo[entry.first] = entry.second;
    }
    const bool handled = slaveModelInitReqHandler_ != nullptr && slaveModelInitReqHandler_(pdInfo);
    if (!handled) {
        MINDIE_LLM_LOG_ERROR("Slave Node: failed to handle model init request from master node.");
        return false;
    }
    return true;
}
/**
 * @brief Handles a recover request pushed by the master over gRPC.
 *
 * Relays the request to local workers over the recover-command channel and collects every rank's response.
 * The responses are reduced with AggregateToOneResponse and the result is sent back to the master.
 *
 * @return false when the recover-command channel is missing, sending/receiving fails, no response was
 *         received, or the reply to the master fails; true otherwise.
 */
bool Communicator::SlaveNodeGRPCRecoverRequestHandler(ExecuteRequest &request) {
    // Without the channel no responses can be collected; the original went on to aggregate an empty vector (UB).
    if (ipcCommunicatorRecoverCommand_ == nullptr) {
        MINDIE_LLM_LOG_ERROR("IPC Communicator for Recover Command channel is not initialized.");
        return false;
    }
    if (!ipcCommunicatorRecoverCommand_->SendMessageViaSM(request)) {
        MINDIE_LLM_LOG_ERROR("Failed to send a sync recover command request to local workers.");
        return false;
    }
    std::vector<ExecuteResponse> responses;
    if (!ipcCommunicatorRecoverCommand_->ReceiveAllRankResponses(responses)) {
        MINDIE_LLM_LOG_ERROR("Failed to receive a sync recover command responses from local executors.");
        return false;
    }
    if (responses.empty()) {
        MINDIE_LLM_LOG_ERROR("No recover command response received from local executors.");
        return false;
    }
    ExecuteResponse response = AggregateToOneResponse(responses);
    return SendAsyncReponseToRemote(response);
}
/**
 * @brief Reduces per-rank recover-command responses to the single response reported upstream.
 *
 * Returns the first response whose recover_command_response().command_result() differs from 0 (the value
 * treated as success here). If every result is 0, returns the first response.
 *
 * @param responses Responses collected from the local ranks.
 * @return The selected response. For an empty input, an error is logged and a default-constructed
 *         ExecuteResponse is returned instead of indexing an empty vector. NOTE(review): that fallback carries
 *         default field values, so callers should reject empty input themselves.
 */
ExecuteResponse Communicator::AggregateToOneResponse(const std::vector<ExecuteResponse> &responses) {
    if (responses.empty()) {
        MINDIE_LLM_LOG_ERROR("No recover command response to aggregate.");
        return ExecuteResponse();
    }
    const auto firstFailure = std::find_if(responses.begin(), responses.end(), [](const ExecuteResponse &rsp) {
        const uint32_t successResult = 0;
        return rsp.recover_command_response().command_result() != successResult;
    });
    return firstFailure != responses.end() ? *firstFailure : responses.front();
}
/**
 * @brief Response handler used on slaves: decides whether a local response is relayed to the master.
 *
 * In layerwise-disaggregated mode nothing is relayed. With intraNodeTP_, only PD_LINK, PD_LINK_STATUS_QUERY,
 * RECOVER/START/PAUSE/CLEAR_COMMAND_EXEC and EXECUTE_ERROR responses are relayed. Any other message type
 * returns true without sending.
 *
 * @return true when the response is dropped; otherwise the result of SendAsyncReponseToRemote.
 */
bool Communicator::SlaveNodeIPCResponseHandler(ExecuteResponse &response) {
    if (layerwiseDisaggregated_) {
        return true;
    }
    if (intraNodeTP_) {
        const auto msgType = response.msg_type();
        const bool relayed = msgType == PD_LINK || msgType == PD_LINK_STATUS_QUERY ||
                             msgType == RECOVER_COMMAND_EXEC || msgType == START_COMMAND_EXEC ||
                             msgType == PAUSE_COMMAND_EXEC || msgType == CLEAR_COMMAND_EXEC ||
                             msgType == EXECUTE_ERROR;
        if (!relayed) {
            return true;
        }
    }
    return SendAsyncReponseToRemote(response);
}
/**
 * @brief Fetches one sync response for this DP rank from the gRPC communicator.
 *
 * @return false when no gRPC communicator exists; otherwise the result of GetSyncResponse.
 */
bool Communicator::GRPCGetSyncResponse(ExecuteResponse &response) {
    if (grpcCommunicator_ != nullptr) {
        return grpcCommunicator_->GetSyncResponse(response, dpRankIdx_);
    }
    MINDIE_LLM_LOG_ERROR("grpcCommunicator_ is null, cannot get sync response.");
    return false;
}
std::unique_ptr<IPCCommunicator> Communicator::InitSingleIPCCommunicator(const std::string &sharedMemName,
const SemaphoreConfig &semConfig,
const ShmSizeConfig &shmSizeConfig,
bool receiveAllRank) const {
std::unique_ptr<IPCCommunicator> ipcCommunicator =
std::make_unique<IPCCommunicator>(sharedMemName, semConfig, receiveAllRank);
if (!ipcCommunicator->SetupChannel(shmSizeConfig)) {
MINDIE_LLM_LOG_ERROR("Failed to initialize Execute channel.");
return nullptr;
}
return ipcCommunicator;
}
/**
 * @brief Sends an asynchronous request to local workers and then to the remote side.
 *
 * Rejected on multi-node slaves. The local leg runs only when the execute IPC channel exists. The remote leg
 * runs only when a gRPC communicator exists; MODEL_INFER requests skip it when remoteDPRankIdx_ != 0.
 * Each leg is wrapped in a profiler span that is ended on every exit path.
 *
 * @param request Request to send.
 * @return false when called on a multi-node slave or a send fails, true otherwise.
 */
bool Communicator::SendAsyncRequest(ExecuteRequest &request) {
if (isMultiNodesInfer_ && msRole_ == MasterSlaveRole::SLAVE) {
MINDIE_LLM_LOG_ERROR("Slave nodes cannot call SendAsyncRequest themselves.");
return false;
}
// Local leg: every return path below must end spanLocal.
if (ipcCommunicatorExecute_ != nullptr) {
auto spanLocal = PROF(INFO, Domain("Executor").SpanStart("SendRequestToLocal"));
if (!SendAsyncRequestToLocal(request)) {
MINDIE_LLM_LOG_ERROR("Failed to send asynchronous request to local workers.");
PROF(spanLocal.SpanEnd());
return false;
}
PROF(spanLocal.SpanEnd());
}
// Remote leg: every return path below must end spanRemote.
if (grpcCommunicator_ != nullptr) {
auto spanRemote = PROF(INFO, Domain("Executor").SpanStart("SendRequestToRemote"));
// MODEL_INFER is forwarded remotely only when the remote DP rank index is 0.
if (request.execute_type() == MODEL_INFER && remoteDPRankIdx_ != 0) {
PROF(spanRemote.SpanEnd());
return true;
}
if (!SendAsyncRequestToRemote(request)) {
MINDIE_LLM_LOG_ERROR("Failed to send asynchronous request to remote workers.");
PROF(spanRemote.SpanEnd());
return false;
}
PROF(spanRemote.SpanEnd());
}
return true;
}
/**
 * @brief Routes an asynchronous request to the local IPC channel(s) serving its execute type.
 *
 * Routing:
 *  - MODEL_FINALIZE: execute, KV transfer and shared sync channels, in that order;
 *  - MODEL_INFER, TEXT_GENERATOR_CLEANUP, EOS_CLEANUP: execute channel;
 *  - KV_TRANSFER: KV transfer channel;
 *  - PD_LINK, PD_LINK_STATUS_QUERY, RECOVER/START/PAUSE/CLEAR_COMMAND_EXEC: shared sync channel.
 * Any other type is rejected.
 *
 * @param request Request to send.
 * @return false for an unsupported type, a target channel that does not exist, or a send failure
 *         (sending stops at the first failing channel); true otherwise.
 */
bool Communicator::SendAsyncRequestToLocal(ExecuteRequest &request) {
    const auto type = request.execute_type();
    std::vector<IPCCommunicator *> targets;
    if (type == MODEL_FINALIZE) {
        targets = {ipcCommunicatorExecute_.get(), ipcCommunicatorKVTransfer_.get(), ipcCommunicatorSharedSync_.get()};
    } else if (type == MODEL_INFER || type == TEXT_GENERATOR_CLEANUP || type == EOS_CLEANUP) {
        targets = {ipcCommunicatorExecute_.get()};
    } else if (type == KV_TRANSFER) {
        targets = {ipcCommunicatorKVTransfer_.get()};
    } else if (type == PD_LINK || type == PD_LINK_STATUS_QUERY || type == RECOVER_COMMAND_EXEC ||
               type == START_COMMAND_EXEC || type == PAUSE_COMMAND_EXEC || type == CLEAR_COMMAND_EXEC) {
        targets = {ipcCommunicatorSharedSync_.get()};
    } else {
        MINDIE_LLM_LOG_ERROR("Unsupported execute type for asynchronous request: " << type);
        return false;
    }
    for (IPCCommunicator *comm : targets) {
        // A channel is null before InitIPCCommunicators creates it, if that init failed early, or after CleanUp.
        if (comm == nullptr) {
            MINDIE_LLM_LOG_ERROR("IPC channel for asynchronous request is not initialized (type: " << type << ").");
            return false;
        }
        if (!comm->SendMessageViaSM(request)) {
            MINDIE_LLM_LOG_ERROR("Failed to send asynchronous request to local workers (type: " << type << ").");
            return false;
        }
    }
    return true;
}
/**
 * @brief Sends a request to the remote DP rank over gRPC.
 *
 * When ConfigManager reports layerwise disaggregation and the request carries an execute-model request,
 * every non-empty prompt_token_ids payload in @p request is overwritten in place with placeholder tokens
 * (value 100) of the same token count, before any early return. NOTE(review): presumably the remote side
 * only needs the token count; confirm. In layerwise mode without multi-node inference, only DP rank 0
 * actually sends.
 *
 * @param request Request to send; may be modified as described above.
 * @return true when sent or intentionally skipped; false when no gRPC communicator exists or sending fails.
 */
bool Communicator::SendAsyncRequestToRemote(ExecuteRequest &request) {
    auto &configManager = mindie_llm::ConfigManager::GetInstance();
    if (configManager.IslayerwiseDisaggregated() && request.has_execute_model_request()) {
        const TokenId fill_value = 100L;
        model_execute_data::ExecuteModelRequest *modelReq = request.mutable_execute_model_request();
        for (int i = 0; i < modelReq->seq_group_metadata_list_size(); ++i) {
            model_execute_data::SequenceGroupMetadata *meta = modelReq->mutable_seq_group_metadata_list(i);
            if (!meta->has_prompt_token_ids()) {
                continue;
            }
            // Keep the token count (trailing bytes that do not form a whole TokenId are dropped).
            const size_t num = meta->prompt_token_ids().size() / sizeof(TokenId);
            std::vector<TokenId> tmp(num, fill_value);
            meta->set_prompt_token_ids(tmp.data(), tmp.size() * sizeof(TokenId));
        }
    }
    if (layerwiseDisaggregated_ && !isLwdMultiNodesInfer_ && dpRankIdx_ > 0) {
        return true;
    }
    // Same guard as the other gRPC entry points; callers other than SendAsyncRequest may not check it.
    if (grpcCommunicator_ == nullptr) {
        MINDIE_LLM_LOG_ERROR("grpcCommunicator_ is null, cannot send request to remote.");
        return false;
    }
    if (!grpcCommunicator_->SendRequest(request, dpRankIdx_, remoteDPRankIdx_, remoteSlaveIP_)) {
        MINDIE_LLM_LOG_ERROR("Failed to send request from DP " << dpRankIdx_ << " to remote DP " << remoteDPRankIdx_);
        return false;
    }
    return true;
}
/**
 * @brief Joins the execute-error thread (if joinable), then cleans up and releases every IPC channel and the
 *        gRPC communicator.
 *
 * Channels are released in this order: execute, execute error, shared sync, KV transfer, recover command.
 * Null channels are skipped.
 */
void Communicator::CleanUp() {
    if (handleExecuteErrorThread_ != nullptr && handleExecuteErrorThread_->joinable()) {
        handleExecuteErrorThread_->join();
        handleExecuteErrorThread_.reset();
    }

    const auto release = [](auto &channel) {
        if (channel) {
            channel->CleanUp();
            channel.reset();
        }
    };
    release(ipcCommunicatorExecute_);
    release(ipcCommunicatorExecuteError_);
    release(ipcCommunicatorSharedSync_);
    release(ipcCommunicatorKVTransfer_);
    release(ipcCommunicatorRecoverCommand_);

    grpcCommunicator_.reset();
}
// Does not call CleanUp(); members are released by their own destructors.
Communicator::~Communicator() = default;
}