* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "deploy/execfwk/builtin_executor_client.h"
#include <signal.h>
#include "common/compile_profiling/ge_call_wrapper.h"
#include "common/data_flow/queue/heterogeneous_exchange_service.h"
#include "common/mem_grp/memory_group_manager.h"
#include "common/subprocess/subprocess_manager.h"
#include "common/utils/rts_api_utils.h"
#include "dflow/base/utils/process_utils.h"
#include "mmpa/mmpa_api.h"
#include "dflow/inc/data_flow/model/pne_model.h"
#include "dflow/base/deploy/deploy_planner.h"
#include "deploy/flowrm/flowgw_client.h"
#include "prof_common.h"
namespace ge {
namespace {
constexpr uint32_t kMaxExecutorShutdownWaitTimeInSec = 60;
constexpr const char_t * const kArgsKeyBaseDir = "--base_dir";
constexpr const char_t * const kArgsKeyMsgQueueDeviceId = "--msg_queue_device_id";
PneExecutorClientCreatorRegistrar<BuiltinExecutorClient> __attribute__((unused)) npu_client_reg(PNE_ID_NPU);
PneExecutorClientCreatorRegistrar<HostCpuExecutorClient> __attribute__((unused)) cpu_client_reg(PNE_ID_CPU);
}
BuiltinExecutorClient::BuiltinExecutorClient(int32_t device_id, bool is_host)
: PneExecutorClient(device_id), is_host_(is_host),
sub_proc_stat_(ProcStatus::NORMAL) {
}
Status BuiltinExecutorClient::Initialize() {
if (GetContext().device_type != static_cast<int32_t>(CPU)) {
MsprofConfigParam param = {};
param.deviceId = static_cast<uint32_t>(GetDeviceId());
param.type = MsprofConfigParamType::DEV_CHANNEL_RESOURCE;
param.value = 1;
(void) MsprofSetConfig(0, reinterpret_cast<const char *>(¶m), sizeof(param));
}
GE_CHK_STATUS_RET(ForAndInit(GetDeviceId(), message_client_), "Failed to fork and init");
heartbeat_listening_ = true;
return SUCCESS;
}
void BuiltinExecutorClient::Shutdown() {
(void)SubprocessManager::GetInstance().ShutdownSubprocess(pid_, kMaxExecutorShutdownWaitTimeInSec);
}
Status BuiltinExecutorClient::Finalize() {
GEEVENT("BuiltinExecutorClient begin, executor pid is %d.", GetPid());
heartbeat_listening_ = false;
if (message_client_ != nullptr) {
(void) message_client_->NotifyFinalize();
}
Shutdown();
message_client_.reset();
return SUCCESS;
}
Status BuiltinExecutorClient::ForAndInit(int32_t device_id, std::unique_ptr<ExecutorMessageClient> &message_client) {
int32_t msg_queue_device_id = 0;
message_client = CreateExecutorMessageClient(msg_queue_device_id);
GE_CHECK_NOTNULL(message_client);
const std::string name_suffix = "executor_" + std::to_string(device_id);
uint32_t req_msg_queue_id = UINT32_MAX;
uint32_t rsp_msg_queue_id = UINT32_MAX;
GE_CHK_STATUS_RET(message_client->CreateMessageQueue(name_suffix, req_msg_queue_id, rsp_msg_queue_id),
"Failed to create message queue");
const auto &group_name = MemoryGroupManager::GetInstance().GetQsMemGroupName();
GE_CHK_STATUS_RET_NOLOG(DoForkChildProcess(device_id, req_msg_queue_id, rsp_msg_queue_id, group_name));
GE_CHK_STATUS_RET(MemoryGroupManager::GetInstance().MemGrpAddProc(group_name, pid_, false, true),
"Failed to add group, pid = %d", pid_);
GELOGD("[Fork][Process] succeeded, pid = %d.", pid_);
const auto &get_stat_func = [this, device_id]() -> Status {
return (sub_proc_stat_.load() == ProcStatus::EXITED) ? FAILED : SUCCESS;
};
GE_CHK_STATUS_RET(message_client->Initialize(pid_, get_stat_func), "Failed to initialize executor process");
return SUCCESS;
}
Status BuiltinExecutorClient::DoForkChildProcess(int32_t device_id,
uint32_t req_msg_queue_id,
uint32_t rsp_msg_queue_id,
const std::string &group_name) {
std::string bin_dir;
SubprocessManager::SubprocessConfig config{};
config.process_type = is_host_ ? PNE_ID_CPU : PNE_ID_NPU;
config.death_signal = SIGKILL;
std::string process_name = is_host_ ? "host_cpu_executor" : "npu_executor";
auto process_cfg = GetDevMaintenanceCfg();
if (process_cfg != nullptr) {
GE_CHK_STATUS_RET(process_cfg->DecodeConfig(config.envs, config.kv_args), "Decode config failed.");
}
const std::string enable = "1";
config.envs.emplace("AICPU_ADD_BUFFERGROUP", enable);
config.args = {process_name,
group_name,
std::to_string(req_msg_queue_id),
std::to_string(rsp_msg_queue_id),
std::to_string(device_id)};
(void) GenerateKvArgs(config.kv_args);
GE_CHK_STATUS_RET_NOLOG(SubprocessManager::GetInstance().ForkSubprocess(config, pid_));
GELOGI("Start to watch subprocess, pid[%d].", static_cast<int32_t>(pid_));
std::function<void(const ProcStatus &)> excpt_handle_callback = [this](const ProcStatus &proc_status) {
GEEVENT("Executor process status is %d.", static_cast<int32_t>(proc_status));
sub_proc_stat_ = proc_status;
};
SubprocessManager::GetInstance().RegExcptHandleCallback(pid_, excpt_handle_callback);
GELOGI("Fork subprocess successfully, engine_type = %s, pid = %d", process_name.c_str(), pid_);
return SUCCESS;
}
Status BuiltinExecutorClient::LoadModel(deployer::ExecutorRequest_BatchLoadModelMessage load_model_desc) {
GELOGD("[Load][Model] begin.");
GE_CHECK_NOTNULL(message_client_);
GE_CHK_STATUS_RET_NOLOG(GrantQueues(load_model_desc));
GE_CHK_STATUS_RET_NOLOG(GrantDynamicSchedQueues(load_model_desc));
GE_TIMESTAMP_START(PreLoadModel);
GE_CHK_STATUS_RET(PreLoadProcess(load_model_desc), "Failed to pre load process.");
GE_TIMESTAMP_EVENT_END(PreLoadModel, "PreLoadModel");
GE_MAKE_GUARD(after_process, ([this, &load_model_desc]() { AfterLoadProcess(load_model_desc); }));
deployer::ExecutorRequest executor_request;
GEEVENT("Load model on executor, model_type = %s, pid = %d, graph_id = %u, deployer pid = %d, device_id = %d.",
PNE_ID_NPU.c_str(), GetPid(), load_model_desc.graph_id(), GetDeployerPid(), GetDeviceId());
*executor_request.mutable_batch_load_model_message() = load_model_desc;
deployer::ExecutorResponse executor_response;
constexpr int64_t kTimeoutSec = 8400;
GE_CHK_STATUS_RET(message_client_->SendRequest(executor_request, executor_response, kTimeoutSec),
"[Load][Model] Failed to send request");
if (executor_response.error_code() != SUCCESS) {
GELOGE(FAILED, "[Load][Model] failed, error_message = %s", executor_response.error_message().c_str());
return FAILED;
}
return SUCCESS;
}
Status BuiltinExecutorClient::UpdateProfilingFromExecutor(deployer::ExecutorRequest_UpdateProfRequest &prof_message) {
GELOGD("[Update][profiling] begin.");
if (!is_host_) {
GELOGI("[Update][profiling] only support host for now.");
return SUCCESS;
}
deployer::ExecutorRequest executor_request;
*executor_request.mutable_update_prof_message() = prof_message;
deployer::ExecutorResponse executor_response;
constexpr int64_t kTimeoutSec = 300;
GE_CHK_STATUS_RET(message_client_->SendRequest(executor_request, executor_response, kTimeoutSec),
"[Update][profiling] Failed to send request to executor, device_id = %d", GetDeviceId());
GE_CHK_BOOL_RET_STATUS(executor_response.error_code() == SUCCESS, FAILED,
"[Update][profiling] failed, error message=%s.",
executor_response.error_message().c_str());
return SUCCESS;
}
Status BuiltinExecutorClient::UnloadModel(uint32_t model_id) {
GELOGD("[Unload][Model] begin.");
deployer::ExecutorRequest executor_request;
auto exec_req_body = executor_request.mutable_unload_model_message();
GE_CHECK_NOTNULL(exec_req_body);
exec_req_body->set_model_id(model_id);
deployer::ExecutorResponse executor_response;
constexpr int64_t kTimeoutSec = 300;
GE_CHK_STATUS_RET(message_client_->SendRequest(executor_request, executor_response, kTimeoutSec),
"[Unload][Model] Failed to send request to executor, device_id = %d", GetDeviceId());
if (executor_response.error_code() != SUCCESS) {
GELOGE(FAILED, "[Unload][Model] failed, request = %s, error_message = %s",
exec_req_body->DebugString().c_str(), executor_response.error_message().c_str());
return FAILED;
}
return SUCCESS;
}
Status BuiltinExecutorClient::ClearModelRunningData(uint32_t model_id, int32_t type,
const std::set<int32_t> &device_ids) {
GELOGD("[ClearModelExceptionData] begin.");
(void)device_ids;
if (!is_valid_) {
GELOGI("Executor is not valid.");
return SUCCESS;
}
deployer::ExecutorRequest executor_request;
auto clear_req_body = executor_request.mutable_clear_model_message();
GE_CHECK_NOTNULL(clear_req_body);
clear_req_body->set_model_id(model_id);
clear_req_body->set_clear_msg_type(type);
deployer::ExecutorResponse executor_response;
constexpr int64_t kTimeoutSec = 60;
GE_CHK_STATUS_RET(message_client_->SendRequest(executor_request, executor_response, kTimeoutSec),
"[ClearModelExceptionData] Failed to send request to executor, device_id = %d, "
"root_model_id = %u, msg_type = %d", GetDeviceId(), model_id, type);
if (executor_response.error_code() != SUCCESS) {
GELOGE(FAILED, "[ClearModelExceptionData] failed, request = %s, error_message = %s",
clear_req_body->DebugString().c_str(), executor_response.error_message().c_str());
return FAILED;
}
return SUCCESS;
}
Status BuiltinExecutorClient::DataFlowExceptionNotify(const deployer::DataFlowExceptionNotifyRequest &req_body) {
if (!is_valid_) {
GELOGI("Executor is not valid.");
return SUCCESS;
}
deployer::ExecutorRequest executor_request;
executor_request.set_type(deployer::ExecutorRequestType::kExecutorExceptionNotify);
auto exception_notify_req_body = executor_request.mutable_exception_notify_request();
GE_CHECK_NOTNULL(exception_notify_req_body);
*exception_notify_req_body = req_body;
deployer::ExecutorResponse executor_response;
GE_CHK_STATUS_RET(
message_client_->SendRequest(executor_request, executor_response),
"[DataFlowExceptionNotify] Failed to send request to executor, device_id = %d, trans_id = %lu, type=%u",
GetDeviceId(), exception_notify_req_body->exception_notify().trans_id(),
exception_notify_req_body->exception_notify().type());
if (executor_response.error_code() != SUCCESS) {
GELOGE(FAILED, "[DataFlowExceptionNotify] failed, request = %s, error_code=%u, error_message = %s",
executor_request.DebugString().c_str(), executor_response.error_code(),
executor_response.error_message().c_str());
return FAILED;
}
GELOGI("send request to executor end, device_id = %d, trans_id = %lu, type=%u", GetDeviceId(),
exception_notify_req_body->exception_notify().trans_id(),
exception_notify_req_body->exception_notify().type());
return SUCCESS;
}
ProcStatus BuiltinExecutorClient::GetSubProcStat() {
if (!heartbeat_listening_) {
return ProcStatus::NORMAL;
}
is_valid_ = is_valid_ ? (sub_proc_stat_.load() == ProcStatus::NORMAL) : is_valid_;
return sub_proc_stat_.load();
}
Status BuiltinExecutorClient::GetPidOwningIoQueues(int32_t &pid) {
if (is_host_) {
GELOGD("Get cpu schedule pid = %d success", pid_);
pid = pid_;
return SUCCESS;
}
if (aicpu_pid_ != -1) {
pid = aicpu_pid_;
return SUCCESS;
}
GE_CHK_STATUS_RET(RtsApiUtils::GetAicpuSchedulePid(GetDeviceId(), pid_, pid),
"Query aicpu schedule failed, deviceId=%d, hostPid=%d.", GetDeviceId(), pid_);
aicpu_pid_ = pid;
GELOGD("Get cpu schedule pid = %d success.", pid);
if (GetContext().device_type != static_cast<int32_t>(CPU)) {
const auto &group_name = MemoryGroupManager::GetInstance().GetRemoteMemGroupName(GetDeviceId());
GE_CHK_STATUS_RET(MemoryGroupManager::GetInstance().RemoteMemGrpAddProc(GetDeviceId(),
group_name,
aicpu_pid_, false, true),
"Failed to add group for aicpu, pid = %d", aicpu_pid_);
}
return SUCCESS;
}
Status BuiltinExecutorClient::SyncVarManager(deployer::ExecutorRequest_SyncVarManageRequest sync_var_manage_desc) {
deployer::ExecutorRequest executor_request;
*executor_request.mutable_sync_var_manager_message() = std::move(sync_var_manage_desc);
deployer::ExecutorResponse executor_response;
GE_CHK_STATUS_RET(message_client_->SendRequest(executor_request, executor_response),
"[Sync][VarManager] Failed to send request to executor, device_id = %d", GetDeviceId());
if (executor_response.error_code() != SUCCESS) {
GELOGE(FAILED, "[Sync][VarManager] failed, request = %s, error_message = %s",
executor_request.DebugString().c_str(), executor_response.error_message().c_str());
return FAILED;
}
GELOGD("[Sync][VarManager] succeeded, device_id = %d.", GetDeviceId());
return SUCCESS;
}
Status BuiltinExecutorClient::DoGrantQueues(int32_t pid, const std::vector<DeployQueueAttr> &queue_attrs) {
std::lock_guard<std::mutex> lock(mutex_);
auto &queue_info = grant_queues_map_[pid];
for (const auto &queue_attr : queue_attrs) {
const auto &key = queue_attr.GetKey();
const auto &it = queue_info.find(key);
if (it != queue_info.cend()) {
continue;
}
GE_CHK_STATUS_RET(FlowGwClient::GrantQueue(static_cast<uint32_t>(queue_attr.device_id), queue_attr.queue_id,
pid, GrantType::kReadAndWrite),
"Grant queue failed, device id=%d, queue id=%u, pid = %d",
queue_attr.device_id, queue_attr.queue_id, pid);
queue_info.emplace(key);
}
return SUCCESS;
}
Status BuiltinExecutorClient::GrantQueues(const deployer::ExecutorRequest_BatchLoadModelMessage &load_model_desc) {
for (const auto &model : load_model_desc.models()) {
int32_t pid = 0;
GE_CHK_STATUS_RET(GetPidOwningIoQueues(pid), "Failed to get pid");
GE_CHK_STATUS_RET(GrantQueuesForProcess(pid, GetContext().device_type, model.model_queues_attrs()),
"Failed to grant queues, pid = %d", pid);
}
return SUCCESS;
}
Status BuiltinExecutorClient::GrantDynamicSchedQueues(
const deployer::ExecutorRequest_BatchLoadModelMessage &load_model_desc) {
for (const auto &model : load_model_desc.models()) {
int32_t pid = 0;
GE_CHK_STATUS_RET(GetPidOwningIoQueues(pid), "Failed to get pid");
for (auto input_queue : model.status_queues().input_queues_attrs()) {
GELOGI("DynamicSched Grant queues, status input queue id=%u, pid=%d", input_queue.queue_id(), pid);
}
for (auto output_queue : model.status_queues().output_queues_attrs()) {
GELOGI("DynamicSched Grant queues, status output queue id=%u, pid=%d", output_queue.queue_id(), pid);
}
GE_CHK_STATUS_RET(GrantQueuesForProcess(pid, GetContext().device_type, model.status_queues()),
"Failed to grant queues, pid = %d", pid);
}
return SUCCESS;
}
Status BuiltinExecutorClient::GenerateKvArgs(std::map<std::string, std::string> &kv_args) {
const auto &context = GetContext();
kv_args[kArgsKeyBaseDir] = context.base_dir;
int32_t msg_queue_device_id = 0;
kv_args[kArgsKeyMsgQueueDeviceId] = std::to_string(msg_queue_device_id);
const auto &mode_it = context.options.find(OPTION_EXEC_PROFILING_MODE);
if (mode_it != context.options.cend()) {
kv_args[OPTION_EXEC_PROFILING_MODE] = mode_it->second;
}
const auto &options_it = context.options.find(OPTION_EXEC_PROFILING_OPTIONS);
if (options_it != context.options.cend()) {
kv_args[OPTION_EXEC_PROFILING_OPTIONS] = options_it->second;
}
return SUCCESS;
}
Status BuiltinExecutorClient::DoBindHostPid(const int32_t pid) {
std::lock_guard<std::mutex> guard(pid_mutex_);
const auto &it = bind_pids_.find(pid);
if (it == bind_pids_.cend()) {
GE_CHK_STATUS_RET(BindHostPid(pid), "Failed to bind host pid");
bind_pids_.emplace(pid);
}
return SUCCESS;
}
HostCpuExecutorClient::HostCpuExecutorClient(int32_t device_id) : BuiltinExecutorClient(device_id, true) {}
}