* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "executor/engine_daemon.h"
#include <chrono>
#include <csignal>
#include <mutex>
#include <string>
#include <thread>
#include "mmpa/mmpa_api.h"
#include "exec_runtime/execution_runtime_utils.h"
#include "proto/deployer.pb.h"
#include "event_handler.h"
#include "common/compile_profiling/ge_call_wrapper.h"
#include "common/profiling/profiling_init.h"
#include "dflow/base/utils/process_utils.h"
#include "common/utils/memory_statistic_manager.h"
#include "common/config/device_debug_config.h"
#include "common/profiling/command_handle.h"
#include "common/data_flow/queue/heterogeneous_exchange_service.h"
#include "common/ge_inner_attrs.h"
#include "common/string_util.h"
#include "graph/ge_local_context.h"
#include "graph/ge_context.h"
#include "executor/cpu_sched_event_dispatcher.h"
#include "adx_datadump_server.h"
#include "acl/acl.h"
#include "common/df_chk.h"
namespace ge {
namespace {
// Timeout value passed to RtsApiUtils::MemGrpAttach when attaching to the memory group.
constexpr int32_t kDefaultTimeout{10 * 1000};
// Success code compared against the result of AdxDataDumpServerInit.
constexpr int32_t kAdxErrorNone{0};
// Task-fail retcode that the runtime callback reports as a model execution failure.
constexpr uint32_t kModelExeErr{507018U};
// Dump switch value written into DumpConfig when dumping is turned off during teardown.
constexpr char_t kDumpoff[]{"off"};
// Output path handed to MsprofInit when host-cpu profiling is started with a device config.
constexpr char_t kProfilingPath[]{"/var/log/npu/profiling/"};
// Option keys looked up in args_option_ after the "key=value" arguments are parsed.
const char_t *const kArgsKeyBaseDir{"--base_dir"};
const char_t *const kArgsKeyDeviceId{"--device_id"};
const char_t *const kArgsKeyMsgQueueDeviceId{"--msg_queue_device_id"};
// Keeps LoopEvents running; cleared by the SIGTERM handler, the runtime task-fail callback and Finalize.
std::atomic<bool> kLoopFlag{true};
// Records whether aclInit succeeded, so aclInit/aclFinalize are each issued at most once.
std::atomic<bool> acl_initialized{false};
}  // namespace
EngineDaemon::EngineDaemon(bool is_host_cpu) : is_host_cpu_(is_host_cpu) {}
void EngineDaemon::PrintLogLevel() {
const char_t *env_value = nullptr;
MM_SYS_GET_ENV(MM_ENV_ASCEND_GLOBAL_LOG_LEVEL, env_value);
GEEVENT("[PrintLogLevel] pid = %d, ASCEND_GLOBAL_LOG_LEVEL = [%s].", getpid(), env_value);
}
// SIGTERM handler installed by InitializeWithArgs: clears the loop flag so LoopEvents exits.
void EngineDaemon::SignalHandler(int32_t sig_num) {
  static_cast<void>(sig_num);  // which signal arrived is irrelevant
  kLoopFlag = false;
}
/**
 * Brings the daemon up from its command line.
 * Steps, in order: install the SIGTERM handler, parse arguments, init profiling/dump, attach the
 * memory group, init mbuf, attach the scheduler device, init the executor, enable local message
 * queues, start the message server and event handler, notify the peer, init memory statistics,
 * and finally call aclInit once per process.
 * @param argc argument count from main.
 * @param argv argument vector from main (see ParseCmdLineArgs for the layout).
 * @return SUCCESS, the status of the first failing step, or FAILED if aclInit fails.
 */
Status EngineDaemon::InitializeWithArgs(int32_t argc, char_t **argv) {
  (void)std::signal(SIGTERM, static_cast<sighandler_t>(&EngineDaemon::SignalHandler));
  PrintLogLevel();
  GE_CHK_STATUS_RET_NOLOG(ParseCmdLineArgs(argc, argv));
  GE_CHK_STATUS_RET_NOLOG(InitializeMaintenanceFromOption());
  GE_CHK_STATUS_RET_NOLOG(RtsApiUtils::MemGrpAttach(mem_group_name_, kDefaultTimeout));
  GE_CHK_STATUS_RET_NOLOG(RtsApiUtils::MbufInit());
  GE_CHK_STATUS_RET_NOLOG(RtsApiUtils::EschedAttachDevice(msg_queue_device_id_));
  GE_CHK_STATUS_RET_NOLOG(InitializeExecutor());
  // Enable local queues on the message-queue device; the result is deliberately ignored.
  rtMemQueueSetInputPara para = {};
  (void)rtMemQueueSet(msg_queue_device_id_, RT_MQ_QUEUE_ENABLE_LOCAL_QUEUE, &para);
  executor_message_server_ = MakeShared<MessageServer>(msg_queue_device_id_, req_msg_queue_id_, rsp_msg_queue_id_);
  GE_CHECK_NOTNULL(executor_message_server_);
  GE_CHK_STATUS_RET_NOLOG(executor_message_server_->Initialize());
  event_handler_.SetBaseDir(base_dir_);
  GE_CHK_STATUS_RET_NOLOG(event_handler_.Initialize());
  GE_CHK_STATUS_RET_NOLOG(NotifyInitialized());
  MemoryStatisticManager::Instance().Initialize(mem_group_name_);
  // aclInit is guarded by a file-level flag so that it is issued at most once.
  if (!acl_initialized.load()) {
    const aclError ret = aclInit(nullptr);
    if (ret != ACL_SUCCESS) {
      GELOGE(FAILED, "ACL init failed.");
      return FAILED;
    }
    GELOGI("ACL init success.");
    acl_initialized.store(true);
  }
  return SUCCESS;
}
/**
 * Prepares the execution side of the daemon.
 * Steps: start the CPU scheduling event dispatcher (host CPU only), set the device, create the
 * runtime context, initialize the GE executor, and register a runtime task-fail callback.
 * A failed callback registration is only logged.
 * @return SUCCESS, or the status of the first failing step.
 */
Status EngineDaemon::InitializeExecutor() {
  if (is_host_cpu_) {
    GE_CHK_STATUS_RET(CpuSchedEventDispatcher::GetInstance().Initialize(device_id_, is_host_cpu_),
                      "Failed to init event dispatcher");
  }
  GE_CHK_STATUS_RET_NOLOG(RtsApiUtils::SetDevice(device_id_));
  DF_CHK_ACL(aclrtCreateContext(&rt_context_, device_id_));
  GE_CHK_STATUS_RET_NOLOG(InitializeGeExecutor());

  // ACL_ERROR_RT_SOCKET_CLOSE stops the event loop; every other retcode is only logged.
  auto on_task_fail = [](rtExceptionInfo *info) {
    if (info == nullptr) {
      return;
    }
    const uint32_t code = info->retcode;
    if (code == ACL_ERROR_RT_SOCKET_CLOSE) {
      GELOGI("Aicpu sd dosen't exist, npu exe will exit.");
      kLoopFlag.store(false);
      return;
    }
    if (code == kModelExeErr) {
      GELOGW("Model execute failed.");
      return;
    }
    GELOGD("Callback function is called, retcode[%u]", code);
  };
  const auto reg_ret = rtRegTaskFailCallbackByModule("NpuExe", on_task_fail);
  GELOGI("Register callback, ret = %d.", static_cast<int32_t>(reg_ret));
  return SUCCESS;
}
void EngineDaemon::Finalize() {
if (acl_initialized) {
aclFinalize();
acl_initialized.store(false);
}
MemoryStatisticManager::Instance().Finalize();
(void)FinalizeMaintenance();
(void)ge_executor_.Finalize();
event_handler_.Finalize();
CpuSchedEventDispatcher::GetInstance().Finalize();
if (executor_message_server_ != nullptr) {
executor_message_server_->Finalize();
executor_message_server_.reset();
}
if (rt_context_ != nullptr) {
is_finish_.store(false);
std::thread th(&EngineDaemon::FinalizeThread, this);
(void)aclrtDestroyContext(rt_context_);
is_finish_.store(true);
cond_var_.notify_one();
th.join();
}
(void)aclrtResetDevice(device_id_);
kLoopFlag.store(false);
GEEVENT("Engine daemon finalized, device id = %d", device_id_);
}
/**
 * Watchdog started by Finalize while the runtime context is destroyed.
 * Waits up to one minute on cond_var_ for is_finish_. If it is still unset afterwards, logs an
 * error and terminates the process with _exit(-1).
 */
void EngineDaemon::FinalizeThread() {
  SET_THREAD_NAME(pthread_self(), "ge_dpl_edfin");
  std::unique_lock<std::mutex> guard(single_mutex_);
  constexpr int32_t kWaitMinutes = 1;
  (void)cond_var_.wait_for(guard, std::chrono::minutes(kWaitMinutes), [this]() { return is_finish_.load(); });
  if (is_finish_.load()) {
    return;
  }
  GELOGE(FAILED, "EngineDaemon finalize timeout.");
  _exit(-1);
}
/**
 * Initializes profiling from the option map.
 * Device side: delegates to ProfilingInit with a copy of the options.
 * Host CPU: registers the profiling control callback, then calls MsprofInit. With a non-empty
 * kProfilingDeviceConfigData and kProfilingIsExecuteOn set to something other than "" or "0",
 * the configured data is used; otherwise MsprofInit runs in dynamic mode.
 * @param options_all options to read (not modified).
 * @return SUCCESS; FAILED if callback registration fails; INTERNAL_ERROR if the config cannot be
 *         copied or MsprofInit rejects it.
 */
Status EngineDaemon::InitProfilingFromOption(const std::map<std::string, std::string> &options_all) const {
  if (!is_host_cpu_) {
    // ProfilingInit receives its own copy of the options.
    auto options = options_all;
    GE_CHK_STATUS_RET(ge::ProfilingInit::Instance().Init(options), "Failed to init profiling");
    return SUCCESS;
  }

  rtProfCtrlHandle callback = ProfCtrlHandle;
  const rtError_t reg_ret = rtProfRegisterCtrlCallback(GE, callback);
  if (reg_ret != RT_ERROR_NONE) {
    GELOGE(FAILED, "Register CtrlCallback failed.");
    return FAILED;
  }

  // Options that are not present read as empty strings.
  const auto read_option = [&options_all](const std::string &key) -> std::string {
    const auto found = options_all.find(key);
    return (found != options_all.cend()) ? found->second : std::string();
  };
  const std::string cfg_data = read_option(kProfilingDeviceConfigData);
  const std::string is_execute_on = read_option(kProfilingIsExecuteOn);

  const bool has_cfg_data = !cfg_data.empty();
  if (!has_cfg_data) {
    GELOGI("kProfilingDeviceConfigData is not set.");
  }
  const bool execute_enabled = (!is_execute_on.empty()) && (is_execute_on != "0");
  if (!execute_enabled) {
    GELOGI("kProfilingIsExecuteOn is not enable.");
  }

  if ((!has_cfg_data) || (!execute_enabled)) {
    const auto df_ret = MsprofInit(MSPROF_CTRL_INIT_DYNA, nullptr, 0);
    GELOGI("Default profiling init, return %d.", df_ret);
    return SUCCESS;
  }

  struct MsprofCommandHandleParams prof_conf = {};
  if (strncpy_s(prof_conf.profData, sizeof(prof_conf.profData), cfg_data.c_str(), cfg_data.length()) != EOK) {
    GELOGE(INTERNAL_ERROR, "[copy][ProfilingConfigOption]Failed, data %s.", cfg_data.c_str());
    REPORT_INNER_ERR_MSG("E19999", "Copy profiling data %s failed.", cfg_data.c_str());
    return INTERNAL_ERROR;
  }
  prof_conf.profDataLen = strlen(prof_conf.profData);
  if (strncpy_s(prof_conf.path, sizeof(prof_conf.path), kProfilingPath, strlen(kProfilingPath) + 1) != EOK) {
    GELOGE(INTERNAL_ERROR, "[copy][ProfilingConfigOption]Failed, path %s.", kProfilingPath);
    REPORT_INNER_ERR_MSG("E19999", "Copy profiling_options %s failed.", kProfilingPath);
    return INTERNAL_ERROR;
  }
  prof_conf.pathLen = strlen(prof_conf.path);
  prof_conf.storageLimit = UINT32_MAX;
  // is_host_cpu_ is always true at this point (the device path returned earlier).
  const uint32_t data_type = is_host_cpu_ ? static_cast<uint32_t>(MSPROF_CTRL_INIT_PURE_CPU) :
                                            static_cast<uint32_t>(MSPROF_CTRL_INIT_HELPER);
  const auto df_ret = MsprofInit(data_type, &prof_conf, sizeof(prof_conf));
  if (df_ret != 0) {
    GELOGE(INTERNAL_ERROR, "[Call][msprofCtrlCallback]Failed, type %u, return %d", data_type, df_ret);
    REPORT_INNER_ERR_MSG("E19999", "Call msprofCtrlCallback failed, type %u, return %d", data_type, df_ret);
    return INTERNAL_ERROR;
  }
  GELOGI("Profiling init in binary, return %d.", df_ret);
  return SUCCESS;
}
/**
 * Applies the dump configuration derived from `options` to the GE executor. When dumping is
 * enabled, also starts the ADX data-dump server and records that it was started.
 * Before reading the config, stores device_id_ in args_option_ under kExecutorDevId.
 * NOTE(review): the write targets args_option_, not `options`; the only caller in this file
 * passes args_option_, so the key is visible to GetCfgFromOption.
 * @param options option map to read the dump config from.
 * @return status of ge_executor_.SetDump, or INTERNAL_ERROR if the dump server fails to start.
 */
Status EngineDaemon::InitDumpFromOption(std::map<std::string, std::string> &options) {
  DumpConfig dump_cfg;
  args_option_[kExecutorDevId] = std::to_string(device_id_);
  const bool dump_enabled = DumpManager::GetInstance().GetCfgFromOption(options, dump_cfg);
  const Status set_dump_ret = ge_executor_.SetDump(dump_cfg);
  if (dump_enabled) {
    const int32_t adx_ret = AdxDataDumpServerInit();
    if (adx_ret != kAdxErrorNone) {
      GELOGE(ge::INTERNAL_ERROR, "[AdxDataDumpServer][Init]dump server run failed, adx result[%d].", adx_ret);
      return ge::INTERNAL_ERROR;
    }
    is_dump_inited_ = true;
  } else {
    GELOGI("Dump is not open, do not need to init dump server, ret=%d.", set_dump_ret);
  }
  return set_dump_ret;
}
// Initializes profiling, then dump, both from args_option_. Stops at (and returns) the first failure.
Status EngineDaemon::InitializeMaintenanceFromOption() {
  GE_CHK_STATUS_RET(InitProfilingFromOption(args_option_),
                    "InitProfilingFromOption failed.");
  GE_CHK_STATUS_RET(InitDumpFromOption(args_option_),
                    "InitDumpFromOption failed.");
  return SUCCESS;
}
/**
 * Serves executor requests until kLoopFlag is cleared or the message server reports finalize.
 * An empty request queue simply retries. Any other wait error, or a failure while handling or
 * answering a request, ends the loop with that status.
 * @return SUCCESS on a normal stop, otherwise the failing status.
 */
Status EngineDaemon::LoopEvents() {
  GELOGD("Event loop started");
  bool is_finalize = false;
  while (kLoopFlag.load()) {
    deployer::ExecutorRequest request;
    const auto wait_ret = executor_message_server_->WaitRequest(request, is_finalize);
    if (wait_ret == RT_ERROR_TO_GE_STATUS(ACL_ERROR_RT_QUEUE_EMPTY)) {
      GELOGD("No event was received, continue");
    } else {
      GE_CHK_STATUS_RET(wait_ret, "Failed to wait request event");
      if (is_finalize) {
        break;
      }
      deployer::ExecutorResponse response;
      GE_CHK_STATUS_RET_NOLOG(HandleEvent(request, response));
    }
  }
  GELOGI("Event loop end.");
  return SUCCESS;
}
/**
 * Stops profiling. If InitDumpFromOption started the ADX dump server, it also stops that server
 * and switches dumping off in the GE executor.
 * @return SUCCESS, or the status of the first failing step (later steps are skipped).
 */
Status EngineDaemon::FinalizeMaintenance() {
  const auto prof_ret = MsprofFinalize();
  GE_CHK_STATUS_RET(prof_ret, "MsprofFinalize failed, ret=%d.", prof_ret);
  if (is_dump_inited_) {
    const int32_t adx_ret = AdxDataDumpServerUnInit();
    GE_CHK_STATUS_RET(adx_ret, "AdxDataDumpServerUnInit failed, ret=%d.", adx_ret);
    // The server is stopped now; clear the flag before the next step can fail, so a later call
    // never tries to uninitialize it a second time.
    is_dump_inited_ = false;
    DumpConfig dump_cfg;
    dump_cfg.dump_status = kDumpoff;
    dump_cfg.dump_debug = kDumpoff;
    const Status dump_ret = ge_executor_.SetDump(dump_cfg);
    // Check SetDump's own status (Status is unsigned, hence %u).
    GE_CHK_STATUS_RET(dump_ret, "Executor set dump failed, ret=%u.", dump_ret);
  }
  return SUCCESS;
}
/**
 * Dispatches one request to event_handler_ and sends the resulting response back.
 * On success, the response message is replaced with a fixed success text.
 * @return SUCCESS, or the SendResponse failure.
 */
Status EngineDaemon::HandleEvent(deployer::ExecutorRequest &request, deployer::ExecutorResponse &response) {
  event_handler_.HandleEvent(request, response);
  const bool handled_ok = (response.error_code() == SUCCESS);
  if (!handled_ok) {
    GELOGD("[Handle][Event] failed, error_code = %u, error_msg = %s", response.error_code(),
           response.error_message().c_str());
  } else {
    GELOGD("[Handle][Event] succeeded");
    response.set_error_message("[Handle][Event] succeeded");
  }
  GE_CHK_STATUS_RET(executor_message_server_->SendResponse(response), "[Handle][Event] send response failed.");
  return SUCCESS;
}
/**
 * Applies optional overrides from args_option_: --base_dir, --device_id and --msg_queue_device_id.
 * Keys that are absent leave the current member values untouched.
 * @return SUCCESS, or the ToNumber failure for a malformed numeric value.
 */
Status EngineDaemon::InitializeWithKVArgs() {
  const auto options_end = args_option_.cend();

  const auto base_dir_it = args_option_.find(kArgsKeyBaseDir);
  if (base_dir_it != options_end) {
    base_dir_ = base_dir_it->second;
    GELOGI("Get arg base_dir succeeded, dir = %s.", base_dir_.c_str());
  }

  const auto device_it = args_option_.find(kArgsKeyDeviceId);
  if (device_it != options_end) {
    GE_CHK_STATUS_RET_NOLOG(ToNumber(device_it->second.c_str(), device_id_));
    GELOGI("Get arg device_id succeeded, device_id_ = %d.", device_id_);
  }

  const auto queue_device_it = args_option_.find(kArgsKeyMsgQueueDeviceId);
  if (queue_device_it != options_end) {
    GE_CHK_STATUS_RET_NOLOG(ToNumber(queue_device_it->second.c_str(), msg_queue_device_id_));
    GELOGI("Get arg msg_queue_device_id succeeded, msg_queue_device_id_ = %d.", msg_queue_device_id_);
  }
  return SUCCESS;
}
/**
 * Parses the "key=value" arguments argv[start, end) into args_option_, replacing its previous content.
 * The key is the text before the first '=' and the value is everything after it, so values may
 * themselves contain '='. Arguments without any '=' are skipped with a warning. When a key
 * repeats, its first occurrence is kept.
 * @param start first argv index to parse (inclusive).
 * @param end one past the last argv index to parse.
 * @param argv argument vector; entries in [start, end) must be non-null.
 */
void EngineDaemon::TransArray2ArgsOption(const int32_t start, const int32_t end, char_t **argv) {
  std::map<std::string, std::string> args_option;
  GELOGI("TransArray2ArgsOption enter, start=%d, end=%d.", start, end);
  for (int32_t var_id = start; var_id < end; ++var_id) {
    const std::string arg(argv[var_id]);
    const size_t eq_pos = arg.find('=');
    if (eq_pos == std::string::npos) {
      GELOGW("Cannot parse args in %s.", argv[var_id]);
      continue;
    }
    (void)args_option.emplace(arg.substr(0U, eq_pos), arg.substr(eq_pos + 1U));
  }
  args_option_ = std::move(args_option);
}
void EngineDaemon::GetGlobalEnvOptions(std::map<std::string, std::string> &env_option) {
const std::string kLogLevelEnvName = "ASCEND_GLOBAL_LOG_LEVEL";
const std::string kLogEventEnableEnvName = "ASCEND_GLOBAL_EVENT_ENABLE";
const std::string kLogHostFileNumEnvName = "ASCEND_HOST_LOG_FILE_NUM";
const std::vector<std::string> kLogEnvNames = {kLogLevelEnvName, kLogEventEnableEnvName, kLogHostFileNumEnvName};
for (const auto &log_name : kLogEnvNames) {
constexpr size_t kMaxClusterEnvStrLen = 1024UL;
char_t env_value[kMaxClusterEnvStrLen] = {};
const int32_t ret = mmGetEnv(log_name.c_str(), env_value, kMaxClusterEnvStrLen);
if (ret != EN_OK) {
GELOGW("[Check][Env]Get env[%s] failed.", log_name.c_str());
continue;
}
GELOGD("Get env, key=%s, val=%s.", log_name.c_str(), env_value);
env_option.emplace(log_name, std::string(env_value));
}
}
/**
 * Parses the daemon command line.
 * Layout: argv[1] memory group name, argv[2] request queue id, argv[3] response queue id,
 * argv[4] device id, and argv[5..] optional "key=value" options (applied by InitializeWithKVArgs).
 * Also reads and logs the Ascend log-related environment variables.
 * @param argc argument count; must be at least kExpectedArgCount.
 * @param argv argument vector.
 * @return SUCCESS; PARAM_INVALID when too few arguments are given; otherwise the first parse failure.
 */
Status EngineDaemon::ParseCmdLineArgs(int32_t argc, char_t **argv) {
  const int32_t kExpectedArgCount = 5;
  if (argc < kExpectedArgCount) {
    GELOGE(PARAM_INVALID, "[Parse][Args] failed, arg count (%d) is invalid", argc);
    return PARAM_INVALID;
  }
  const char_t *memory_group_name = argv[1];
  GE_CHECK_NOTNULL(memory_group_name);
  mem_group_name_ = std::string(memory_group_name);
  const char_t *req_msg_queue_id = argv[2];
  GE_CHECK_NOTNULL(req_msg_queue_id);
  GE_CHK_STATUS_RET_NOLOG(ToNumber(req_msg_queue_id, req_msg_queue_id_));
  const char_t *rsp_msg_queue_id = argv[3];
  GE_CHECK_NOTNULL(rsp_msg_queue_id);
  GE_CHK_STATUS_RET_NOLOG(ToNumber(rsp_msg_queue_id, rsp_msg_queue_id_));
  const char_t *device_id = argv[4];
  GE_CHECK_NOTNULL(device_id);
  GE_CHK_STATUS_RET_NOLOG(ToNumber(device_id, device_id_));
  // argv[0, kExpectedArgCount) hold the program name and the four positional arguments above.
  // argv[4] is the numeric device id, not a key=value pair, so the optional options start at
  // kExpectedArgCount.
  TransArray2ArgsOption(kExpectedArgCount, argc, argv);
  GE_CHK_STATUS_RET(InitializeWithKVArgs(), "Failed to init with kv args.");
  std::map<std::string, std::string> env_option;
  GetGlobalEnvOptions(env_option);
  for (const auto &env_it : env_option) {
    const auto &key = env_it.first;
    const auto &val = env_it.second;
    GELOGI("Get global env options, env=%s, env_val=%s.", key.c_str(), val.c_str());
  }
  GELOGD("[Parse][Args] succeeded, get env_option size=%zu, mem_grp_name = %s, req_msg_queue_id = %s, "
         "rsp_msg_queue_id = %s, device_id = %d, msg_queue_device_id = %d",
         env_option.size(), mem_group_name_.c_str(),
         req_msg_queue_id, rsp_msg_queue_id, device_id_, msg_queue_device_id_);
  return SUCCESS;
}
/**
 * Sends a SUCCESS response ("Executor initialized success.") through the message server.
 * Presumably the requester treats it as a readiness signal (confirm against the deployer side).
 * @return SUCCESS, or the SendResponse failure.
 */
Status EngineDaemon::NotifyInitialized() const {
  deployer::ExecutorResponse ready_response;
  ready_response.set_error_code(SUCCESS);
  ready_response.set_error_message("Executor initialized success.");
  GE_CHK_STATUS_RET(executor_message_server_->SendResponse(ready_response),
                    "[Notify][Initialized] failed, device_id = %d.", msg_queue_device_id_);
  GELOGD("[Notify][Initialized] success");
  return SUCCESS;
}
/**
 * Initializes ge_executor_ with args_option_ plus default executor options, then sets the
 * context device id.
 * The defaults are added with emplace, so values already present in args_option_ take precedence.
 * Host CPU: HCCL flag "0", global option ge.exec.placement=HOST, and context device id
 * static_cast<uint32_t>(-1). Device: HCCL flag "1" and context device id device_id_.
 * @return SUCCESS, or the ge_executor_.Initialize failure.
 */
Status EngineDaemon::InitializeGeExecutor() {
  ExecutionRuntimeUtils::EnableGlobalInHeterogeneousExecutor();
  args_option_.emplace(OPTION_EXEC_IS_USEHCOM, "1");
  args_option_.emplace(OPTION_GRAPH_RUN_MODE, "0");
  args_option_.emplace(OPTION_EXEC_HCCL_FLAG, is_host_cpu_ ? "0" : "1");
  GE_CHK_STATUS_RET(ge_executor_.Initialize(args_option_), "Failed to init ge executor");
  if (!is_host_cpu_) {
    GetContext().SetCtxDeviceId(static_cast<uint32_t>(device_id_));
    return SUCCESS;
  }
  std::map<std::string, std::string> host_options{{"ge.exec.placement", "HOST"}};
  ge::GetThreadLocalContext().SetGlobalOption(host_options);
  GetContext().SetCtxDeviceId(static_cast<uint32_t>(-1));
  return SUCCESS;
}
}