* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "deploy/model_send/flow_model_sender.h"
#include <fstream>
#include <algorithm>
#include "common/thread_pool/thread_pool.h"
#include "graph/manager/graph_var_manager.h"
#include "graph/ge_context.h"
#include "graph/debug/ge_attr_define.h"
#include "graph/utils/graph_utils.h"
#include "graph/utils/tensor_utils.h"
#include "common/file_constant_utils/file_constant_utils.h"
#include "framework/common/framework_types_internal.h"
#include "securec.h"
#include "deploy/deployer/deployer_proxy.h"
#include "deploy/flowrm/flow_route_planner.h"
#include "common/data_flow/queue/heterogeneous_exchange_service.h"
#include "dflow/base/utils/process_utils.h"
#include "deploy/resource/resource_manager.h"
#include "deploy/flowrm/flow_route_manager.h"
#include "deploy/resource/heterogeneous_deploy_planner.h"
#include "executor/executor_context.h"
namespace ge {
namespace {
constexpr int32_t kEnqueueTimeout = 5 * 60 * 1000;
const std::string STATIC_MODEL_ADDR_FIXED = "ge.exec.static_model_addr_fixed";
constexpr const char *OPTION_MOMORY_POOL_THRESHOLD = "ge.experiment.memory_pool_threshold";
constexpr const char *OPTION_FLOAT_OVERFLOW_MODE = "ge.exec.float_overflow_mode";
constexpr const char *OPTION_FLOAT_OVERFLOW_MODE_SATURATION = "saturation";
constexpr const char *ATTR_NAME_IS_DATA_FLOW_GRAPH = "_dflow_is_data_flow_graph";
constexpr const char_t *ATTR_NAME_DATA_FLOW_DATA_FLOW_SCOPE = "_dflow_data_flow_scope";
const std::string kUdfBuildInFuncNamePrefix = "_BuiltIn_";
const std::string kUdfResourceSubDir = "udf_resource";
constexpr size_t kMaxTransferPoolSize = 12U;
constexpr size_t kMaxSerializePoolSize = 8U;
const std::unordered_set<string> kTransferOptionsWhiteList{
STATIC_MEMORY_POLICY, FILE_CONSTANT_PATH, OP_WAIT_TIMEOUT, OP_EXECUTE_TIMEOUT, OPTION_EXEC_REUSE_ZERO_COPY_MEMORY,
STATIC_MODEL_ADDR_FIXED, OPTION_EXEC_STREAM_SYNC_TIMEOUT, OPTION_MOMORY_POOL_THRESHOLD,
OPTION_EXEC_DYNAMIC_GRAPH_PARALLEL_MODE, OPTION_EXEC_PROFILING_MODE, OPTION_EXEC_PROFILING_OPTIONS};
}
FlowModelSender::~FlowModelSender() {
for (const auto &it : node_id_to_plan_) {
const auto &remote_plan = it.second;
FlowRouteManager::GetInstance().UndeployRoute(FlowRouteType::kTransferFlowRoute, remote_plan.route_id(),
DeployContext::LocalContext().GetFlowGwClientManager());
}
}
Status FlowModelSender::SerializeModel(const PneModelPtr &model, ModelBufferData &model_buff) {
return model->SerializeModel(model_buff);
}
Status FlowModelSender::DeployRemoteVarManager(DeployState &deploy_state) {
std::map<std::string, std::vector<const DeployPlan::SubmodelInfo *>> model_groups;
for (const auto &it : deploy_state.GetDeployPlan().GetSubmodels()) {
const auto &submodel = it.second;
model_groups[submodel.device_info.GetKey()].emplace_back(&submodel);
}
return DeployRemoteVarManager(model_groups);
}
Status FlowModelSender::DeployRemoteVarManager(const std::map<std::string,
std::vector<const DeployPlan::SubmodelInfo *>> &models) {
std::map<int32_t, std::set<uint64_t>> sessions;
std::map<int32_t, std::map<uint64_t, std::map<OpDescPtr, std::set<int32_t>>>> node_need_transfer_memory;
std::map<int32_t, std::set<int32_t>> device_ids;
for (const auto &it : models) {
const auto &submodels = it.second;
GE_CHK_BOOL_RET_STATUS(!submodels.empty(), FAILED, "The submodels must be not empty.");
const auto &target_device = submodels[0]->device_info;
GELOGD("[Deploy][RemoteVarManager] started, target_device = %s, submodel count = %zu.",
target_device.GetDesc().c_str(),
submodels.size());
GE_CHK_STATUS_RET(GetAllRelatedVarManager(target_device, submodels, sessions,
node_need_transfer_memory, device_ids),
"Failed to GetAllRelatedVarManager");
GELOGD("[Deploy][RemoteVarManager] Success, target_device = %s.", target_device.GetDesc().c_str());
}
GE_CHK_STATUS_RET(TransferFileConstants(device_ids, node_need_transfer_memory),
"Failed to GetVarManagerAndSendToRemote.");
return SUCCESS;
}
Status FlowModelSender::GetAllRelatedVarManager(
const DeployPlan::DeviceInfo &device_info,
const std::vector<const DeployPlan::SubmodelInfo *> &submodels,
std::map<int32_t, std::set<uint64_t>> &sessions,
std::map<int32_t, std::map<uint64_t, std::map<OpDescPtr, std::set<int32_t>>>> &node_need_transfer_memory,
std::map<int32_t, std::set<int32_t>> &device_ids) {
const auto node_id = device_info.GetNodeId();
int32_t local_node_id = ResourceManager::GetInstance().GetLocalNodeId();
for (const auto &submodel : submodels) {
auto model = submodel->model;
if (model == nullptr) {
continue;
}
uint64_t session = GetContext().SessionId();
(void) sessions[node_id].emplace(session);
device_ids[node_id].emplace(device_info.GetDeviceId());
auto root_graph = model->GetRootGraph();
GE_CHECK_NOTNULL(root_graph);
if (node_id != local_node_id) {
for (const auto &node : root_graph->GetAllNodes()) {
if (node->GetType() == FILECONSTANT) {
node_need_transfer_memory[node_id][session][node->GetOpDesc()].emplace(device_info.GetDeviceId());
GELOGI("FileConstant[%s] need to transfer to device[%d].",
node->GetOpDesc()->GetName().c_str(), device_info.GetDeviceId());
}
}
}
}
return SUCCESS;
}
Status FlowModelSender::GetOpFileInfo(const OpDescPtr &op_desc,
const std::map<std::string, std::string> &file_id_to_path,
std::string &file_path,
size_t &offset,
size_t &length) {
ge::ConstGeTensorDescPtr tensor_desc = op_desc->GetOutputDescPtr(0U);
GE_CHECK_NOTNULL(tensor_desc);
int64_t total_length = 0;
GE_CHK_STATUS_RET(TensorUtils::GetTensorSizeInBytes(*tensor_desc, total_length),
"Failed to get size of file constant(%s).", op_desc->GetName().c_str());
GE_CHK_STATUS_RET(FileConstantUtils::GetFilePath(op_desc, file_id_to_path, file_path, offset, length),
"Failed to get file path.");
length = (length == 0U ? total_length : length);
return SUCCESS;
}
Status FlowModelSender::TransferFileConstants(
const std::map<int32_t, std::set<int32_t>> &device_ids,
const std::map<int32_t, std::map<uint64_t, std::map<OpDescPtr, std::set<int32_t>>>> &node_need_transfer_memory) {
std::map<std::string, std::string> file_id_to_path;
GE_CHK_STATUS_RET(FileConstantUtils::GetFileIdToPathMapFromOption(file_id_to_path), "Failed to get file path");
for (const auto &it : node_need_transfer_memory) {
GELOGI("[VarManager] process shared memory.");
auto node_id = it.first;
auto session_op_desc_map = it.second;
auto dev_iter = device_ids.find(node_id);
if (dev_iter == device_ids.end()) {
continue;
}
for (const auto &session_iter : session_op_desc_map) {
auto session_id = session_iter.first;
auto op_desc_map = session_iter.second;
std::map<std::string, std::set<int32_t>> op_transfer_device_list;
for (const auto &op_it : op_desc_map) {
const auto &op_desc = op_it.first;
std::string file_path;
size_t offset = 0U;
size_t length = 0U;
GE_CHK_STATUS_RET(GetOpFileInfo(op_desc, file_id_to_path, file_path, offset, length),
"Failed to get file path");
auto send_key = op_desc->GetName() + "_" +
file_path + "_" +
std::to_string(offset) + "_" +
std::to_string(length) + "_" +
std::to_string(node_id);
op_transfer_device_list[send_key].insert(op_it.second.begin(), op_it.second.end());
GELOGI("FileConstant[%s] need to transfer to device, device size = %zu, send key = %s.",
op_desc->GetName().c_str(), op_it.second.size(), send_key.c_str());
}
for (const auto &op_desc_to_device : op_desc_map) {
const auto &op_desc = op_desc_to_device.first;
std::string file_path;
size_t offset = 0U;
size_t length = 0U;
GE_CHK_STATUS_RET(GetOpFileInfo(op_desc, file_id_to_path, file_path, offset, length),
"Failed to get file path");
auto send_key = op_desc->GetName() + "_" +
file_path + "_" +
std::to_string(offset) + "_" +
std::to_string(length) + "_" +
std::to_string(node_id);
ge::ConstGeTensorDescPtr tensor_desc = op_desc->GetOutputDescPtr(0U);
GE_CHECK_NOTNULL(tensor_desc);
SendInfo send_info;
send_info.session_id = session_id;
send_info.node_id = node_id;
send_info.device_ids = std::vector<int32_t>(dev_iter->second.begin(), dev_iter->second.end());
std::unique_ptr<std::istream> input_stream;
GE_CHK_STATUS_RET_NOLOG(CreateInputStream(file_path, offset, input_stream));
GE_CHK_STATUS_RET(CopyOneWeightToTransfer(send_info,
*input_stream,
length,
op_desc,
op_transfer_device_list[send_key]),
"Failed to send data.");
}
}
}
return SUCCESS;
}
Status FlowModelSender::TransferWeightWithQ(std::istream &input_stream,
int64_t file_constant_size,
const DeployQueueAttr &queue_attr) const {
std::string compress_nodes;
size_t used_memory = 0U;
size_t copy_len_once = 0U;
auto file_size = static_cast<size_t>(file_constant_size);
const int64_t kBlockSize = 1024 * 1024 * 10;
compress_nodes.reserve(kBlockSize);
input_stream.seekg(0, input_stream.end);
int64_t stream_size = input_stream.tellg();
GE_CHK_BOOL_RET_STATUS(stream_size == file_constant_size, FAILED,
"The file size[%ld] is inconsistent with the specified size[%ld] of tensor dec",
stream_size, file_constant_size);
input_stream.seekg(0, input_stream.beg);
auto &exchange_service = HeterogeneousExchangeService::GetInstance();
ExchangeService::ControlInfo control_info{};
control_info.timeout = kEnqueueTimeout;
while ((!input_stream.eof()) && (used_memory != file_size)) {
input_stream.read(&compress_nodes[0], kBlockSize);
copy_len_once = input_stream.gcount();
if (file_size - used_memory < copy_len_once) {
copy_len_once = file_size - used_memory;
}
used_memory += copy_len_once;
GELOGD("Enqueue shared content size[%zu] of total size[%lu] to queue[%u] in device[%d]",
copy_len_once, file_constant_size, queue_attr.queue_id, queue_attr.device_id);
GE_CHK_STATUS_RET(exchange_service.Enqueue(queue_attr.device_id,
queue_attr.queue_id,
&compress_nodes[0],
copy_len_once,
control_info),
"Failed to enqueue weight, device_id = %d, queue_id = %d",
queue_attr.device_id, queue_attr.queue_id);
}
return SUCCESS;
}
Status FlowModelSender::GetOrCreateFlowRoutePlan(const SendInfo &send_info,
deployer::FlowRoutePlan &remote_route) {
auto it = node_id_to_plan_.find(send_info.node_id);
if (it != node_id_to_plan_.end()) {
remote_route = it->second;
return SUCCESS;
}
ExchangeRoute local_route;
int64_t route_id = -1;
GE_CHK_STATUS_RET_NOLOG(TransferPreDeploy(send_info, local_route, remote_route));
GE_CHK_STATUS_RET_NOLOG(FlowRouteManager::GetInstance().AddRoute(local_route, FlowRouteType::kTransferFlowRoute,
route_id));
GELOGD("Add local transfer route:%ld", route_id);
remote_route.set_route_id(route_id);
node_id_to_plan_[send_info.node_id] = remote_route;
return SUCCESS;
}
Status FlowModelSender::CopyOneWeightToTransfer(const SendInfo &send_info,
std::istream &input_stream,
int64_t file_constant_size,
const OpDescPtr &op_desc,
const std::set<int32_t> &devices) {
auto session_id = static_cast<int64_t>(send_info.session_id);
GELOGI("Enter to CopyOneWeightToTransfer, file constant size = %ld", file_constant_size);
deployer::FlowRoutePlan remote_route;
GE_CHK_STATUS_RET(GetOrCreateFlowRoutePlan(send_info, remote_route),
"Get or create transfer weight flow route plan failed.");
auto local_route =
FlowRouteManager::GetInstance().QueryRoute(FlowRouteType::kTransferFlowRoute, remote_route.route_id());
GE_CHECK_NOTNULL(local_route);
std::vector<DeployQueueAttr> queue_attrs;
local_route->GetQueueAttrs(queue_attrs);
GE_CHK_BOOL_RET_STATUS(queue_attrs.size() >= 1U, FAILED,
"Check transfer pre deploy queue size[%zu] failed.",
queue_attrs.size());
auto f = std::async(std::launch::async, [this, &input_stream, file_constant_size, &queue_attrs]() {
GE_CHK_STATUS_RET(TransferWeightWithQ(input_stream, file_constant_size, queue_attrs[0]),
"Failed to transfer weight, file size = %ld.", file_constant_size);
return SUCCESS;
});
auto tensor_desc = op_desc->MutableOutputDesc(0);
GE_CHECK_NOTNULL(tensor_desc);
rtMemType_t memory_type = RT_MEMORY_HBM;
auto mem_type = static_cast<uint32_t>(RT_MEMORY_DEFAULT);
if (AttrUtils::GetInt(op_desc, ATTR_OUTPUT_MEMORY_TYPE, mem_type) && (mem_type == 1)) {
memory_type = RT_MEMORY_RDMA_HBM;
}
deployer::DeployerRequest request;
request.set_type(deployer::kDownloadSharedContent);
auto shared_content_desc_request = request.mutable_shared_content_desc_request();
GE_CHECK_NOTNULL(shared_content_desc_request);
shared_content_desc_request->mutable_device_ids()->Add(devices.begin(), devices.end());
*shared_content_desc_request->mutable_flow_route() = remote_route;
auto shared_content_description = shared_content_desc_request->mutable_shared_content_desc();
GE_CHECK_NOTNULL(shared_content_description);
shared_content_description->set_session_id(session_id);
shared_content_description->set_node_name(op_desc->GetName());
shared_content_description->set_total_length(file_constant_size);
shared_content_description->set_mem_type(memory_type);
proto::TensorDescriptor *tensor_desc_proto = shared_content_description->mutable_tensor_desc();
GeTensorSerializeUtils::GeTensorDescAsProto(*tensor_desc, tensor_desc_proto);
deployer::DeployerResponse response;
GE_CHK_STATUS_RET(DeployerProxy::GetInstance().SendRequest(send_info.node_id, request, response),
"[Send] [shared_content] failed.");
if ((response.error_code() != SUCCESS) || (f.get() != SUCCESS)) {
GELOGE(FAILED, "[CopyOneWeightToTransfer] failed, node_id = %d, error code = %u, error message = %s",
send_info.node_id, response.error_code(), response.error_message().c_str());
return FAILED;
}
return SUCCESS;
}
Status FlowModelSender::CreateInputStream(const std::string &constant_file_path, size_t offset,
unique_ptr<std::istream> &in_stream) {
std::string real_path = RealPath(constant_file_path.c_str());
auto file_stream = MakeUnique<std::ifstream>(real_path, std::ifstream::binary);
GE_CHECK_NOTNULL(file_stream);
if (!file_stream->is_open()) {
GELOGE(GRAPH_FAILED, "[Open][File] %s failed.", real_path.c_str());
REPORT_INNER_ERR_MSG("E19999", "open file:%s failed.", real_path.c_str());
return GRAPH_FAILED;
}
file_stream->clear();
file_stream->seekg(offset, file_stream->beg);
in_stream = std::move(file_stream);
return SUCCESS;
}
Status FlowModelSender::DeployDevMaintenanceCfg(const DeployState &deploy_state) {
for (const auto &it : deploy_state.GetDeployPlan().GetSubmodels()) {
auto target_node_id = it.second.device_info.GetNodeId();
GE_CHK_STATUS_RET(DownloadDevMaintenanceCfg(target_node_id), "[Download][DevMaintenanceCfg] failed, node_id[%d].",
target_node_id);
}
return SUCCESS;
}
Status FlowModelSender::DownloadDevMaintenanceCfg(int32_t dev_id) {
GELOGD("[Download][Device debug Config] start, device id[%d]", dev_id);
GE_MAKE_GUARD(close_config, [dev_id]() {
DeviceMaintenanceCfgManager::GetInstance().CloseDevMaintenanceConfig(dev_id);
});
GE_CHK_STATUS_RET(DeviceMaintenanceCfgManager::GetInstance().CreateDevMaintenanceConfig(dev_id),
"[Create][MaintenanceCfg] failed, device id[%d].", dev_id);
GE_CHK_STATUS_RET(DeployDevCfg(dev_id, DeviceDebugConfig::ConfigType::kLogConfigType),
"[Download][LogConfig] failed, device id[%d].", dev_id);
GE_CHK_STATUS_RET(DeployDevCfg(dev_id, DeviceDebugConfig::ConfigType::kDumpConfigType),
"[Download][DumpConfig] failed, device id[%d].", dev_id);
GE_CHK_STATUS_RET(DeployDevCfg(dev_id, DeviceDebugConfig::ConfigType::kProfilingConfigType),
"[Download][ProfilingConfig] failed, device id[%d].", dev_id);
return SUCCESS;
}
Status FlowModelSender::DeployDevCfg(int32_t dev_id, DeviceDebugConfig::ConfigType conf_type) {
std::map<DeviceDebugConfig::ConfigType, deployer::DeviceConfigType> conf_type_map = {
{DeviceDebugConfig::ConfigType::kLogConfigType, deployer::kLogConfig},
{DeviceDebugConfig::ConfigType::kDumpConfigType, deployer::kDumpConfig},
{DeviceDebugConfig::ConfigType::kProfilingConfigType, deployer::kProfilingConfig},
};
if (conf_type_map.find(conf_type) == conf_type_map.end()) {
GELOGW("Init device config failed, cannot find config type=%d.", static_cast<int32_t>(conf_type));
return SUCCESS;
}
std::string conf_data;
const auto &conf = DeviceMaintenanceCfgManager::GetInstance().GetDevMaintenanceConfig(dev_id);
GE_CHECK_NOTNULL(conf);
if (conf->GetJsonDataByType(conf_type, conf_data) != SUCCESS) {
GELOGI("Do not have device cfg, cfg type[%d].", static_cast<int32_t>(conf_type));
return SUCCESS;
}
deployer::DeployerRequest request;
request.set_type(deployer::kDownloadConf);
deployer::DeployerResponse response;
auto download_config_request = request.mutable_download_config_request();
download_config_request->set_sub_type(conf_type_map[conf_type]);
download_config_request->set_device_id(dev_id);
download_config_request->set_config_data(&conf_data[0], conf_data.size());
if (DeployerProxy::GetInstance().SendRequest(dev_id, request, response) != SUCCESS) {
REPORT_INNER_ERR_MSG("E19999",
"Send disconnect request failed, response info:%s",
response.error_message().c_str());
GELOGE(FAILED, "[Send][Request]Send disconnect request failed, response info:%s", response.error_message().c_str());
return FAILED;
}
auto error_code = response.error_code();
if (error_code != SUCCESS) {
REPORT_INNER_ERR_MSG("E19999", "Check response failed. error code =%u, error message=%s", error_code,
response.error_message().c_str());
GELOGE(FAILED, "[Check][Response]Check response failed. error code =%u, error message=%s", error_code,
response.error_message().c_str());
return FAILED;
}
GELOGI("DeployDevCfg successfully, conf_type=%d, dev_id=%d.", static_cast<int32_t>(conf_type), dev_id);
return SUCCESS;
}
Status FlowModelSender::SendDatagwSchedInfo(std::map<std::string, const DeployPlan::DeviceInfo *> &datagw_devices_used,
std::map<std::string, deployer::DeployerRequest> &datagw_sched_infos) {
for (const auto &it : datagw_devices_used) {
const auto &target_device = *it.second;
deployer::DeployerResponse response;
GE_CHK_STATUS_RET(DeployerProxy::GetInstance().SendRequest(target_device.GetNodeId(),
datagw_sched_infos[it.first], response),
"DynamicSched Failed to send request to target_device = %s", target_device.GetDesc().c_str());
auto ret = response.error_code();
if (ret != SUCCESS) {
GELOGE(ret, "DynamicSched [Transfer][Sched info] failed, target_device = %s, request = %s, error_message = %s",
target_device.GetDesc().c_str(), datagw_sched_infos[it.first].DebugString().c_str(),
response.error_message().c_str());
return ret;
}
GEEVENT("DynamicSched [Transfer][Sched info] success, target_device = %s",
target_device.GetDesc().c_str());
}
return SUCCESS;
}
Status FlowModelSender::TransferDataGwDeployPlan(DeployState &deploy_state) {
if (!deploy_state.GetIsDynamicSched() && !deploy_state.IsEnableExceptionCatch()) {
GEEVENT(
"DynamicSched dynamic sched model close and exception catch disable, root_model_id=%u, "
"don't need send config to flowgw.",
deploy_state.GetRootModelId());
return SUCCESS;
}
std::map<std::string, const DeployPlan::DeviceInfo *> datagw_devices_used;
std::map<std::string, deployer::DeployerRequest> datagw_sched_infos;
for (const auto &name_and_submodel_desc : deploy_state.GetDeployPlan().GetSubmodels()) {
const auto &submodel_desc = name_and_submodel_desc.second;
if (submodel_desc.model == nullptr) {
GEEVENT("DynamicSched, process datagw model=%s", name_and_submodel_desc.first.c_str());
deployer::DeployerRequest request;
request.set_type(deployer::kDatagwSchedInfo);
auto datagw_config_info = request.mutable_datagw_sched_info();
datagw_config_info->set_root_model_id(deploy_state.GetRootModelId());
datagw_config_info->set_device_id(submodel_desc.device_info.GetDeviceId());
datagw_config_info->set_device_type(submodel_desc.device_info.GetType());
datagw_config_info->set_is_dynamic_sched(deploy_state.GetIsDynamicSched());
auto is_proxy_q = false;
if (submodel_desc.device_info.WithProxy()) {
is_proxy_q = true;
}
datagw_config_info->set_is_proxy(is_proxy_q);
if (submodel_desc.sched_input_queue_indices.size() == 1) {
datagw_config_info->set_input_queue_indice(submodel_desc.sched_input_queue_indices[0]);
GELOGI("DynamicSched set datagw sched info, input queue indice=%d.",
submodel_desc.sched_input_queue_indices[0]);
} else {
GELOGE(FAILED, "DynamicSched set datagw sched info failed, input indice num %u!",
submodel_desc.sched_input_queue_indices.size());
}
if (submodel_desc.sched_output_queue_indices.size() == 1) {
datagw_config_info->set_output_queue_indice(submodel_desc.sched_output_queue_indices[0]);
GELOGI("DynamicSched set datagw sched info, output queue indice=%d.",
submodel_desc.sched_output_queue_indices[0]);
} else {
GELOGE(FAILED, "DynamicSched set datagw sched info failed, output indice num %u!",
submodel_desc.sched_output_queue_indices.size());
}
datagw_devices_used[submodel_desc.device_info.GetKey()] = &submodel_desc.device_info;
datagw_sched_infos[submodel_desc.device_info.GetKey()] = std::move(request);
GEEVENT("DynamicSched add sched model info, root_model_id=%u, model name=%s, device_id=%d, device_type=%d, "
"is_proxy=%d.", deploy_state.GetRootModelId(), name_and_submodel_desc.first.c_str(),
submodel_desc.device_info.GetDeviceId(), submodel_desc.device_info.GetType(), is_proxy_q);
}
}
return SendDatagwSchedInfo(datagw_devices_used, datagw_sched_infos);
}
Status FlowModelSender::TransferDeployPlan(const DeployState &deploy_state) {
std::map<std::string, std::vector<deployer::SubmodelDesc>> grouped_by_target_device;
std::map<std::string, const DeployPlan::DeviceInfo *> devices_used;
GE_CHK_STATUS_RET_NOLOG(BuildSubmodelDescs(deploy_state, grouped_by_target_device, devices_used));
for (const auto &it : devices_used) {
const auto &target_device = *it.second;
const auto *node_info = DeployerProxy::GetInstance().GetNodeInfo(target_device.GetNodeId());
GE_CHECK_NOTNULL(node_info);
auto &submodel_descs = grouped_by_target_device[target_device.GetKey()];
for (const auto &submodel_desc : submodel_descs) {
GEEVENT("Model deployment info, model_name = %s, model_type = %s, graph_id = %u, %s, device_id = %d.",
submodel_desc.model_name().c_str(), submodel_desc.engine_name().c_str(), deploy_state.GetGraphId(),
node_info->DebugString().c_str(), target_device.GetDeviceId());
}
deployer::DeployerRequest request;
GE_CHK_STATUS_RET(BuildUpdateDeployPlanRequest(deploy_state, target_device, submodel_descs, request),
"Failed to build request");
GEEVENT("[Transfer][DeployPlan] in deploying start, root_model_id = %u, target_device = %s",
deploy_state.GetRootModelId(), target_device.GetDesc().c_str());
deployer::DeployerResponse response;
GE_CHK_STATUS_RET(DeployerProxy::GetInstance().SendRequest(target_device.GetNodeId(), request, response),
"Failed to send request to target_device = %s", target_device.GetDesc().c_str());
auto ret = response.error_code();
if (ret != SUCCESS) {
GELOGE(ret, "[Transfer][DeployPlan] failed, target_device = %s, request = %s, error_message = %s",
target_device.GetDesc().c_str(), request.DebugString().c_str(), response.error_message().c_str());
return ret;
}
GEEVENT("[Transfer][DeployPlan] in deploying success, root_model_id = %u, target_device = %s",
deploy_state.GetRootModelId(), target_device.GetDesc().c_str());
}
return SUCCESS;
}
Status FlowModelSender::TransferFlowRoutePlan(const DeployState &deploy_state) {
for (const auto &it : deploy_state.GetFlowRoutePlansToDeploy()) {
const auto &node_id = it.first;
const auto &flow_route_plan = it.second;
deployer::DeployerRequest request;
request.set_type(deployer::kAddFlowRoutePlan);
auto flow_route_plan_request = request.mutable_add_flow_route_plan_request();
flow_route_plan_request->set_node_id(node_id);
flow_route_plan_request->set_root_model_id(deploy_state.GetRootModelId());
*(flow_route_plan_request->mutable_flow_route_plan()) = flow_route_plan;
GEEVENT("[Transfer][FlowRoutePlan] start, root_model_id = %u, target_node = %d",
deploy_state.GetRootModelId(), node_id);
deployer::DeployerResponse response;
GE_CHK_STATUS_RET(DeployerProxy::GetInstance().SendRequest(node_id, request, response),
"Failed to send request to target_device = %d", node_id);
auto ret = response.error_code();
if (ret != SUCCESS) {
GELOGE(ret, "[Transfer][FlowRoutePlan] failed, target_node = %d, request = %s, error_message = %s",
node_id, request.DebugString().c_str(), response.error_message().c_str());
return ret;
}
GEEVENT("[Transfer][FlowRoutePlan] success, root_model_id = %u, target_node = %d",
deploy_state.GetRootModelId(), node_id);
}
return SUCCESS;
}
void FlowModelSender::AddDynamicSchedInfo(const DeployState &deploy_state, const std::string &model_instance_name,
deployer::SubmodelDesc &submodel_desc) {
uint32_t instance_num = 0U;
auto &model_instances_num = deploy_state.GetDeployPlan().GetDynamicSchedPlan().GetModelInstanceNum();
auto ins_iter = model_instances_num.find(model_instance_name);
if (ins_iter != model_instances_num.end()) {
instance_num = ins_iter->second;
} else {
GELOGI("DynamicSched single instance model no need report status, model name=%s.",
model_instance_name.c_str());
}
if ((instance_num > 1U) || deploy_state.IsEnableExceptionCatch()) {
const auto &dynamic_sched_model = deploy_state.GetDeployPlan().GetSubmodels();
auto iter = dynamic_sched_model.find(model_instance_name);
if (iter != dynamic_sched_model.end()) {
for (const auto &idx : iter->second.status_input_queue_indices) {
submodel_desc.add_status_input_queue_indices(idx);
GELOGI("DynamicSched add status input queue indice=%d, model name=%s.",
idx, model_instance_name.c_str());
}
for (const auto &idx : iter->second.status_output_queue_indices) {
submodel_desc.add_status_output_queue_indices(idx);
GELOGI("DynamicSched add status output queue indice=%d, model name=%s.",
idx, model_instance_name.c_str());
}
}
}
}
Status FlowModelSender::BuildSubmodelDescs(
const DeployState &deploy_state,
std::map<std::string, std::vector<deployer::SubmodelDesc>> &submodel_descs,
std::map<std::string, const DeployPlan::DeviceInfo *> &devices_used) {
std::map<std::string, std::map<std::string, std::vector<std::pair<const DeployPlan::SubmodelInfo *,
const std::string>>>> grouped_submodels;
std::map<std::string, std::vector<std::string>> model_instances;
for (const auto &name_and_submodel_desc : deploy_state.GetDeployPlan().GetSubmodels()) {
const auto &submodel_desc = name_and_submodel_desc.second;
if (submodel_desc.model == nullptr) {
continue;
}
grouped_submodels[submodel_desc.model->GetModelName()]
[submodel_desc.device_info.GetKey()].
emplace_back(std::make_pair(&submodel_desc, name_and_submodel_desc.first));
devices_used[submodel_desc.device_info.GetKey()] = &submodel_desc.device_info;
model_instances[submodel_desc.model->GetModelName()].emplace_back(name_and_submodel_desc.first);
}
const InputAlignAttrs &input_align_attrs = deploy_state.GetInputAlignAttrs();
bool enable_exception_catch = deploy_state.IsEnableExceptionCatch();
for (const auto &it : grouped_submodels) {
const auto &submodel_instances = it.second;
auto replica_num = static_cast<uint32_t>(model_instances[it.first].size());
uint32_t replica_idx = 0;
std::map<int32_t, std::vector<std::string>> model_instances_on_node;
for (const auto &model_instance : submodel_instances) {
const auto &submodel_infos = model_instance.second;
for (const auto &submodel_info_ptr : submodel_infos) {
const auto &submodel_info = *submodel_info_ptr.first;
deployer::SubmodelDesc submodel_desc;
GE_CHECK_NOTNULL(submodel_info.model, "Submodel is nullptr.");
const auto &model_name = submodel_info.model->GetModelName();
submodel_desc.set_model_name(model_name);
submodel_desc.set_model_instance_name(submodel_info_ptr.second);
submodel_desc.set_process_id(submodel_info.process_id);
model_instances_on_node[submodel_info.device_info.GetNodeId()].emplace_back(model_name);
uint32_t replica_idx_on_node = model_instances_on_node[submodel_info.device_info.GetNodeId()].size() - 1U;
submodel_desc.set_replica_idx_on_node(replica_idx_on_node);
const std::string gen_model_file_path = GetModelFilePath(deploy_state, model_name);
if ((submodel_info.model->GetModelType() == PNE_ID_UDF) && (!submodel_info.model->GetIsBuiltinModel())) {
const std::string untar_path = std::to_string(deploy_state.GetSessionId()) + "/" +
std::to_string(deploy_state.GetRootModelId()) + "/" + kUdfResourceSubDir + "/" +
submodel_info.model->GetNormalizedModelName() + ".om";
submodel_desc.set_model_path(untar_path);
GELOGD("Set model file path [%s] by normalized name.", untar_path.c_str());
} else {
submodel_desc.set_model_path(gen_model_file_path);
GELOGD("Set model file path [%s].", gen_model_file_path.c_str());
}
std::string saved_model_path;
GE_CHK_STATUS_RET(GetSavedFilePath(submodel_info, gen_model_file_path, saved_model_path),
"Failed to get saved path result of cache is not compatible.");
submodel_desc.set_saved_model_file_path(saved_model_path);
submodel_desc.set_is_builtin_udf(submodel_info.model->GetIsBuiltinModel());
submodel_desc.set_enable_exception_catch(enable_exception_catch);
GELOGD("Set saved model file path [%s].", saved_model_path.c_str());
submodel_desc.set_is_remote_model(saved_model_path == gen_model_file_path);
submodel_desc.set_submodel_id(deploy_state.GetSubmodelId(model_name));
submodel_desc.set_engine_name(submodel_info.model->GetModelType());
submodel_desc.set_replica_num(replica_num);
submodel_desc.set_replica_idx(replica_idx);
submodel_desc.set_phy_device_id(submodel_info.device_info.GetProxyDeviceId());
if (input_align_attrs.align_max_cache_num > 0) {
auto *proto_input_align_attrs = submodel_desc.mutable_input_align_attrs();
proto_input_align_attrs->set_align_max_cache_num(input_align_attrs.align_max_cache_num);
proto_input_align_attrs->set_align_timeout(input_align_attrs.align_timeout);
proto_input_align_attrs->set_drop_when_not_align(input_align_attrs.drop_when_not_align);
}
submodel_desc.set_execute_times(-1);
const ComputeGraphPtr root_graph = submodel_info.model->GetRootGraph();
GE_CHECK_NOTNULL(root_graph);
bool is_dynamic = false;
(void)AttrUtils::GetBool(root_graph, ATTR_NAME_DYNAMIC_SHAPE_PARTITIONED, is_dynamic);
is_dynamic = (is_dynamic || root_graph->GetGraphUnknownFlag());
GELOGI("Model[%s] set dynamic flag is %d.", model_name.c_str(), is_dynamic);
submodel_desc.set_is_dynamic(is_dynamic);
GELOGI("Model[%s] set head info is %d.", model_name.c_str(), submodel_info.is_head);
submodel_desc.set_is_head(submodel_info.is_head);
std::string scope = "";
(void)AttrUtils::GetStr(root_graph, ATTR_NAME_DATA_FLOW_DATA_FLOW_SCOPE, scope);
submodel_desc.set_scope(scope);
for (auto idx : submodel_info.input_queue_indices) {
submodel_desc.add_input_queue_indices(idx);
}
for (auto idx : submodel_info.control_input_queue_indices) {
submodel_desc.add_input_queue_indices(idx);
}
for (auto idx : submodel_info.output_queue_indices) {
submodel_desc.add_output_queue_indices(idx);
}
for (auto idx : submodel_info.control_output_queue_indices) {
submodel_desc.add_output_queue_indices(idx);
}
AddDynamicSchedInfo(deploy_state, submodel_info_ptr.second, submodel_desc);
auto attrs = submodel_desc.mutable_attrs();
for (const auto &attr : submodel_info.attrs) {
(*attrs)[attr.first] = attr.second;
GELOGD("Add attr[%s], value[%s] for model name[%s].", attr.first.c_str(), attr.second.c_str(),
submodel_desc.model_name().c_str());
}
if (!submodel_info.invoked_model_queue_infos.empty()) {
auto proto_invoked_model_queues = submodel_desc.mutable_invoked_model_queues();
for (const auto &invoked_model_queue_info : submodel_info.invoked_model_queue_infos) {
deployer::ModelQueueIndices proto_model_queue_indices;
for (auto idx : invoked_model_queue_info.second.feed_queue_indices) {
proto_model_queue_indices.add_input_queue_indices(idx);
}
for (auto idx : invoked_model_queue_info.second.fetch_queue_indices) {
proto_model_queue_indices.add_output_queue_indices(idx);
}
(*proto_invoked_model_queues)[invoked_model_queue_info.first] = std::move(proto_model_queue_indices);
}
}
submodel_descs[model_instance.first].emplace_back(std::move(submodel_desc));
++replica_idx;
}
}
}
return SUCCESS;
}
void FlowModelSender::BuildDeployPlanOptions(const DeployState &deploy_state,
deployer::UpdateDeployPlanRequest &request) {
auto options = request.mutable_options();
for (const auto &item : GetThreadLocalContext().GetAllGlobalOptions()) {
const auto &it = kTransferOptionsWhiteList.find(item.first);
if (it == kTransferOptionsWhiteList.cend()) {
continue;
}
options->mutable_global_options()->insert({item.first, item.second});
GELOGI("Add global option to proto, key = %s, value = %s.", item.first.c_str(), item.second.c_str());
}
bool is_data_flow_graph = false;
(void)AttrUtils::GetBool(deploy_state.GetFlowModel()->GetRootGraph(),
ATTR_NAME_IS_DATA_FLOW_GRAPH,
is_data_flow_graph);
if (is_data_flow_graph) {
options->mutable_global_options()->insert({OPTION_FLOAT_OVERFLOW_MODE, OPTION_FLOAT_OVERFLOW_MODE_SATURATION});
GELOGI("Add float overflow mode option to proto, value = %s.", OPTION_FLOAT_OVERFLOW_MODE_SATURATION);
}
for (const auto &item : GetThreadLocalContext().GetAllSessionOptions()) {
const auto &it = kTransferOptionsWhiteList.find(item.first);
if (it == kTransferOptionsWhiteList.cend()) {
continue;
}
options->mutable_session_options()->insert({item.first, item.second});
GELOGI("Add session option to proto, key = %s, value = %s.", item.first.c_str(), item.second.c_str());
}
for (const auto &item : GetThreadLocalContext().GetAllGraphOptions()) {
const auto &it = kTransferOptionsWhiteList.find(item.first);
if (it == kTransferOptionsWhiteList.cend()) {
continue;
}
options->mutable_graph_options()->insert({item.first, item.second});
GELOGI("Add graph option to proto, key = %s, value = %s.", item.first.c_str(), item.second.c_str());
}
}
Status FlowModelSender::BuildUpdateDeployPlanRequest(
const DeployState &deploy_state,
const DeployPlan::DeviceInfo &target_device,
std::vector<deployer::SubmodelDesc> &submodel_descs,
deployer::DeployerRequest &request) {
request.set_type(deployer::kUpdateDeployPlan);
auto update_deploy_plan_request = request.mutable_update_deploy_plan_request();
update_deploy_plan_request->set_graph_id(deploy_state.GetGraphId());
update_deploy_plan_request->set_device_id(target_device.GetDeviceId());
update_deploy_plan_request->set_device_type(target_device.GetType());
update_deploy_plan_request->set_session_id(deploy_state.GetSessionId());
update_deploy_plan_request->set_root_model_id(deploy_state.GetRootModelId());
auto device_info = ResourceManager::GetInstance().GetDeviceInfo(target_device.GetNodeId(),
target_device.GetDeviceId(), target_device.GetType());
if (device_info != nullptr) {
auto mem_config = deploy_state.GetFlowModel()->GetLogicDeviceToMemCfg();
auto model_mem_size = update_deploy_plan_request->mutable_model_mem_size();
model_mem_size->set_std_mem_size(mem_config[device_info->ToIndex()].first);
model_mem_size->set_shared_mem_size(mem_config[device_info->ToIndex()].second);
}
for (auto &submodel_desc : submodel_descs) {
*update_deploy_plan_request->add_submodel_descs() = std::move(submodel_desc);
}
BuildDeployPlanOptions(deploy_state, *update_deploy_plan_request);
return SUCCESS;
}
bool FlowModelSender::CacheLocalModel(const DeployPlan::SubmodelInfo &submodel) {
int32_t local_node_id = ResourceManager::GetInstance().GetLocalNodeId();
if (submodel.device_info.GetNodeId() == local_node_id) {
GE_CHECK_NOTNULL(submodel.model);
if (!submodel.model->GetSavedModelPath().empty()) {
return true;
}
}
return false;
}
Status FlowModelSender::TransferSubmodels(DeployState &deploy_state) {
const auto &submodels = deploy_state.GetDeployPlan().GetSubmodels();
std::map<PneModelPtr, std::set<int32_t>> model_to_devices;
for (const auto &it : submodels) {
const auto &submodel = it.second;
if (submodel.model == nullptr) {
continue;
}
if (CacheLocalModel(submodel)) {
continue;
}
model_to_devices[submodel.model].emplace(submodel.device_info.GetNodeId());
}
GELOGD("[Transfer][Submodels] start, root_model_id = %u, submodel number = %zu",
deploy_state.GetRootModelId(),
model_to_devices.size());
std::vector<std::future<std::tuple<Status, PneModelPtr, ModelBufferData>>> model_buffer_futs;
ThreadPool serialize_thread_pool("ge_dpl_serm", model_to_devices.size() > kMaxSerializePoolSize ?
kMaxSerializePoolSize :
model_to_devices.size(), false);
for (const auto &it : model_to_devices) {
const auto &pne_model = it.first;
GE_CHECK_NOTNULL(pne_model);
std::future<std::tuple<Status, PneModelPtr, ModelBufferData>> fut =
serialize_thread_pool.commit([&pne_model]() -> std::tuple<Status, PneModelPtr, ModelBufferData> {
ModelBufferData model_buff;
const auto ret = SerializeModel(pne_model, model_buff);
return std::make_tuple(ret, pne_model, model_buff);
});
model_buffer_futs.emplace_back(std::move(fut));
}
size_t tranfer_times = 0U;
for (const auto &it : model_to_devices) {
tranfer_times += it.second.size();
}
ThreadPool thread_pool("ge_dpl_trfm", tranfer_times > kMaxTransferPoolSize ? kMaxTransferPoolSize : tranfer_times,
false);
std::vector<std::future<Status>> deploy_futures;
for (auto &buffer_fut : model_buffer_futs) {
auto ret = buffer_fut.get();
const auto &pne_model = std::get<1>(ret);
const auto &model_buff = std::get<2>(ret);
GE_CHK_STATUS_RET(std::get<0>(ret), "Failed to serialize model, model_name = %s",
pne_model->GetModelName().c_str());
auto submodel_id = deploy_state.GetSubmodelId(pne_model->GetModelName());
const auto &target_node_ids = model_to_devices[pne_model];
for (auto target_node_id : target_node_ids) {
std::future<Status> fut = thread_pool.commit([target_node_id, &deploy_state, pne_model,
submodel_id, model_buff]() -> Status {
GEEVENT("[Transfer][Submodel] in deploying start, root_model_id = %u, submodel_id = %u, target_node_id = %d",
deploy_state.GetRootModelId(), submodel_id, target_node_id);
const auto trans_ret = TransferModel(target_node_id, deploy_state, pne_model, model_buff);
GE_CHK_BOOL_RET_STATUS(trans_ret == SUCCESS, trans_ret,
"[Transfer][Submodel] failed, root_model_id = %u, submodel_id = %u, target_node_id = %d",
deploy_state.GetRootModelId(), submodel_id, target_node_id);
GEEVENT("[Transfer][Submodel] in deploying success, root_model_id = %u, submodel_id = %u, target_node_id = %d",
deploy_state.GetRootModelId(), submodel_id, target_node_id);
return SUCCESS;
});
deploy_futures.emplace_back(std::move(fut));
}
}
std::vector<Status> future_rets;
for (size_t i = 0U; i < deploy_futures.size(); ++i) {
future_rets.emplace_back(deploy_futures[i].get());
}
for (size_t i = 0U; i < future_rets.size(); ++i) {
GE_CHK_STATUS_RET(future_rets[i], "Failed to transfer models");
}
return SUCCESS;
}
Status FlowModelSender::TransferModel(int32_t node_id,
const DeployState &deploy_state,
const PneModelPtr &model,
const ModelBufferData model_buff) {
GELOGD("[Transfer][Submodel] start, model_name = [%s]", model->GetModelName().c_str());
std::string path = GetModelFilePath(deploy_state, model->GetModelName());
GE_CHK_STATUS_RET_NOLOG(TransferFile(node_id, path, model_buff.data.get(), model_buff.length));
GELOGD("[Transfer][Submodel] succeeded, model_name = [%s]", model->GetModelName().c_str());
return SUCCESS;
}
Status FlowModelSender::TransferFile(int32_t target_node_id,
const std::string &path,
const void *content,
size_t size) {
GELOGI("[Download] start, node_id = %d, path = %s, size = %lu",
target_node_id, path.c_str(), size);
size_t remaining_size = size;
size_t offset = 0;
const size_t block_size = 2 * 1024 * 1024;
deployer::DeployerRequest request;
request.set_type(deployer::kTransferFile);
auto download_request = request.mutable_transfer_file_request();
GE_CHECK_NOTNULL(download_request);
download_request->set_path(path);
download_request->set_eof(false);
auto *buffer = static_cast<const uint8_t *>(content);
while (remaining_size > 0) {
size_t size_to_send = std::min(block_size, remaining_size);
remaining_size -= size_to_send;
download_request->set_content(buffer + offset, size_to_send);
download_request->set_eof(remaining_size == 0);
deployer::DeployerResponse response;
GE_CHK_STATUS_RET(DeployerProxy::GetInstance().SendRequest(target_node_id, request, response),
"[TransferFile] failed to send request, node_id = %d, path = %s, offset = %zu",
target_node_id,
path.c_str(),
offset);
if (response.error_code() != SUCCESS) {
GELOGE(FAILED,
"[TransferFile] failed, node_id = %d, path = %s, error code = %u, error message = %s",
target_node_id, path.c_str(), response.error_code(), response.error_message().c_str());
return FAILED;
}
offset += size_to_send;
GELOGD("[TransferFile] succeeded, node_id = %d, path = %s, progress: %zu/%zu",
target_node_id, path.c_str(), offset, size);
}
GELOGI("[TransferFile] succeeded, node_id = %d, path = %s, total size = %zu",
target_node_id, path.c_str(), size);
return SUCCESS;
}
Status FlowModelSender::GetDeviceInfo(int32_t node_id, int32_t device_id, int32_t device_type,
DeployPlan::DeviceInfo &deploy_device_info) {
auto device_info = ResourceManager::GetInstance().GetDeviceInfo(node_id, device_id, device_type);
GE_CHECK_NOTNULL(device_info);
deploy_device_info = DeployPlan::DeviceInfo(device_info->GetDeviceType(),
device_info->GetNodeId(),
device_info->GetDeviceId());
return SUCCESS;
}
Status FlowModelSender::TransferPreDeploy(const SendInfo &send_info,
ExchangeRoute &local_route,
deployer::FlowRoutePlan &remote_plan) const {
int32_t local_node_id = ResourceManager::GetInstance().GetLocalNodeId();
const auto &node_config = Configurations::GetInstance().GetLocalNode();
DeployPlan::DeviceInfo local_device(CPU, local_node_id, 0);
if (send_info.node_id != local_node_id) {
for (const auto &device_config : node_config.device_list) {
if (device_config.device_type == NPU) {
local_device = DeployPlan::DeviceInfo(NPU, local_node_id, device_config.device_id);
break;
}
}
}
std::vector<DeployPlan::DeviceInfo> remote_devices;
for (const int32_t &device_id : send_info.device_ids) {
DeployPlan::DeviceInfo remote_device;
GE_CHK_STATUS_RET(GetDeviceInfo(send_info.node_id, device_id, NPU, remote_device),
"Failed to get device info, device id = %d.", send_info.node_id);
remote_devices.emplace_back(remote_device);
}
DeployPlan deploy_plan;
GE_CHK_STATUS_RET(HeterogeneousDeployPlanner().BuildTransferPlan(local_device, remote_devices, deploy_plan),
"Failed to build transfer deploy plan.");
GELOGD("Transfer deploy plan built successfully");
deployer::FlowRoutePlan local_plan;
GE_CHK_STATUS_RET(FlowRoutePlanner::ResolveFlowRoutePlan(deploy_plan, local_node_id, local_plan),
"Resolve local route plan failed.");
GELOGD("Local flow route plan resolved successfully");
if (send_info.node_id != local_node_id) {
GE_CHK_STATUS_RET(FlowRoutePlanner::ResolveFlowRoutePlan(deploy_plan, send_info.node_id, remote_plan),
"Failed to resolve remote plan.");
GELOGD("Remote flow route plan resolved successfully");
}
auto &exchange_service = HeterogeneousExchangeService::GetInstance();
HeterogeneousExchangeDeployer deployer(exchange_service, local_plan,
DeployContext::LocalContext().GetFlowGwClientManager());
GE_CHK_STATUS_RET_NOLOG(deployer.Deploy(local_route));
return SUCCESS;
}
std::string FlowModelSender::GetModelFilePath(const DeployState &deploy_state, const std::string &model_name) {
std::string file_name = ProcessUtils::NormalizePath(model_name);
replace(file_name.begin(), file_name.end(), '/', '_');
replace(file_name.begin(), file_name.end(), '\\', '_');
replace(file_name.begin(), file_name.end(), '.', '_');
std::string path = deploy_state.GetRelativeWorkingDir() +
file_name + "_" + std::to_string(deploy_state.GetSubmodelId(model_name)) + ".om";
GELOGI("Get model file path[%s] success, model name = %s", path.c_str(), model_name.c_str());
return path;
}
Status FlowModelSender::GetSavedFilePath(const DeployPlan::SubmodelInfo &submodel_info,
const std::string &model_file_path,
std::string &saved_model_path) {
if (submodel_info.model->GetIsBuiltinModel()) {
saved_model_path = kUdfBuildInFuncNamePrefix;
return SUCCESS;
}
if (submodel_info.device_info.GetNodeId() != ResourceManager::GetInstance().GetLocalNodeId()) {
saved_model_path = model_file_path;
return SUCCESS;
}
saved_model_path = submodel_info.model->GetSavedModelPath();
if ((saved_model_path.empty()) && (submodel_info.model->GetModelType() == PNE_ID_UDF)) {
GELOGE(FAILED, "Saved model file path must be not empty in current version."
"Please generate cache based on current compiler version.");
return FAILED;
}
return SUCCESS;
}
}