* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "deploy/resource/heterogeneous_deploy_planner.h"
#include <regex>
#include "ge/ge_api_types.h"
#include "graph/ge_context.h"
#include "dflow/base/model/model_deploy_resource.h"
#include "deploy/resource/resource_manager.h"
#include "deploy/resource/device_info.h"
#include "deploy/deployer/deployer_proxy.h"
namespace ge {
namespace {
// Queue depth passed to SetDepth() for the dynamic-sched request/response queues created in this file.
constexpr int64_t kDepDefQueDepth = 128L;
/**
 * @brief Rewrite one queue name in place.
 *
 * If `queue_name` is a key of `name_map`, it is replaced by the mapped value.
 * Otherwise it is prefixed with "<model_name>:".
 *
 * @param model_name  prefix used for names that have no mapping
 * @param name_map    original name -> replacement name
 * @param queue_name  [in,out] the name to rewrite
 */
void RewriteQueueName(const std::string &model_name,
                      const std::map<std::string, std::string> &name_map,
                      std::string &queue_name) {
  const auto mapped = name_map.find(queue_name);
  if (mapped != name_map.end()) {
    queue_name = mapped->second;
    return;
  }
  // No explicit mapping: qualify the name with the owning model's name.
  queue_name = model_name + ":" + queue_name;
}
/**
 * @brief Apply RewriteQueueName to every element of `queue_names`, in place.
 *
 * @param model_name   prefix used for names that have no mapping
 * @param name_map     original name -> replacement name
 * @param queue_names  [in,out] names to rewrite
 */
void RewriteQueueNames(const std::string &model_name,
                       const std::map<std::string, std::string> &name_map,
                       std::vector<std::string> &queue_names) {
  for (auto &name : queue_names) {
    RewriteQueueName(model_name, name_map, name);
  }
}
/**
 * @brief Rewrite all endpoint names of a submodel's relation so they can be merged into a parent relation.
 *
 * The root input/output endpoint names of `model_relation` are replaced, position by position, with the
 * names in `model_queue_info`. Every other endpoint name is rewritten through RewriteQueueName, i.e.
 * mapped if it was a root input/output name, otherwise prefixed with "<model_name>:".
 *
 * @param model_name           name used as prefix for endpoints without a mapping
 * @param model_queue_info     endpoint names the parent uses for this model's inputs/outputs
 * @param model_relation       [in,out] relation whose names are rewritten
 * @param internal_queue_defs  [out] receives the rewritten endpoints whose new name is not a mapping key
 * @return SUCCESS, or PARAM_INVALID when the input or output counts of parent and submodel differ
 */
Status RewriteQueueNames(const std::string &model_name,
                         const ModelRelation::ModelEndpointInfo &model_queue_info,
                         ModelRelation &model_relation,
                         std::vector<Endpoint> &internal_queue_defs) {
  auto &root_info = model_relation.root_model_endpoint_info;
  if (root_info.input_endpoint_names.size() != model_queue_info.input_endpoint_names.size()) {
    GELOGE(PARAM_INVALID, "model input number mismatches, parent input size = %zu, submodel input size = %zu",
           model_queue_info.input_endpoint_names.size(),
           root_info.input_endpoint_names.size());
    return PARAM_INVALID;
  }
  // The output loop below indexes both vectors with the same index, so the sizes must agree as well;
  // without this check a shorter parent list would be read out of bounds.
  if (root_info.output_endpoint_names.size() != model_queue_info.output_endpoint_names.size()) {
    GELOGE(PARAM_INVALID, "model output number mismatches, parent output size = %zu, submodel output size = %zu",
           model_queue_info.output_endpoint_names.size(),
           root_info.output_endpoint_names.size());
    return PARAM_INVALID;
  }
  // Maps the submodel's own root endpoint names to the names used by the parent.
  std::map<std::string, std::string> queue_name_mapping;
  for (size_t i = 0; i < root_info.input_endpoint_names.size(); ++i) {
    queue_name_mapping.emplace(root_info.input_endpoint_names[i], model_queue_info.input_endpoint_names[i]);
    root_info.input_endpoint_names[i] = model_queue_info.input_endpoint_names[i];
  }
  for (size_t i = 0; i < root_info.output_endpoint_names.size(); ++i) {
    queue_name_mapping.emplace(root_info.output_endpoint_names[i], model_queue_info.output_endpoint_names[i]);
    root_info.output_endpoint_names[i] = model_queue_info.output_endpoint_names[i];
  }
  for (Endpoint &queue_def : model_relation.endpoints) {
    RewriteQueueName(model_name, queue_name_mapping, queue_def.MutableName());
    // NOTE(review): the lookup uses the already-rewritten name, so endpoints that were mapped to a parent
    // name are appended too unless that parent name is itself a mapping key -- confirm this is intended.
    if (queue_name_mapping.find(queue_def.GetName()) == queue_name_mapping.end()) {
      internal_queue_defs.emplace_back(queue_def);
    }
  }
  for (auto &submodel_it : model_relation.submodel_endpoint_infos) {
    auto &queue_info = submodel_it.second;
    RewriteQueueNames(model_name, queue_name_mapping, queue_info.input_endpoint_names);
    RewriteQueueNames(model_name, queue_name_mapping, queue_info.output_endpoint_names);
  }
  return SUCCESS;
}
}
// Single-model constructor: delegates to the multi-model constructor with a one-element model list
// and no root model relation (nullptr).
HeterogeneousDeployPlanner::HeterogeneousDeployPlanner(const FlowModelPtr &flow_model,
std::vector<DeployPlan::DeviceInfo> device_list) noexcept
: HeterogeneousDeployPlanner({flow_model}, nullptr, std::move(device_list)) {}
// Multi-model constructor: takes ownership of the model list and the device list by move and stores the
// root_model_relation pointer as-is (it is not copied; MergeModels() requires it to be non-null).
HeterogeneousDeployPlanner::HeterogeneousDeployPlanner(std::vector<FlowModelPtr> models,
const ModelRelation *root_model_relation,
std::vector<DeployPlan::DeviceInfo> device_list) noexcept
: DeployPlannerBase(),
models_(std::move(models)),
root_model_relation_(root_model_relation),
device_list_(std::move(device_list)) {}
/**
 * @brief Build the flattened model set and relation, then resolve them onto target devices.
 *
 * A single model goes through PrepareForSingleFlowModel; several models are merged with MergeModels and
 * validated. Afterwards PrepareTargetDevices is run and the result is validated again.
 *
 * @param model_relation [out] resulting model relation
 * @return SUCCESS, PARAM_INVALID when no model was supplied, or the status of the first failing step
 */
Status HeterogeneousDeployPlanner::PrepareModelsAndRelation(ModelRelation &model_relation) {
  GE_CHK_BOOL_RET_STATUS(!models_.empty(), PARAM_INVALID, "models is empty");
  std::map<std::string, PneModelPtr> name_to_models;
  const bool has_several_models = (models_.size() != 1U);
  if (has_several_models) {
    GELOGD("start to build deploy plan for multiply models");
    GE_CHK_STATUS_RET(MergeModels(name_to_models, model_relation), "Failed to merge models by relation");
    GE_CHK_STATUS_RET(ValidateModelAndRelation(name_to_models, model_relation),
                      "Failed to validate model and relation after merging submodels");
  } else {
    GELOGD("start to build deploy plan for single model");
    GE_CHK_STATUS_RET(PrepareForSingleFlowModel(name_to_models, model_relation),
                      "Failed to init for single model");
  }
  // Resolving target devices rewrites both the model map and the relation, hence the second validation.
  GE_CHK_STATUS_RET_NOLOG(PrepareTargetDevices(name_to_models, model_relation));
  GE_CHK_STATUS_RET_NOLOG(ValidateModelAndRelation(name_to_models, model_relation));
  return SUCCESS;
}
/**
 * @brief Determine the engine type a model is deployed with.
 *
 * - An empty model type is treated as PNE_ID_NPU.
 * - Any model type other than PNE_ID_UDF is returned unchanged.
 * - A UDF model without a deploy resource, or with an empty resource type, defaults to PNE_ID_NPU.
 * - Otherwise the resource type is matched against the devices known to ResourceManager; it must map
 *   to exactly one of PNE_ID_NPU / PNE_ID_CPU.
 *
 * @param pne_model    model to inspect
 * @param deploy_type  [out] resolved engine type
 * @return SUCCESS, or PARAM_INVALID when the resource type matches no device or both device kinds
 */
Status HeterogeneousDeployPlanner::GetDeployEngineType(const PneModel &pne_model, std::string &deploy_type) {
  deploy_type = pne_model.GetModelType().empty() ? PNE_ID_NPU : pne_model.GetModelType();
  if (deploy_type != PNE_ID_UDF) {
    return SUCCESS;
  }
  deploy_type = PNE_ID_NPU;
  const auto deploy_resource = pne_model.GetDeployResource();
  if ((deploy_resource == nullptr) || (deploy_resource->resource_type.empty())) {
    GELOGI("Model deploy resource type is empty, deploy on NPU default.");
    return SUCCESS;
  }
  // Collect the distinct deploy types offered by devices of the requested resource type.
  std::set<std::string> matched_deploy_types;
  const auto &devices = ResourceManager::GetInstance().GetDeviceInfoList();
  for (const auto &device_info : devices) {
    GELOGI("Get device info = %s.", device_info.DebugString().c_str());
    if (device_info.GetResourceType() == deploy_resource->resource_type) {
      matched_deploy_types.emplace((device_info.GetDeviceType() == NPU) ? PNE_ID_NPU : PNE_ID_CPU);
    }
  }
  if (matched_deploy_types.size() == 1U) {
    deploy_type = *matched_deploy_types.begin();
    return SUCCESS;
  }
  GELOGE(PARAM_INVALID, "Failed to match device according to resource type[%s].",
         deploy_resource->resource_type.c_str());
  return PARAM_INVALID;
}
/**
 * @brief Choose the target devices for every model.
 *
 * Devices of `device_list_` are grouped by engine type (PNE_ID_CPU for CPU devices, PNE_ID_NPU otherwise),
 * the device index is rebuilt via ReindexDevices, and each model is then assigned devices of the engine
 * type resolved by GetDeployEngineType.
 *
 * @param name_to_models  models keyed by name
 * @param target_devices  [out] per model name: list of (device, is_redundant) pairs
 * @return SUCCESS; FAILED when no device exists for a model's engine type; otherwise the status of the
 *         failing helper (a null model is rejected by GE_CHECK_NOTNULL)
 */
Status HeterogeneousDeployPlanner::SelectTargetDevice(
    const std::map<std::string, PneModelPtr> &name_to_models,
    std::map<std::string, std::vector<std::pair<DeployPlan::DeviceInfo, bool>>> &target_devices) {
  std::map<std::string, std::vector<DeployPlan::DeviceInfo>> engine_to_devices;
  for (const auto &device_info : device_list_) {
    const std::string &engine_type = (device_info.GetType() == CPU) ? PNE_ID_CPU : PNE_ID_NPU;
    engine_to_devices[engine_type].emplace_back(device_info);
  }
  GE_CHK_STATUS_RET_NOLOG(ReindexDevices());
  for (const auto &it : name_to_models) {
    const auto &model_name = it.first;
    // The model is dereferenced below; reject null entries instead of crashing.
    GE_CHECK_NOTNULL(it.second);
    std::string engine_type;
    GE_CHK_STATUS_RET_NOLOG(GetDeployEngineType(*it.second, engine_type));
    const auto &available_devices = engine_to_devices[engine_type];
    // Report the resolved engine type: the raw model type may be empty or "UDF" and would not tell
    // which device kind is actually missing.
    GE_CHK_BOOL_RET_STATUS(!available_devices.empty(), FAILED,
                           "No available device for engine: %s", engine_type.c_str());
    GE_CHK_STATUS_RET(AssignDevices(model_name, it.second, engine_type,
                                    available_devices, target_devices),
                      "Failed to assign device for %s", model_name.c_str());
  }
  return SUCCESS;
}
/**
 * @brief Fill `index_to_devices_` with one entry per device in `device_list_`.
 *
 * The key is the ToIndex() value of the matching ResourceManager device. Existing entries with the same
 * key are overwritten.
 *
 * @return SUCCESS, or the GE_CHECK_NOTNULL failure status when ResourceManager does not know a device
 */
Status HeterogeneousDeployPlanner::ReindexDevices() {
  for (const auto &planned_device : device_list_) {
    const auto resource_device = ResourceManager::GetInstance().GetDeviceInfo(planned_device.GetNodeId(),
                                                                              planned_device.GetDeviceId(),
                                                                              planned_device.GetType());
    GE_CHECK_NOTNULL(resource_device);
    const auto device_index = resource_device->ToIndex();
    index_to_devices_[device_index] = planned_device;
    GELOGD("Add device = %s, index key = %s", planned_device.GetDesc().c_str(), device_index.c_str());
  }
  return SUCCESS;
}
/**
 * @brief Collect the logical device ids a model is bound to.
 *
 * The ids come from the model itself. When the model carries none and the engine type is PNE_ID_NPU, the
 * OPTION_EXEC_LOGICAL_DEVICE_ID option is used instead and the parsed ids are normalized.
 * For a UDF model that ends up without ids and has a deploy resource, the resource must not be
 * heavy load and its resource type must be "Ascend".
 *
 * @param pne_model           model to inspect
 * @param engine_type         engine type resolved for the model
 * @param logical_device_ids  [out] parsed logical device ids
 * @return SUCCESS, FAILED when the UDF constraints are violated, or the status of a failing helper
 */
Status HeterogeneousDeployPlanner::GetLogicalDeviceId(const PneModel &pne_model,
                                                      const std::string &engine_type,
                                                      std::vector<std::string> &logical_device_ids) {
  auto device_id_text = pne_model.GetLogicDeviceId();
  const bool read_from_option = device_id_text.empty() && (engine_type == PNE_ID_NPU);
  if (read_from_option) {
    GE_CHK_STATUS_RET_NOLOG(GetLogicalDeviceIdFromOption(device_id_text));
  }
  GE_CHK_STATUS_RET_NOLOG(ParseLogicalDeviceIds(device_id_text, logical_device_ids));
  if (read_from_option) {
    NormalizeLogicalDeviceId(logical_device_ids);
    return SUCCESS;
  }
  // Only UDF models without any assigned id need the extra deploy-resource checks.
  if (!logical_device_ids.empty() || (pne_model.GetModelType() != PNE_ID_UDF)) {
    return SUCCESS;
  }
  const auto deploy_resource = pne_model.GetDeployResource();
  if (deploy_resource == nullptr) {
    return SUCCESS;
  }
  GE_CHK_BOOL_RET_STATUS(!(deploy_resource->is_heavy_load), FAILED,
                         "heavy load must assign logic device id, model name=%s", pne_model.GetModelName().c_str());
  constexpr const char *kResourceTypeAscend = "Ascend";
  GE_CHK_BOOL_RET_STATUS(deploy_resource->resource_type == kResourceTypeAscend, FAILED,
                         "udf[%s] resource_type=%s must be heavy load and assign logic device id",
                         pne_model.GetModelName().c_str(), deploy_resource->resource_type.c_str());
  return SUCCESS;
}
/**
 * @brief Parse the model's redundant logical device id string into a list of ids.
 *
 * @param pne_model           model to inspect
 * @param logical_device_ids  [out] parsed redundant logical device ids
 * @return status of ParseLogicalDeviceIds
 */
Status HeterogeneousDeployPlanner::GetRedundantLogicalDeviceId(const PneModel &pne_model,
                                                               std::vector<std::string> &logical_device_ids) {
  const auto redundant_id_text = pne_model.GetRedundantLogicDeviceId();
  GE_CHK_STATUS_RET_NOLOG(ParseLogicalDeviceIds(redundant_id_text, logical_device_ids));
  return SUCCESS;
}
/**
 * @brief Expand two-segment logical device ids in place.
 *
 * Every id that splits into exactly two ':'-separated parts gets the prefix "0:0:"; all other ids are
 * left untouched.
 *
 * @param logical_device_ids [in,out] ids to normalize
 */
void HeterogeneousDeployPlanner::NormalizeLogicalDeviceId(std::vector<std::string> &logical_device_ids) {
  constexpr size_t kIncompletedSize = 2U;
  for (auto &device_id : logical_device_ids) {
    const auto segments = StringUtils::Split(device_id, ':');
    if (segments.size() != kIncompletedSize) {
      continue;
    }
    device_id.insert(0, "0:0:");
  }
}
/**
 * @brief Read OPTION_EXEC_LOGICAL_DEVICE_ID from the GE context.
 *
 * The result of GetOption is deliberately ignored; the value is logged only when it is non-empty.
 *
 * @param logical_device_id [out] option value, left as written by GetOption
 * @return always SUCCESS
 */
Status HeterogeneousDeployPlanner::GetLogicalDeviceIdFromOption(std::string &logical_device_id) {
  (void) GetContext().GetOption(OPTION_EXEC_LOGICAL_DEVICE_ID, logical_device_id);
  if (!logical_device_id.empty()) {
    GELOGI("Got %s from option = %s", OPTION_EXEC_LOGICAL_DEVICE_ID, logical_device_id.c_str());
  }
  return SUCCESS;
}
/**
 * @brief Split a logical device id string into individual ids.
 *
 * Spaces are removed first. An empty string yields SUCCESS and leaves `logical_device_ids` untouched.
 * Otherwise the text must consist of digits, ':', ',' and '-' only, optionally wrapped in '[' and ']';
 * the part between the brackets is split on ','.
 *
 * @param logical_device_id_str  raw id string, e.g. as read from the model or an option
 * @param logical_device_ids     [out] parsed ids
 * @return SUCCESS, or PARAM_INVALID when the text does not match the accepted format
 */
Status HeterogeneousDeployPlanner::ParseLogicalDeviceIds(const std::string &logical_device_id_str,
                                                         std::vector<std::string> &logical_device_ids) {
  auto normalized = StringUtils::ReplaceAll(logical_device_id_str, " ", "");
  if (normalized.empty()) {
    return SUCCESS;
  }
  // Compiling a std::regex is costly and the pattern never changes, so build it once.
  static const std::regex kBracketedIdList(R"(\[?([0-9:,-]+)\]?)");
  std::smatch match_result;
  GE_CHK_BOOL_RET_STATUS(std::regex_match(normalized, match_result, kBracketedIdList),
                         PARAM_INVALID, "Invalid logical_device_id: %s", logical_device_id_str.c_str());
  logical_device_ids = StringUtils::Split(match_result.str(1), ',');
  return SUCCESS;
}
/**
 * @brief Verify that a device's type matches the engine type of the model assigned to it.
 *
 * Only PNE_ID_NPU and PNE_ID_CPU are checked; any other engine type is accepted without a check.
 *
 * @param model_name    model name, used for logging
 * @param engine_type   engine type resolved for the model
 * @param device_index  logical device id, used for logging
 * @param device_info   device the model is about to be assigned to
 * @return SUCCESS, or FAILED when the device type differs from the one required by the engine type
 */
Status HeterogeneousDeployPlanner::CheckAssignedDevice(const std::string &model_name,
                                                       const std::string &engine_type,
                                                       const std::string &device_index,
                                                       const DeployPlan::DeviceInfo &device_info) {
  const static std::map<std::string, int32_t> kTypeEngineToDevice = {{PNE_ID_NPU, static_cast<int32_t>(NPU)},
                                                                     {PNE_ID_CPU, static_cast<int32_t>(CPU)}};
  const auto required = kTypeEngineToDevice.find(engine_type);
  if (required == kTypeEngineToDevice.cend()) {
    return SUCCESS;
  }
  GE_CHK_BOOL_RET_STATUS(required->second == device_info.GetType(), FAILED,
                         "Failed to assign model[%s] to device[%s], "
                         "model should be assigned to device of type[%s], device desc=[%s].",
                         model_name.c_str(), device_index.c_str(), engine_type.c_str(),
                         device_info.GetDesc().c_str());
  return SUCCESS;
}
/**
 * @brief Assign target devices to one model.
 *
 * Without logical device ids the model is assigned every device of `device_list`, none flagged redundant.
 * With logical device ids, each id is completed by GetValidLogicDeviceId and looked up in
 * `index_to_devices_`. The last `redundant_logical_device_ids.size()` entries of the id list are flagged
 * as redundant. A heavy-load UDF model must reference an NPU device and is then deployed on a DeviceInfo
 * built as {CPU, node id, 0, device id of that NPU entry}.
 *
 * @param model_name      name of the model
 * @param pne_model       the model; must not be null (it is dereferenced)
 * @param engine_type     engine type resolved for the model
 * @param device_list     candidate devices for the engine type
 * @param target_devices  [in,out] per model name: list of (device, is_redundant) pairs
 * @return SUCCESS; PARAM_INVALID when the redundant list is not shorter than the id list, an id is
 *         unknown, or a heavy-load model references a non-NPU device; otherwise a helper's status
 */
Status HeterogeneousDeployPlanner::AssignDevices(const std::string &model_name,
                                                 const PneModelPtr &pne_model,
                                                 const std::string &engine_type,
                                                 const std::vector<DeployPlan::DeviceInfo> &device_list,
                                                 std::map<std::string,
                                                          std::vector<std::pair<DeployPlan::DeviceInfo,
                                                                                bool>>> &target_devices) {
  std::vector<std::string> logical_device_ids;
  std::vector<std::string> redundant_logical_device_ids;
  GE_CHK_STATUS_RET_NOLOG(GetLogicalDeviceId(*pne_model, engine_type, logical_device_ids));
  GE_CHK_STATUS_RET_NOLOG(GetRedundantLogicalDeviceId(*pne_model, redundant_logical_device_ids));
  if (logical_device_ids.empty()) {
    std::vector<std::pair<DeployPlan::DeviceInfo, bool>> device_list_new;
    for (const auto &device : device_list) {
      device_list_new.emplace_back(std::make_pair(device, false));
    }
    target_devices.emplace(model_name, std::move(device_list_new));
    GELOGI("No device is assigned to model[%s]", model_name.c_str());
    return SUCCESS;
  }
  // The message states the requirement that was violated (the old text claimed the opposite).
  GE_CHK_BOOL_RET_STATUS(redundant_logical_device_ids.size() < logical_device_ids.size(),
                         PARAM_INVALID, "redundant logical device ids size[%u] must be less than "
                         "logical device ids size[%u].",
                         static_cast<uint32_t>(redundant_logical_device_ids.size()),
                         static_cast<uint32_t>(logical_device_ids.size()));
  bool is_heavy_load = false;
  if (pne_model->GetModelType() == PNE_ID_UDF) {
    const auto deploy_resource = pne_model->GetDeployResource();
    if (deploy_resource != nullptr) {
      is_heavy_load = deploy_resource->is_heavy_load;
    }
  }
  // Ids at or beyond this index are the redundant instances.
  const size_t main_instance_max_index = logical_device_ids.size() - redundant_logical_device_ids.size();
  size_t logical_device_id_index = 0U;
  for (std::string &logical_device_id : logical_device_ids) {
    const bool is_redundant = logical_device_id_index >= main_instance_max_index;
    GetValidLogicDeviceId(logical_device_id);
    const auto it = index_to_devices_.find(logical_device_id);
    if (it == index_to_devices_.cend()) {
      GELOGE(PARAM_INVALID, "Failed to assign physical device for %s", logical_device_id.c_str());
      return PARAM_INVALID;
    }
    if (is_heavy_load) {
      const auto &proxy_device_info = it->second;
      if (proxy_device_info.GetType() != NPU) {
        GELOGE(PARAM_INVALID, "heavy load model[%s] assign physical device[%s] is not NPU, device desc = %s",
               model_name.c_str(), logical_device_id.c_str(), proxy_device_info.GetDesc().c_str());
        return PARAM_INVALID;
      }
      // NOTE(review): the 4th initializer presumably is the proxy device id -- confirm against DeviceInfo.
      DeployPlan::DeviceInfo deploy_info = {CPU, proxy_device_info.GetNodeId(), 0, proxy_device_info.GetDeviceId()};
      GELOGI("The heavy load model[%s] assigned to logic device[%s] will be deploy on device desc = %s.",
             model_name.c_str(), logical_device_id.c_str(), deploy_info.GetDesc().c_str());
      GE_CHK_STATUS_RET(CheckAssignedDevice(model_name, engine_type, logical_device_id, deploy_info),
                        "Failed to assign device.");
      target_devices[model_name].emplace_back(std::make_pair(deploy_info, is_redundant));
      GELOGI("The device[%s] is assigned to heavy load model[%s], device desc = %s, redundant flag = %d.",
             logical_device_id.c_str(), model_name.c_str(), deploy_info.GetDesc().c_str(),
             static_cast<int32_t>(is_redundant));
    } else {
      GE_CHK_STATUS_RET(CheckAssignedDevice(model_name, engine_type, logical_device_id, it->second),
                        "Failed to assign device.");
      target_devices[model_name].emplace_back(std::make_pair(it->second, is_redundant));
      GELOGI("The device[%s] is assigned to model[%s], device desc = %s, redundant flag = %d.",
             logical_device_id.c_str(), model_name.c_str(), it->second.GetDesc().c_str(),
             static_cast<int32_t>(is_redundant));
    }
    ++logical_device_id_index;
  }
  return SUCCESS;
}
/**
 * @brief Append ":0" to a logical device id that has exactly three ':'-separated parts.
 *
 * Ids with any other number of parts are left unchanged.
 *
 * @param device_id [in,out] logical device id
 */
void HeterogeneousDeployPlanner::GetValidLogicDeviceId(std::string &device_id) {
  constexpr size_t kSegmentsNeedingSuffix = 3U;
  const auto segments = StringUtils::Split(device_id, ':');
  if (segments.size() != kSegmentsNeedingSuffix) {
    return;
  }
  device_id += ":0";
}
/**
 * @brief Prepare models and relation when the planner holds a single flow model.
 *
 * If the flow model has exactly one submodel and its model relation is missing or empty, the relation is
 * built from the submodel's root graph. The flow model is then flattened into `name_to_models` and
 * `model_relation`, and the result is validated.
 *
 * @param name_to_models  [out] flattened models keyed by name
 * @param model_relation  [out] flattened model relation
 * @return SUCCESS, or the status of the first failing check or helper
 */
Status HeterogeneousDeployPlanner::PrepareForSingleFlowModel(std::map<std::string, PneModelPtr> &name_to_models,
                                                             ModelRelation &model_relation) {
  const auto &flow_model = models_.front();
  GE_CHECK_NOTNULL(flow_model);
  if (flow_model->GetSubmodels().size() == 1U) {
    GELOGD("Prepare for single flow model with single submodel");
    const auto &root_model = flow_model->GetSubmodels().begin()->second;
    // The submodel pointer is dereferenced right below; fail cleanly when it is null.
    GE_CHECK_NOTNULL(root_model);
    const auto &root_graph = root_model->GetRootGraph();
    GE_CHECK_NOTNULL(root_graph);
    bool relation_needs_build = false;
    if (flow_model->GetModelRelation() == nullptr) {
      flow_model->SetModelRelation(MakeShared<ModelRelation>());
      GE_CHECK_NOTNULL(flow_model->GetModelRelation());
      relation_needs_build = true;
    } else {
      relation_needs_build = flow_model->GetModelRelation()->IsEmpty();
    }
    if (relation_needs_build) {
      GE_CHK_STATUS_RET(ModelRelationBuilder().BuildForSingleModel(*root_graph, *flow_model->GetModelRelation()),
                        "Failed to build model relation");
    } else {
      GELOGD("Model relation is not build for single model.");
    }
  }
  ModelRelationFlattener flattener(flow_model);
  GE_CHK_STATUS_RET_NOLOG(flattener.Flatten(model_relation, name_to_models));
  GE_CHK_STATUS_RET(ValidateModelAndRelation(name_to_models, model_relation),
                    "Failed to validate model and relation");
  return SUCCESS;
}
/**
 * @brief Merge the child models of a flow model into the merged relation.
 *
 * Works on a copy of the model's relation: its endpoint names are rewritten with RewriteQueueNames
 * (internal endpoints are appended to `merged_model_relation_.endpoints`), then every child model is
 * registered under "<model name>:<child name>" together with its rewritten endpoint info.
 *
 * @param model_endpoint_info  endpoint names the parent relation uses for this model
 * @param model                flow model whose children are unfolded
 * @param name_to_models       [in,out] receives the child models under their merged names
 * @return SUCCESS, or the status of the first failing check or helper
 */
Status HeterogeneousDeployPlanner::UnfoldSubModel(const ModelRelation::ModelEndpointInfo &model_endpoint_info,
                                                  const FlowModel &model,
                                                  std::map<std::string, PneModelPtr> &name_to_models) {
  GE_CHECK_NOTNULL(model.GetModelRelation());
  GE_CHK_STATUS_RET(ValidateModelAndRelation(model.GetSubmodels(), *model.GetModelRelation()),
                    "Failed to validate model and relation for submodel: %s", model.GetModelName().c_str());
  // Copy first so the model's own relation keeps its original names.
  ModelRelation rewritten_relation = *model.GetModelRelation();
  GE_CHK_STATUS_RET(RewriteQueueNames(model.GetModelName(), model_endpoint_info,
                                      rewritten_relation,
                                      merged_model_relation_.endpoints),
                    "Failed to rewrite queue names, model name = %s",
                    model.GetModelName().c_str());
  ModelRelationReader reader(rewritten_relation);
  GE_CHK_STATUS_RET_NOLOG(reader.Initialize());
  for (const auto &child : model.GetSubmodels()) {
    const auto child_queue_info = reader.GetSubmodelQueueInfo(child.first);
    GE_CHECK_NOTNULL(child_queue_info);
    const std::string merged_model_name = model.GetModelName() + ":" + child.first;
    name_to_models.emplace(merged_model_name, child.second);
    merged_model_relation_.submodel_endpoint_infos[merged_model_name] = *child_queue_info;
  }
  return SUCCESS;
}
/**
 * @brief Merge all flow models into one flat relation, driven by the root model relation.
 *
 * The root endpoint names and endpoints are copied from `root_model_relation_`. Each submodel named there
 * is either taken over as-is (no child models) or unfolded through UnfoldSubModel. The merged relation is
 * finally moved into `model_relation`.
 *
 * @param name_to_models  [out] flattened models keyed by (merged) name
 * @param model_relation  [in,out] receives the merged relation; its invoked_model_queue_infos are kept
 * @return SUCCESS; PARAM_INVALID when the root relation is null, a model is null, or a submodel named by
 *         the root relation is not among the models; otherwise the status of a failing helper
 */
Status HeterogeneousDeployPlanner::MergeModels(std::map<std::string, PneModelPtr> &name_to_models,
                                               ModelRelation &model_relation) {
  GE_CHECK_NOTNULL(root_model_relation_);
  merged_model_relation_.root_model_endpoint_info.input_endpoint_names =
      root_model_relation_->root_model_endpoint_info.input_endpoint_names;
  merged_model_relation_.root_model_endpoint_info.output_endpoint_names =
      root_model_relation_->root_model_endpoint_info.output_endpoint_names;
  merged_model_relation_.endpoints.insert(merged_model_relation_.endpoints.end(),
                                          root_model_relation_->endpoints.begin(),
                                          root_model_relation_->endpoints.end());
  std::map<std::string, FlowModelPtr> submodels;
  for (const auto &model : models_) {
    // Same guard as the single-model path: never dereference a null flow model.
    GE_CHECK_NOTNULL(model);
    submodels.emplace(model->GetModelName(), model);
  }
  for (const auto &it : root_model_relation_->submodel_endpoint_infos) {
    const auto &model_name = it.first;
    const auto &model_endpoint_info = it.second;
    // find() instead of operator[] so that an unknown name does not insert a null entry.
    const auto found = submodels.find(model_name);
    if (found == submodels.end()) {
      GELOGE(PARAM_INVALID, "Failed to get model, name = %s", model_name.c_str());
      return PARAM_INVALID;
    }
    const auto &submodel = found->second;
    if (submodel->GetSubmodels().empty()) {
      GELOGD("Submodel [%s] contains no child model", model_name.c_str());
      name_to_models.emplace(model_name, submodel);
      merged_model_relation_.submodel_endpoint_infos[model_name] = model_endpoint_info;
    } else {
      GELOGD("Submodel [%s] contains child model, start to unfold them to root relation", model_name.c_str());
      GE_CHK_STATUS_RET(UnfoldSubModel(model_endpoint_info, *submodel, name_to_models), "Failed to unfold submodels");
    }
  }
  // NOTE(review): invoked_model_queue_infos is taken from the output argument, not from
  // root_model_relation_ -- confirm this is the intended source.
  merged_model_relation_.invoked_model_queue_infos = std::move(model_relation.invoked_model_queue_infos);
  model_relation = std::move(merged_model_relation_);
  return SUCCESS;
}
/**
 * @brief Tell whether dynamic-sched information should be created for a device.
 *
 * For NPU devices and for CPU devices whose node id is not 0, the device must have a non-zero entry in
 * `device_deployed_model_num_`. Every other device is reported as having deployed models.
 *
 * @param device_info device to check
 * @return false when the device requires a count and has none (or zero), true otherwise
 */
bool HeterogeneousDeployPlanner::HasDeployedModelOnDevice(const DeployPlan::DeviceInfo &device_info) const {
  const bool needs_count_check =
      ((device_info.GetType() == static_cast<int32_t>(CPU)) && (device_info.GetNodeId() != 0)) ||
      (device_info.GetType() == static_cast<int32_t>(NPU));
  if (needs_count_check) {
    const auto counter = device_deployed_model_num_.find(device_info);
    const bool has_model = (counter != device_deployed_model_num_.end()) && (counter->second != 0U);
    if (!has_model) {
      GELOGI("DynamicSched no deployed model on device [%s], don't add dynamic sched info.",
             device_info.GetKey().c_str());
      return false;
    }
  }
  GELOGI("DynamicSched have deployed model on device [%s].", device_info.GetKey().c_str());
  return true;
}
/**
 * @brief Register one "Sched@<device key>@<n>" model instance per eligible device.
 *
 * A device is skipped when it is a CPU device without flowgw support or when HasDeployedModelOnDevice
 * returns false. The function returns early if ResourceManager does not know a device.
 * Each created instance has a null model, gets its queue device info assigned, and is added to
 * `resolved_model_relation` with the endpoint info stored for the device key.
 *
 * @param model_instance_to_models  [in,out] receives the sched instances (mapped to nullptr)
 * @param resolved_model_relation   [in,out] receives the sched instances' endpoint infos
 * @param device_endpoint_info      [in,out] endpoint info per device key; model_name is filled in
 * @param flowgw_device_infos       devices to create sched models for
 */
void HeterogeneousDeployPlanner::AddDynamicSchedModel(std::map<std::string, PneModelPtr> &model_instance_to_models,
                                                      ModelRelation &resolved_model_relation,
                                                      std::map<std::string,
                                                               ModelRelation::ModelEndpointInfo> &device_endpoint_info,
                                                      std::vector<DeployPlan::DeviceInfo> &flowgw_device_infos) {
  uint32_t next_sched_index = 0U;
  for (const auto &device_info : flowgw_device_infos) {
    const auto resource_device = ResourceManager::GetInstance().GetDeviceInfo(device_info.GetNodeId(),
                                                                              device_info.GetDeviceId(),
                                                                              device_info.GetType());
    GE_RT_VOID_CHECK_NOTNULL(resource_device);
    if (!resource_device->SupportFlowgw() && (device_info.GetType() == static_cast<int32_t>(CPU))) {
      GELOGI("DynamicSched no host flowgw don't create sched model.");
      continue;
    }
    if (!HasDeployedModelOnDevice(device_info)) {
      continue;
    }
    const auto model_instance_name = "Sched@" + device_info.GetKey() + "@" + std::to_string(next_sched_index++);
    model_instance_to_models.emplace(model_instance_name, nullptr);
    auto &sched_submodel = MutableSubmodelInfo(model_instance_name);
    sched_submodel.model = nullptr;
    sched_submodel.device_info = device_info;
    // The returned status is intentionally ignored here.
    (void)AssignSubmodelsQueueDeviceInfo(model_instance_name, sched_submodel);
    auto &endpoint_info = device_endpoint_info[device_info.GetKey()];
    endpoint_info.model_name = model_instance_name;
    resolved_model_relation.submodel_endpoint_infos.emplace(model_instance_name, endpoint_info);
    GELOGI("DynamicSched create a sched model, model instance name=%s.", model_instance_name.c_str());
  }
}
/**
 * @brief Add the dynamic-sched queues and sched models to the resolved relation.
 *
 * Does nothing when dynamic sched is disabled. For every eligible device of `device_list_` (same
 * eligibility as in AddDynamicSchedModel) a "queue_response@..." and a "queue_request@..." queue are
 * created with depth kDepDefQueDepth, policy "FIFO" and action kQueueActionSched, and recorded in the
 * root endpoint info and in the per-device endpoint info. Finally AddDynamicSchedModel is called.
 * The function returns early if ResourceManager does not know a device.
 *
 * @param model_instance_to_models  [in,out] forwarded to AddDynamicSchedModel
 * @param resolved_model_relation   [in,out] relation that receives the queues and sched models
 */
void HeterogeneousDeployPlanner::AddDynamicSchedRelation(std::map<std::string, PneModelPtr> &model_instance_to_models,
                                                         ModelRelation &resolved_model_relation) {
  if (!GetIsDynamicSched()) {
    GEEVENT("DynamicSched flag close, don't add sched model.");
    return;
  }
  // Creates one sched queue endpoint and appends it to the resolved relation.
  const auto append_sched_queue = [&resolved_model_relation](const std::string &name) {
    Endpoint queue_def(name, EndpointType::kQueue);
    QueueNodeUtils(queue_def).SetDepth(kDepDefQueDepth).SetEnqueuePolicy("FIFO").
        SetNodeAction(kQueueActionSched);
    resolved_model_relation.endpoints.emplace_back(queue_def);
  };
  uint32_t sched_queue_id = 0U;
  std::map<std::string, ModelRelation::ModelEndpointInfo> device_endpoints_info;
  for (const auto &device_info : device_list_) {
    const auto resource_device = ResourceManager::GetInstance().GetDeviceInfo(device_info.GetNodeId(),
                                                                              device_info.GetDeviceId(),
                                                                              device_info.GetType());
    GE_RT_VOID_CHECK_NOTNULL(resource_device);
    if (!resource_device->SupportFlowgw() && (device_info.GetType() == static_cast<int32_t>(CPU))) {
      GELOGI("DynamicSched no host flowgw don't create host request queue.");
      continue;
    }
    if (!HasDeployedModelOnDevice(device_info)) {
      continue;
    }
    const auto queue_name = "queue_response@" + device_info.GetKey() + "@" + std::to_string(sched_queue_id);
    const auto request_queue_name = "queue_request@" + device_info.GetKey() + "@" + std::to_string(sched_queue_id);
    ++sched_queue_id;
    // Response queues are sched inputs, request queues are sched outputs.
    device_endpoints_info[device_info.GetKey()].sched_output_queue_names.emplace_back(request_queue_name);
    device_endpoints_info[device_info.GetKey()].sched_input_queue_names.emplace_back(queue_name);
    resolved_model_relation.root_model_endpoint_info.sched_input_queue_names.emplace_back(queue_name);
    resolved_model_relation.root_model_endpoint_info.sched_output_queue_names.emplace_back(request_queue_name);
    GELOGI("DynamicSched model datagw request queue_name=%s, reponse queue_name=%s.",
           request_queue_name.c_str(), queue_name.c_str());
    append_sched_queue(queue_name);
    append_sched_queue(request_queue_name);
  }
  AddDynamicSchedModel(model_instance_to_models, resolved_model_relation, device_endpoints_info, device_list_);
}
/**
 * @brief Derive `submodel_info.queue_device_info` from `submodel_info.device_info`.
 *
 * The queue device starts as a copy of the model's device. For CPU devices the flowgw support flag is
 * taken from ResourceManager, and a proxy device id of -1 is replaced by the device id of the node's
 * head NPU device when such a device exists.
 *
 * @param model_name     model (instance) name, used for logging
 * @param submodel_info  [in,out] submodel whose queue device info is set
 * @return SUCCESS, or the GE_CHECK_NOTNULL failure status when ResourceManager does not know the device
 */
Status HeterogeneousDeployPlanner::AssignSubmodelsQueueDeviceInfo(const std::string &model_name,
                                                                  DeployPlan::SubmodelInfo &submodel_info) const {
  auto queue_device = submodel_info.device_info;
  const bool is_cpu_device = (queue_device.GetType() == static_cast<int32_t>(CPU));
  if (is_cpu_device) {
    const auto resource_device = ResourceManager::GetInstance().GetDeviceInfo(queue_device.GetNodeId(),
                                                                              queue_device.GetDeviceId(),
                                                                              queue_device.GetType());
    GE_CHECK_NOTNULL(resource_device);
    queue_device.SetSupportFlowgw(resource_device->SupportFlowgw());
    const bool proxy_unset = (queue_device.GetProxyDeviceId() == -1);
    if (proxy_unset) {
      const auto head_npu = ResourceManager::GetInstance().GetHeadNpuDeviceInfo(queue_device.GetNodeId());
      if (head_npu != nullptr) {
        queue_device.SetProxyDeviceId(head_npu->GetDeviceId());
      }
    }
  }
  submodel_info.queue_device_info = queue_device;
  GELOGI("Model[%s] queues will be deployed on device[%s]",
         model_name.c_str(), submodel_info.queue_device_info.GetDesc().c_str());
  return SUCCESS;
}
/**
 * @brief Count one more deployed model on a device and, if it has a proxy, on its proxy device too.
 *
 * @param device_info device a model is deployed on
 */
void HeterogeneousDeployPlanner::RecordDeviceDeployedModelNum(const DeployPlan::DeviceInfo &device_info) {
  ++device_deployed_model_num_[device_info];
  if (!device_info.WithProxy()) {
    return;
  }
  ++device_deployed_model_num_[device_info.ProxyDevice()];
}
/**
 * @brief Expand every model into one instance per target device and resolve the relation accordingly.
 *
 * Instance names have the form "<model>@<process id>@<device key>@<redundant flag>". For each instance
 * the model's endpoint info is copied into the resolved relation, the deploy count of the device is
 * recorded, and an entry is created in the deploy plan's model deploy infos. Afterwards the model-IO
 * device is selected and counted, dynamic-sched queues/models are added, `name_to_models` and
 * `model_relation` are replaced by their per-instance versions, and the submodel infos (model, device,
 * redundant flag, queue device) are filled in.
 *
 * @param name_to_models  [in,out] in: models by name; out: models by instance name (plus sched entries)
 * @param model_relation  [in,out] in: per-model relation; out: per-instance relation
 * @return SUCCESS; PARAM_INVALID when a model has no endpoint info in `model_relation`; otherwise the
 *         status of a failing helper
 */
Status HeterogeneousDeployPlanner::PrepareTargetDevices(std::map<std::string, PneModelPtr> &name_to_models,
                                                        ModelRelation &model_relation) {
  std::map<std::string, std::vector<std::pair<DeployPlan::DeviceInfo, bool>>> target_devices;
  GE_CHK_STATUS_RET_NOLOG(SelectTargetDevice(name_to_models, target_devices));
  ModelRelation resolved_model_relation;
  std::map<std::string, DeployPlan::DeviceInfo> model_instance_locations;
  std::map<std::string, std::pair<PneModelPtr, bool>> model_instance_to_models;
  resolved_model_relation.endpoints = model_relation.endpoints;
  resolved_model_relation.root_model_endpoint_info = model_relation.root_model_endpoint_info;
  resolved_model_relation.invoked_model_queue_infos = model_relation.invoked_model_queue_infos;
  for (const auto &it : target_devices) {
    const auto &model_name = it.first;
    int32_t process_id = 0;
    for (const auto &target_device : it.second) {
      // Look the endpoint info up with find(): at() would throw out of a Status-returning function.
      const auto endpoint_it = model_relation.submodel_endpoint_infos.find(model_name);
      if (endpoint_it == model_relation.submodel_endpoint_infos.end()) {
        GELOGE(PARAM_INVALID, "Failed to find endpoint info of model[%s] in model relation.", model_name.c_str());
        return PARAM_INVALID;
      }
      const bool is_redundant = target_device.second;
      const auto model_instance_name = model_name + "@" + std::to_string(process_id++) + "@" +
                                       target_device.first.GetKey() + "@" + std::to_string(is_redundant);
      (deploy_plan_.GetModelDeployInfos())[model_name][model_instance_name] = {};
      model_instance_to_models.emplace(model_instance_name, std::make_pair(name_to_models[model_name], is_redundant));
      model_instance_locations.emplace(model_instance_name, target_device.first);
      auto &model_queue_info = resolved_model_relation.submodel_endpoint_infos
                                   .emplace(model_instance_name, endpoint_it->second).first->second;
      model_queue_info.model_name = model_name;
      RecordDeviceDeployedModelNum(target_device.first);
      GELOGI("Model[%s] emplace model instance[%s].", model_name.c_str(), model_instance_name.c_str());
    }
  }
  // From here on the map is keyed by instance name instead of model name.
  name_to_models.clear();
  for (auto &model_instance : model_instance_to_models) {
    name_to_models.emplace(model_instance.first, model_instance.second.first);
  }
  DeployPlan::DeviceInfo model_io_device_info{};
  SelectHeadAndTailDevice(model_io_device_info);
  RecordDeviceDeployedModelNum(model_io_device_info);
  AddDynamicSchedRelation(name_to_models, resolved_model_relation);
  model_relation = std::move(resolved_model_relation);
  for (const auto &it : model_instance_to_models) {
    const auto &model_instance_name = it.first;
    if (it.second.first == nullptr) {
      continue;
    }
    auto &submodel_info = MutableSubmodelInfo(model_instance_name);
    submodel_info.model = it.second.first;
    submodel_info.device_info = model_instance_locations[model_instance_name];
    submodel_info.is_redundant = it.second.second;
    GE_CHK_STATUS_RET(AssignSubmodelsQueueDeviceInfo(model_instance_name, submodel_info),
                      "Failed to assign model queue device info.");
    GELOGD("Model [%s] will be deployed on device [%s]", model_instance_name.c_str(),
           submodel_info.device_info.GetDesc().c_str());
  }
  return SUCCESS;
}
/**
 * @brief Build a transfer plan between a local device and the target devices.
 *
 * Creates the transfer info, moves the planner's internal deploy plan into `deploy_plan` and increments
 * the plan id counter.
 *
 * @param local_device    source device
 * @param target_devices  destination devices (see CreateTransferInfo for how they are used)
 * @param deploy_plan     [out] receives the built plan
 * @return SUCCESS, or the status of CreateTransferInfo on failure
 */
Status HeterogeneousDeployPlanner::BuildTransferPlan(const DeployPlan::DeviceInfo &local_device,
                                                     const std::vector<DeployPlan::DeviceInfo> &target_devices,
                                                     DeployPlan &deploy_plan) {
  const Status transfer_status = CreateTransferInfo(local_device, target_devices);
  if (transfer_status != SUCCESS) {
    return transfer_status;
  }
  deploy_plan = std::move(deploy_plan_);
  ++plan_id_gen_;
  return SUCCESS;
}
/**
 * @brief Create the endpoints and bindings of a transfer route from a source device to a destination.
 *
 * Only the first entry of `dst_device_infos` is used; an empty list is a no-op. Both ends share the
 * route name "<src key>_TO_<dst key>_T<plan id>" and a depth of 128. The source endpoint is always
 * created. When source and destination are on different nodes, the destination endpoint plus one group
 * per direction are created and bound (src queue -> dst group, src group -> dst queue).
 *
 * @param src_device_info   source device
 * @param dst_device_infos  destination devices
 * @return SUCCESS, or the status of the first failing helper
 */
Status HeterogeneousDeployPlanner::CreateTransferInfo(const DeployPlan::DeviceInfo &src_device_info,
                                                      const std::vector<DeployPlan::DeviceInfo> &dst_device_infos) {
  if (dst_device_infos.empty()) {
    return SUCCESS;
  }
  const DeployPlan::DeviceInfo &dst_device_info = dst_device_infos[0];
  const std::string route_name = src_device_info.GetKey() + "_TO_" + dst_device_info.GetKey()
                                 + "_T" + std::to_string(plan_id_gen_);
  const auto make_route_queue = [&route_name](const DeployPlan::DeviceInfo &device) {
    DeployPlan::QueueInfo queue_info{};
    queue_info.name = route_name;
    queue_info.device_info = device;
    queue_info.depth = 128U;
    return queue_info;
  };
  DeployPlan::QueueInfo src_queue_info = make_route_queue(src_device_info);
  DeployPlan::QueueInfo dst_queue_info = make_route_queue(dst_device_info);
  // The source endpoint is needed for both the same-node and the cross-node case.
  int32_t src_queue_index = -1;
  GE_CHK_STATUS_RET(CreateEndpointInfo(src_queue_info, src_queue_index));
  if (src_device_info.GetNodeId() == dst_device_info.GetNodeId()) {
    return SUCCESS;
  }
  int32_t dst_queue_index = -1;
  GE_CHK_STATUS_RET(CreateEndpointInfo(dst_queue_info, dst_queue_index));
  // Forward direction: source queue bound to a group holding the destination entry.
  int32_t dst_tag_index = -1;
  GE_CHK_STATUS_RET_NOLOG(CreateGroupEntry(dst_queue_info, dst_tag_index));
  int32_t dst_tag_group_index = -1;
  std::vector<int32_t> dst_tag_group = {dst_tag_index};
  GE_CHK_STATUS_RET_NOLOG(CreateGroupInfo(src_queue_info, dst_tag_group, dst_tag_group_index));
  AddEndpointBindings(src_queue_index, dst_tag_group_index);
  // Receiving side: group holding the source entry bound to the destination queue.
  int32_t src_tag_index = -1;
  GE_CHK_STATUS_RET_NOLOG(CreateGroupEntry(src_queue_info, src_tag_index));
  int32_t src_tag_group_index = -1;
  std::vector<int32_t> src_tag_group = {src_tag_index};
  GE_CHK_STATUS_RET_NOLOG(CreateGroupInfo(dst_queue_info, src_tag_group, src_tag_group_index));
  AddEndpointBindings(src_tag_group_index, dst_queue_index);
  return SUCCESS;
}
/**
 * @brief Select the device that hosts the model IO.
 *
 * Starts from a default-constructed DeviceInfo. If ResourceManager knows that device, its flowgw support
 * flag is copied and, when the node has a head NPU device, that device's id becomes the proxy device id.
 *
 * @param device_info [out] selected device
 */
void HeterogeneousDeployPlanner::SelectHeadAndTailDevice(DeployPlan::DeviceInfo &device_info) {
  DeployPlan::DeviceInfo io_device = {};
  const auto resource_device = ResourceManager::GetInstance().GetDeviceInfo(io_device.GetNodeId(),
                                                                            io_device.GetDeviceId(),
                                                                            io_device.GetType());
  if (resource_device != nullptr) {
    io_device.SetSupportFlowgw(resource_device->SupportFlowgw());
    const auto head_npu = ResourceManager::GetInstance().GetHeadNpuDeviceInfo(io_device.GetNodeId());
    if (head_npu != nullptr) {
      io_device.SetProxyDeviceId(head_npu->GetDeviceId());
    }
  }
  device_info = io_device;
  GELOGI("ModelIO will be deployed on device[%s]", device_info.GetDesc().c_str());
}
}