* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "deploy/execfwk/udf_proxy_client.h"

#include <algorithm>

#include "common/debug/log.h"
#include "common/checker.h"
#include "deploy/flowrm/tsd_client.h"
#include "dflow/base/utils/process_utils.h"
#include "dflow/inc/data_flow/model/pne_model.h"
#include "common/mem_grp/memory_group_manager.h"
#include "toolchain/prof_api.h"
#include "prof_common.h"
#include "common/compile_profiling/ge_call_wrapper.h"
#include "graph_metadef/graph/utils/file_utils.h"
namespace ge {
namespace {
// File-local string constant holding "1".
// NOTE(review): not referenced in the visible part of this file; presumably the "profiling enabled"
// switch value -- confirm before removing.
const std::string kProfilingExecuteOn{"1"};
}  // namespace
// File-scope registrar object constructed from PNE_ID_UDF and `true`. It is tagged unused because nothing in the
// visible part of this file refers to it again.
// NOTE(review): presumably its constructor registers UdfProxyClient as the creator for the UDF engine id and the
// `true` argument selects the proxy variant -- confirm against PneExecutorClientCreatorRegistrar.
PneExecutorClientCreatorRegistrar<UdfProxyClient> __attribute__((unused)) udf_proxy_reg(PNE_ID_UDF, true);
// Builds a client for `device_id` on top of UdfExecutorClient and marks the instance as a proxy.
UdfProxyClient::UdfProxyClient(int32_t device_id)
    : UdfExecutorClient(device_id) {
  // is_proxy_ is not declared in this file (presumably a base-class member), hence the assignment in the body.
  this->is_proxy_ = true;
}
// Pushes one profiling configuration entry for this client's device: type HELPER_HOST_SERVER with value 1.
// Always returns SUCCESS; the result of MsprofSetConfig is deliberately discarded.
Status UdfProxyClient::Initialize() {
  MsprofConfigParam param = {};
  param.deviceId = static_cast<uint32_t>(GetDeviceId());
  param.type = MsprofConfigParamType::HELPER_HOST_SERVER;
  param.value = 1;
  // The config is handed over as a raw byte buffer plus its length, so the struct address is reinterpreted.
  // NOTE(review): the meaning of the leading 0 (config type) is not visible here -- confirm against prof_api.h.
  (void)MsprofSetConfig(0, reinterpret_cast<const char *>(&param), sizeof(param));
  return SUCCESS;
}
// Reports the sub-process status by delegating to GetSubProcStatStartByPm().
ProcStatus UdfProxyClient::GetSubProcStat() {
  ProcStatus status = GetSubProcStatStartByPm();
  return status;
}
// Loads a batch of models through proxy UDF executors on this client's device.
//
// Steps, in order:
//   1. DistributeAndSerializeModelDescs: turn the request into per-process descriptors plus message file paths;
//   2. PreLoadProcess: pack the files and send them to the device;
//   3. ask TsdClient whether builtin UDF launch is supported (stored in is_support_builtin_udf_launch_);
//   4. LoadProcess once per descriptor, pairing descriptor i with message path i.
// Returns SUCCESS when every step succeeds; each checked step bails out on its first failure.
Status UdfProxyClient::LoadModel(deployer::ExecutorRequest_BatchLoadModelMessage load_model_desc) {
  GEEVENT("[Load][Model] begin, model size = %d.", load_model_desc.models_size());
  std::vector<deployer::ExecutorRequest_BatchLoadModelMessage> load_model_descs;
  std::vector<std::string> msg_file_paths;
  GE_CHK_STATUS_RET(DistributeAndSerializeModelDescs(load_model_desc, load_model_descs, msg_file_paths),
                    "Distribute and serialize model desc failed.");
  // Descriptors and message paths are consumed pairwise below; refuse to index past the end of the path list.
  GE_ASSERT_TRUE(msg_file_paths.size() >= load_model_descs.size(),
                 "Load model message paths should not be fewer than load model descs.");

  GE_TIMESTAMP_START(PreLoadProcess);
  GE_CHK_STATUS_RET(PreLoadProcess(load_model_descs, msg_file_paths));
  GE_TIMESTAMP_EVENT_END(PreLoadProcess, "preloading proxy udf models during deploying");

  GE_TIMESTAMP_START(LoadProcess);
  const auto device_id = GetDeviceId();
  const auto &group_name = MemoryGroupManager::GetInstance().GetRemoteMemGroupName(device_id);
  GE_CHK_STATUS_RET(TsdClient::GetInstance().CheckSupportBuiltinUdfLaunch(device_id, is_support_builtin_udf_launch_),
                    "Failed to check tsd support builtin udf launch, device_id=%d.", device_id);
  for (size_t i = 0U; i < load_model_descs.size(); ++i) {
    const auto &model_desc = load_model_descs[i];
    const auto &msg_file_path = msg_file_paths[i];
    GE_CHK_STATUS_RET(LoadProcess(model_desc, msg_file_path, group_name),
                      "Failed to load model.");
  }
  // NOTE(review): "poxy" in the message below looks like a typo for "proxy"; the emitted text is left unchanged.
  GE_TIMESTAMP_EVENT_END(LoadProcess, "starting poxy udf and loading models during deploying");
  GEEVENT("[Load][Model] success.");
  return SUCCESS;
}
// Does nothing in the proxy client; the pid argument is ignored.
void UdfProxyClient::NotifySubprocessShutdown(pid_t pid) const {
  static_cast<void>(pid);
}
// Asks TsdClient to shut down the UDF sub-process `pid` on this client's device and returns its status.
// wait_time_in_sec is not used by this implementation.
Status UdfProxyClient::ShutdownSubprocess(pid_t pid, uint32_t wait_time_in_sec) const {
  static_cast<void>(wait_time_in_sec);
  const auto target_device = GetDeviceId();
  return TsdClient::GetInstance().ShutdownSubprocess(target_device, pid, PNE_ID_UDF);
}
// Packs the files needed by the given load requests into one tar archive and sends it to the device via TsdClient.
//
// load_model_descs     : per-process load descriptors; only models(0) of each entry is inspected here, and every
//                        entry must contain at least one model.
// load_model_msg_paths : serialized load-message paths, archived relative to the context base dir; must be
//                        non-empty. The directory part of the first entry decides where the links and the archive
//                        are created.
//
// Non-builtin models are symlinked (ln -sf) into that directory under a generated name and archived behind the
// tar "-h" switch (see GenerateCmdStrSupportUnpack); builtin models are archived by their model_path.
// The archive is written with "tar cf" when TSD reports inner pack/unpack support and with "tar czf" otherwise.
// Returns SUCCESS, or the status of the first failing check.
Status UdfProxyClient::PreLoadProcess(
    const std::vector<deployer::ExecutorRequest_BatchLoadModelMessage> &load_model_descs,
    const std::vector<std::string> &load_model_msg_paths) const {
  const auto &context = GetContext();
  bool is_tsd_support_unpack = false;
  GE_CHK_STATUS_RET(TsdClient::GetInstance().CheckSupportInnerPackUnpack(context.device_id, is_tsd_support_unpack),
                    "Failed to check tsd support inner pack unpack, device_id=%d.", context.device_id);
  GE_ASSERT_TRUE(!load_model_msg_paths.empty(), "Load model message should not be empty.");
  // Keep the trailing '/'. Without any '/', rfind() yields npos and npos + 1 wraps to 0, giving an empty prefix.
  auto model_path = load_model_msg_paths[0].substr(0, load_model_msg_paths[0].rfind('/') + 1);
  const auto &base_path = context.base_dir;
  auto tar_path = base_path + model_path;
  GE_CHK_STATUS_RET(ProcessUtils::IsValidPath(tar_path), "The path[%s] is not valid.", tar_path.c_str());

  std::vector<std::string> saved_model_paths;
  std::vector<std::string> local_builtin_model_paths;
  // NOTE(review): link_cmd and tar_cmd are passed to ProcessUtils::System as one command string and the paths are
  // concatenated unquoted; presumably they come from trusted deployer data -- confirm.
  std::string link_cmd;
  for (size_t i = 0UL; i < load_model_descs.size(); ++i) {
    // models(0) is read below, so an entry without models must be rejected first.
    GE_ASSERT_TRUE(load_model_descs[i].models_size() > 0, "Load model desc should contain at least one model.");
    const auto &model = load_model_descs[i].models(0);
    if (!model.is_builtin_udf()) {
      const std::string linked_model_name = "Ascend_udf_submodel_" + std::to_string(GetDeviceId()) +
                                            "_" + std::to_string(i) + ".tar.gz";
      link_cmd += "ln -sf " + model.saved_model_file_path() + " " + tar_path + linked_model_name + ";";
      saved_model_paths.emplace_back(model_path + linked_model_name);
      GELOGD("Linked model model in [%s].", (tar_path + linked_model_name).c_str());
    } else {
      local_builtin_model_paths.emplace_back(model.model_path());
      GELOGD("Saved builtin model in [%s]", model_path.c_str());
    }
  }

  auto tar_name = std::to_string(TsdClient::GetLoadFileCount()) + "_Ascend-flow_model.tar.gz";
  // The link commands run first, then a single tar invocation archives messages, builtin models and links.
  std::string tar_cmd = link_cmd + (is_tsd_support_unpack ? "tar cf " : "tar czf ") + tar_path + tar_name;
  std::string path_list_str;
  GE_CHK_STATUS_RET(GenerateCmdStrSupportUnpack(load_model_msg_paths, saved_model_paths,
                                                local_builtin_model_paths, path_list_str),
                    "Generate cmd str for tsd support unpack failed.");
  tar_cmd += path_list_str;

  GE_TIMESTAMP_START(TarCmdExecute);
  GE_CHK_STATUS_RET(ProcessUtils::System(tar_cmd), "Failed to tar om files, cmd = %s.", tar_cmd.c_str());
  GELOGI("Process tar[%s] to dir[%s] success.", tar_name.c_str(), tar_path.c_str());
  GE_TIMESTAMP_EVENT_END(TarCmdExecute, "Link and tar cmd");

  GE_TIMESTAMP_START(TransTar);
  GE_CHK_STATUS_RET(TsdClient::GetInstance().LoadFile(context.device_id, tar_path, tar_name),
                    "Failed to transfer om files, device_id = %d, tar path = %s, tar name = %s.", context.device_id,
                    tar_path.c_str(), tar_name.c_str());
  GELOGI("Send tar[%s] to device[%d] success.", tar_name.c_str(), context.device_id);
  GE_TIMESTAMP_EVENT_END(TransTar, "Tsd transport tar");
  return SUCCESS;
}
// Builds the argument tail of the tar command into path_list_str (any previous content is overwritten):
//   " -C <base_dir> <msg paths...> <builtin model paths...>  -h <linked model paths...> "
// Every path is followed by a single space. The linked paths come after " -h " (tar's dereference switch), so
// the regular files listed before it are not affected by that option. Always returns SUCCESS.
Status UdfProxyClient::GenerateCmdStrSupportUnpack(const std::vector<std::string> &load_model_msg_paths,
    const std::vector<std::string> &saved_model_paths, const std::vector<std::string> &local_builtin_model_paths,
    std::string &path_list_str) const {
  // Appends each entry of `paths` plus a trailing blank to the output string.
  const auto append_paths = [&path_list_str](const std::vector<std::string> &paths) {
    for (const auto &one_path : paths) {
      path_list_str += one_path + " ";
    }
  };

  path_list_str = " -C " + GetContext().base_dir + " ";
  append_paths(load_model_msg_paths);
  append_paths(local_builtin_model_paths);
  path_list_str += " -h ";
  append_paths(saved_model_paths);
  return SUCCESS;
}
// Starts one UDF executor process for `load_model_desc` via ForkAndInit and logs start and success events.
// Returns SUCCESS, or the failing status from ForkAndInit.
Status UdfProxyClient::LoadProcess(const deployer::ExecutorRequest_BatchLoadModelMessage &load_model_desc,
                                   const std::string &msg_file_path,
                                   const std::string &group_name) {
  GEEVENT("Fork udf process to load model on executor start.");

  // -1 is the "no process yet" placeholder; ForkAndInit receives the pid by reference.
  pid_t forked_pid = -1;
  GE_CHK_STATUS_RET(ForkAndInit(load_model_desc, group_name, msg_file_path, forked_pid),
                    "Failed to fork child process.");

  const int32_t dev_id = GetDeviceId();
  GEEVENT("Fork udf process to load model on executor success. "
          "model_type = %s, pid = %d, graph_id = %u, deployer pid = %d, device_id = %d.",
          PNE_ID_UDF.c_str(), forked_pid, load_model_desc.graph_id(), GetDeployerPid(), dev_id);
  return SUCCESS;
}
// Launches one udf_executor sub-process on this client's device through TsdClient.
//
// model_req  : supplies model_path (used for the library path and in logs) and phy_device_id.
// file_path  : passed to the child as --load_path.
// group_name : passed to the child as --group_name.
// params     : request/response queue ids and the builtin flag.
// child_pid  : handed to TsdClient::ForkSubprocess by reference; presumably filled with the new pid.
// Returns SUCCESS, or the failing status from config decoding or from forking.
Status UdfProxyClient::ForkChildProcess(const deployer::ExecutorRequest_LoadModelRequest &model_req,
                                        const std::string &file_path, const std::string &group_name,
                                        const SubProcessParams &params, pid_t &child_pid) {
  const std::string &model_path = model_req.model_path();
  const int32_t phy_device_id = model_req.phy_device_id();
  SubprocessManager::SubprocessConfig config{};
  std::string ld_library_path;
  // Builtin UDFs get their own process type only when TSD supports launching them; in that case no library
  // path is derived from the model path.
  if (params.is_builtin && is_support_builtin_udf_launch_) {
    config.process_type = "BUILTIN_UDF";
  } else {
    config.process_type = PNE_ID_UDF;
    ld_library_path = model_path + "_dir";
    GELOGD("LD_LIBRARY_PATH is been set to %s", ld_library_path.c_str());
  }
  config.args = {"udf_executor"};
  config.kv_args = {
      {"--load_path", file_path}, {"--group_name", group_name},
      {"--phy_device_id", std::to_string(phy_device_id)},
      {"--req_queue_id", std::to_string(params.request_queue_id)},
      {"--rsp_queue_id", std::to_string(params.response_queue_id)}};
  config.kv_args.emplace("PROFILING_RENEW_PATH", "0");
  // Optional maintenance config may add further envs / key-value args.
  const auto process_cfg = GetDevMaintenanceCfg();
  if (process_cfg != nullptr) {
    GE_CHK_STATUS_RET(process_cfg->DecodeConfig(config.envs, config.kv_args), "Decode config failed.");
  }
  auto step = config.kv_args.find(OPTION_EXEC_DUMP_STEP);
  if (step != config.kv_args.cend()) {
    // Every '|' in the dump-step value is rewritten to '_' before the value is handed to the child.
    std::replace(step->second.begin(), step->second.end(), '|', '_');
    GELOGI("ForkChildProcess dump step[%s] trans success.", step->second.c_str());
  }
  GE_CHK_STATUS_RET(TsdClient::GetInstance().ForkSubprocess(GetDeviceId(), config, ld_library_path, child_pid),
                    "Failed to fork udf_executor on device[%d], model_path=%s, is_builtin=%d.", GetDeviceId(),
                    model_path.c_str(), static_cast<int32_t>(params.is_builtin));
  GELOGI("for child process success, model_path=%s, is_builtin=%d", model_path.c_str(),
         static_cast<int32_t>(params.is_builtin));
  return SUCCESS;
}
// Adds process `child_pid` to memory group `group_name` on this client's device through MemoryGroupManager.
// Returns SUCCESS, or the failing status from RemoteMemGrpAddProc.
// NOTE(review): the meaning of the trailing `false, true` arguments is not visible here -- confirm against
// MemoryGroupManager::RemoteMemGrpAddProc.
Status UdfProxyClient::DoMemGrpAddProc(const std::string &group_name, const pid_t child_pid) {
  const int32_t dev_id = GetDeviceId();
  GE_CHK_STATUS_RET(MemoryGroupManager::GetInstance().RemoteMemGrpAddProc(dev_id, group_name, child_pid, false, true),
                    "Failed to add group[%s] to process[%d], device id[%d].", group_name.c_str(), child_pid, dev_id);
  return SUCCESS;
}
}