* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "daemon/deployer_daemon_client.h"
#include <signal.h>
#include "common/data_flow/queue/heterogeneous_exchange_service.h"
#include "common/utils/rts_api_utils.h"
namespace ge {
namespace {
// Heartbeat age, in seconds, above which IsExpired() reports the client as expired.
constexpr int64_t kHeartbeatExpireSec{30};
// Last argument of SendRequest() when a heartbeat is forwarded to the sub deployer
// (presumably a timeout in seconds, going by the name - confirm against the message client).
constexpr int64_t kHeartbeatTimeoutSec{15};
// NOTE(review): not referenced by the code visible in this part of the file.
constexpr uint32_t kMsgQueueDepth{3U};
// Process type set in the subprocess config when the sub deployer is forked.
const std::string kSubDeployer{"deployer_daemon"};
// Wait time, in seconds, handed to SubprocessManager::ShutdownSubprocess() in Shutdown().
constexpr uint32_t kMaxDeployerShutdownWaitTimeInSec{50U};
}  // namespace
// Runs Finalize() when the client is destroyed; the returned status is discarded.
DeployerDaemonClient::~DeployerDaemonClient() {
  static_cast<void>(Finalize());
}
// Stores the client id and starts with the sub deployer process status set to NORMAL.
// Nothing is forked here; the sub deployer process is created by Initialize().
DeployerDaemonClient::DeployerDaemonClient(int64_t client_id)
: client_id_(client_id), sub_deployer_proc_stat_(ProcStatus::NORMAL) {}
// Brings up the sub deployer that serves this client.
//
// deployer_envs: environment variables kept on the client and later copied into the
//                subprocess config when the sub deployer is forked.
// Returns SUCCESS once ForkAndInit() succeeded and the heartbeat timestamp was refreshed;
// a ForkAndInit() failure is handled by GE_CHK_STATUS_RET (expected to log and return early).
Status DeployerDaemonClient::Initialize(const std::map<std::string, std::string> &deployer_envs) {
  // Must be stored before ForkAndInit(): the fork step reads deployer_envs_.
  deployer_envs_ = deployer_envs;
  GE_CHK_STATUS_RET(ForkAndInit(), "Failed to init sub deployer process, client_id:%ld", client_id_);

  // Start the expiry clock from the moment the sub deployer is ready.
  OnHeartbeat();

  const auto sub_deployer_pid = static_cast<int32_t>(pid_);
  GEEVENT("[Initialize] deployer daemon client success, client id:%ld, sub deployer pid:%d", client_id_,
          sub_deployer_pid);
  return SUCCESS;
}
// Shuts down the sub deployer and finalizes the message client.
//
// Does nothing when pid_ is -1 (no sub deployer recorded), so calling it again after a
// successful run is a no-op. Always returns SUCCESS; the message client's Finalize()
// result is discarded.
Status DeployerDaemonClient::Finalize() {
  const bool has_sub_deployer = (pid_ != -1);
  if (has_sub_deployer) {
    Shutdown();
    if (deployer_msg_client_ != nullptr) {
      static_cast<void>(deployer_msg_client_->Finalize());
    }
    // Log while pid_ still holds the real pid, then mark the client as finalized.
    GEEVENT("[Finalize] deployer daemon client success, client id:%ld, sub deployer pid:%d", client_id_,
            static_cast<int32_t>(pid_));
    pid_ = -1;
  }
  return SUCCESS;
}
void DeployerDaemonClient::Shutdown() const {
if (deployer_msg_client_ != nullptr) {
deployer::DeployerRequest request;
request.set_type(deployer::kDisconnect);
(void)deployer_msg_client_->SendRequestWithoutResponse(request);
}
(void)SubprocessManager::GetInstance().ShutdownSubprocess(pid_, kMaxDeployerShutdownWaitTimeInSec);
GELOGI("Success to shutdown sub deployer process, pid:%d", static_cast<int32_t>(pid_));
}
// Reports whether more than kHeartbeatExpireSec whole seconds have passed since the
// last recorded heartbeat. The elapsed time is truncated to seconds before comparing.
// Reads last_heartbeat_ts_ under mu_.
bool DeployerDaemonClient::IsExpired() {
  const std::lock_guard<std::mutex> guard(mu_);
  const auto since_last_heartbeat =
      std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - last_heartbeat_ts_);
  const int64_t elapsed_sec = since_last_heartbeat.count();
  return elapsed_sec > kHeartbeatExpireSec;
}
// Returns the current value of the is_executing_ flag, read under mu_.
bool DeployerDaemonClient::IsExecuting() {
  const std::lock_guard<std::mutex> guard(mu_);
  const bool executing = is_executing_;
  return executing;
}
// Updates the is_executing_ flag under mu_.
//
// is_executing: new value of the flag.
void DeployerDaemonClient::SetIsExecuting(bool is_executing) {
  const std::lock_guard<std::mutex> guard(mu_);
  is_executing_ = is_executing;
}
// Records "now" (steady clock) as the time of the latest heartbeat, under mu_.
// IsExpired() measures the heartbeat age against this timestamp.
void DeployerDaemonClient::OnHeartbeat() {
  const std::lock_guard<std::mutex> guard(mu_);
  // The clock is sampled while the lock is held, exactly as the timestamp is stored.
  last_heartbeat_ts_ = std::chrono::steady_clock::now();
}
// Forks the sub deployer process and registers a status callback for it.
//
// group_name: forwarded to the child as its second argument.
// The child receives, in order: its process name ("sub_deployer_<client id>"), group_name,
// the request queue id and the response queue id. The environment stored in deployer_envs_
// is added to the subprocess config.
// On success pid_ holds the pid written by ForkSubprocess() and SUCCESS is returned; a fork
// failure is handled by GE_CHK_STATUS_RET (expected to log and return early).
Status DeployerDaemonClient::ForkSubDeployerProcess(const std::string &group_name) {
  const std::string process_name = "sub_deployer_" + std::to_string(client_id_);

  SubprocessManager::SubprocessConfig fork_config{};
  fork_config.process_type = kSubDeployer;
  // presumably the signal delivered to the child when its parent dies - confirm in SubprocessManager
  fork_config.death_signal = SIGKILL;
  fork_config.args = {process_name, group_name, std::to_string(req_msg_queue_id_),
                      std::to_string(rsp_msg_queue_id_)};
  // Range insert keeps entries already present in envs, like the per-entry emplace it replaces.
  fork_config.envs.insert(deployer_envs_.begin(), deployer_envs_.end());

  GE_CHK_STATUS_RET(SubprocessManager::GetInstance().ForkSubprocess(fork_config, pid_), "Failed to fork %s",
                    process_name.c_str());

  // Mirror every reported status into sub_deployer_proc_stat_, which the request
  // handlers consult. The callback captures `this`.
  std::function<void(const ProcStatus &)> on_status_change = [this](const ProcStatus &new_status) {
    GEEVENT("Sub deployer process status is %d.", static_cast<int32_t>(new_status));
    sub_deployer_proc_stat_ = new_status;
  };
  SubprocessManager::GetInstance().RegExcptHandleCallback(pid_, on_status_change);

  GELOGI("Fork sub deployer success, process name:%s, pid:%d", process_name.c_str(), static_cast<int32_t>(pid_));
  return SUCCESS;
}
// Creates the message client and its queues, forks the sub deployer, adds it to the
// memory group and finally initializes the message client against the new process.
//
// Steps run strictly in this order; each failing step is handled by the GE_CHK_* macros
// (expected to log and return early). Returns SUCCESS when every step succeeded.
Status DeployerDaemonClient::ForkAndInit() {
  deployer_msg_client_ = CreateMessageClient();
  GE_CHECK_NOTNULL(deployer_msg_client_);

  // The queue ids written here are later passed to the child as command-line arguments.
  const std::string queue_suffix = "sub_deployer_" + std::to_string(client_id_);
  GE_CHK_STATUS_RET(deployer_msg_client_->CreateMessageQueue(queue_suffix, req_msg_queue_id_, rsp_msg_queue_id_),
                    "Failed to create message queue for sub deployer");

  auto &mem_group_manager = MemoryGroupManager::GetInstance();
  const auto &group_name = mem_group_manager.GetQsMemGroupName();
  GE_CHK_STATUS_RET(ForkSubDeployerProcess(group_name), "Failed to fork sub deployer process");
  GE_CHK_STATUS_RET(mem_group_manager.MemGrpAddProc(group_name, pid_, true, true), "Failed to add group, pid:%d",
                    static_cast<int32_t>(pid_));

  // Status probe handed to the message client: FAILED once the sub deployer has exited,
  // SUCCESS for every other process status.
  const auto query_proc_status = [this]() -> Status {
    if (sub_deployer_proc_stat_.load() == ProcStatus::EXITED) {
      return FAILED;
    }
    return SUCCESS;
  };
  GE_CHK_STATUS_RET(deployer_msg_client_->Initialize(pid_, query_proc_status),
                    "Failed to initialize message client");
  return SUCCESS;
}
// Handles a heartbeat request according to the current sub deployer process status.
//
// request:  heartbeat request, forwarded unchanged to the sub deployer when it is NORMAL.
// response: filled by the sub deployer on the NORMAL path; on EXITED/STOPPED it receives
//           an error code and message describing the process state.
// Returns SUCCESS when the heartbeat was forwarded successfully (or the status is none of
// the three handled values), FAILED when the process has exited or is stopped, and the
// SendRequest() failure when forwarding fails.
Status DeployerDaemonClient::ProcessHeartbeatRequest(const deployer::DeployerRequest &request,
                                                     deployer::DeployerResponse &response) {
  GE_CHECK_NOTNULL(deployer_msg_client_);

  // Take a single snapshot of the atomic status. It is written from the subprocess
  // status callback, so re-reading it for every comparison could see different values
  // and skip every branch, returning SUCCESS with an unpopulated response.
  const ProcStatus proc_status = sub_deployer_proc_stat_.load();
  switch (proc_status) {
    case ProcStatus::NORMAL: {
      GELOGI("[Process][Request] client heartbeat dose not expired, client_id = %ld.", client_id_);
      GE_CHK_STATUS_RET(deployer_msg_client_->SendRequest(request, response, kHeartbeatTimeoutSec));
      break;
    }
    case ProcStatus::EXITED: {
      response.set_error_code(FAILED);
      response.set_error_message("Sub deployer process does not exist");
      GELOGE(FAILED, "Sub deployer process does not exist, client id:%ld", client_id_);
      return FAILED;
    }
    case ProcStatus::STOPPED: {
      // The response carries the STOPPED status value itself as its error code.
      response.set_error_code(static_cast<int32_t>(ProcStatus::STOPPED));
      response.set_error_message("Sub deployer process stopped");
      GELOGE(FAILED, "Sub deployer process stopped, client id:%ld", client_id_);
      return FAILED;
    }
    default: {
      // Any other status is treated as success without touching the response,
      // matching the previous empty else branch.
      break;
    }
  }
  return SUCCESS;
}
// Forwards a deploy request to the sub deployer and returns its response.
//
// request:  request to forward; its type is only used for debug logging here.
// response: filled by the message client's SendRequest().
// Returns SUCCESS when SendRequest() succeeded; otherwise GE_CHK_STATUS_RET_NOLOG is
// expected to return the failure without logging.
Status DeployerDaemonClient::ProcessDeployRequest(const deployer::DeployerRequest &request,
                                                  deployer::DeployerResponse &response) {
  // Same value the heartbeat path replaces with kHeartbeatTimeoutSec; presumably -1 means
  // "no timeout" - confirm against the message client.
  constexpr int64_t kDeployRequestTimeout = -1;
  const auto request_type = request.type();

  GELOGD("Begin to process deploy request, type:%s", deployer::DeployerRequestType_Name(request_type).c_str());
  GE_CHECK_NOTNULL(deployer_msg_client_);
  GE_CHK_STATUS_RET_NOLOG(deployer_msg_client_->SendRequest(request, response, kDeployRequestTimeout));
  GELOGD("Success to process deploy request, type:%s", deployer::DeployerRequestType_Name(request_type).c_str());
  return SUCCESS;
}
}