* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "deploy_planner.h"
#include <algorithm>
#include <exception>
#include "nlohmann/json.hpp"
#include "dflow/base/model/endpoint.h"
#include "graph/ge_context.h"
#include "graph/debug/ge_attr_define.h"
#include "framework/common/framework_types_internal.h"
#include "graph_metadef/common/ge_common/util.h"
#include "common/checker.h"
namespace ge {
namespace {
// Node id used when building kLocalDeviceInfo.
constexpr int32_t kLocalNodeId = 0;
// Queue depth applied to the control input/output queues created in this file.
constexpr int64_t kDepDefQueDepth = 128L;
// NOTE(review): not referenced in the visible part of this file; presumably an upper bound on queue name length.
constexpr uint32_t kMaxQueueNameLen = 127U;
// Default device assigned to every submodel (and its queues) in DeployPlanner::PrepareModelsAndRelation.
const DeployPlan::DeviceInfo kLocalDeviceInfo{CPU, kLocalNodeId, 0};
// Value of OPTION_EXEC_ENABLE_FUSION that turns enqueue fusion on.
constexpr const char_t *kEnableFusionTrue = "true";
// Root-graph attribute holding a JSON string of invoked-model fusion inputs.
constexpr const char_t *kAttrNameInvokedModelFusionInputs = "_invoked_model_fusion_inputs";
// Value of ATTR_NAME_FLOW_ATTR_IO_PLACEMENT that makes IsNeedDeviceQueue return true.
constexpr const char_t *kAttrValueDevicePlacement = "device";
// NOTE(review): not referenced in the visible part of this file; presumably appended to dynamic-sched relation names.
const std::string kDynamicSchedRelationSuffix = "_dynamic_sched";
// Reports whether at least one name in root_input also appears in submodel_input.
bool HasIntersection(const std::vector<std::string> &submodel_input, const std::vector<std::string> &root_input) {
  // Hash the first list once so each probe from the second list is a constant-time lookup.
  const std::unordered_set<std::string> lookup(submodel_input.cbegin(), submodel_input.cend());
  return std::any_of(root_input.cbegin(), root_input.cend(),
                     [&lookup](const std::string &name) { return lookup.find(name) != lookup.cend(); });
}
// True when the submodel has a model whose root graph carries ATTR_NAME_FLOW_ATTR_IO_PLACEMENT == "device".
bool IsNeedDeviceQueue(const DeployPlan::SubmodelInfo &submodel_info) {
  std::string io_placement;
  if (submodel_info.model != nullptr) {
    // A missing attribute leaves io_placement empty, which compares unequal below.
    (void)AttrUtils::GetStr(submodel_info.model->GetRootGraph(), ATTR_NAME_FLOW_ATTR_IO_PLACEMENT, io_placement);
  }
  return io_placement == kAttrValueDevicePlacement;
}
}
// Out-of-class definitions of DeployPlannerBase's static atomic counters, value-initialized.
std::atomic<int64_t> DeployPlannerBase::endpoint_name_id_gen_{};
// Incremented in BuildPlan() after initialization and relation parsing succeed.
std::atomic<int64_t> DeployPlannerBase::plan_id_gen_{};
// Map from a name to submodel info; not referenced in the visible part of this file.
using DynamicSchedInfo = std::map<std::string, DeployPlan::SubmodelInfo>;
const std::vector<DeployPlan::QueueInfo> &DeployPlan::GetQueueInfoList() const {
return queues_;
}
const std::vector<DeployPlan::QueueInfo> &DeployPlan::GetGroupEntryInfoList() const {
return group_entries_;
}
const std::vector<std::pair<int32_t, int32_t>> &DeployPlan::GetQueueBindings() const {
return queue_bindings_;
}
// Collects, for the given source endpoint, the single destination index of every destination group.
//
// Returns an empty vector when:
//   - the source endpoint has no entry in src_to_dst_endpoints_, or
//   - any destination group does not hold exactly one index.
// The original code only rejected groups with more than one index and then read element [0],
// which is undefined behaviour for an empty group; an empty group is now rejected as well.
const std::vector<int32_t> DeployPlan::GetBroadcastIndices(int32_t src_endpoint_index) const {
  std::vector<int32_t> broadcast_indices;
  const auto it = src_to_dst_endpoints_.find(src_endpoint_index);
  if (it == src_to_dst_endpoints_.cend()) {
    return broadcast_indices;
  }
  for (const auto &ins_it : it->second) {
    if (ins_it.second.size() != 1U) {
      return {};
    }
    broadcast_indices.emplace_back(ins_it.second[0]);
  }
  return broadcast_indices;
}
// Root model's data input queue indices.
const std::vector<int32_t> &DeployPlan::GetInputQueueIndices() const {
  const auto &root_info = root_model_info_;
  return root_info.input_queue_indices;
}
// Root model's data output queue indices.
const std::vector<int32_t> &DeployPlan::GetOutputQueueIndices() const {
  const auto &root_info = root_model_info_;
  return root_info.output_queue_indices;
}
const std::map<std::string, std::vector<std::string>> &DeployPlan::GetTrimmingEdgesModelInstances() const {
return trimming_edges_model_instance_names_;
}
const std::map<std::string, DeployPlan::SubmodelInfo> &DeployPlan::GetSubmodels() const {
return submodels_;
}
std::map<std::string, DeployPlan::SubmodelInfo> &DeployPlan::MutableSubmodels() {
return submodels_;
}
const std::map<int32_t, std::vector<int32_t>> &DeployPlan::GetGroups() const {
return groups_;
}
// True when queue_index is a key of groups_.
bool DeployPlan::IsGroupEndpoint(const int32_t queue_index) const {
  return groups_.count(queue_index) > 0U;
}
// Looks up the queue descriptor at queue_index.
// On success stores a pointer into queues_ in queue_info and returns SUCCESS.
// Returns PARAM_INVALID (leaving queue_info untouched) when the index is outside [0, queues_.size()).
Status DeployPlan::GetQueueInfo(const int32_t queue_index, const DeployPlan::QueueInfo *&queue_info) const {
  const bool in_range = (queue_index >= 0) && (static_cast<size_t>(queue_index) < queues_.size());
  if (in_range) {
    queue_info = &queues_[static_cast<size_t>(queue_index)];
    return SUCCESS;
  }
  GELOGE(PARAM_INVALID, "Queue index(%d) out of range: [0, %zu)", queue_index, queues_.size());
  return PARAM_INVALID;
}
// Returns a new vector: the root model's data input indices followed by its control input indices.
std::vector<int32_t> DeployPlan::GetAllInputQueueIndices() const {
  const auto &data_inputs = root_model_info_.input_queue_indices;
  const auto &control_inputs = root_model_info_.control_input_queue_indices;
  std::vector<int32_t> merged;
  merged.reserve(data_inputs.size() + control_inputs.size());
  (void)merged.insert(merged.cend(), data_inputs.cbegin(), data_inputs.cend());
  (void)merged.insert(merged.cend(), control_inputs.cbegin(), control_inputs.cend());
  return merged;
}
// Root model's control input queue indices.
const std::vector<int32_t> &DeployPlan::GetControlInputQueueIndices() const {
  const auto &root_info = root_model_info_;
  return root_info.control_input_queue_indices;
}
// Root model's control output queue indices.
const std::vector<int32_t> &DeployPlan::GetControlOutputQueueIndices() const {
  const auto &root_info = root_model_info_;
  return root_info.control_output_queue_indices;
}
// Device info recorded for the root model's queues.
const DeployPlan::DeviceInfo &DeployPlan::GetRootModelQueueDeviceInfo() const {
  const auto &root_info = root_model_info_;
  return root_info.queue_device_info;
}
// Read-only access to the embedded dynamic scheduling plan.
const DeployPlan::DynamicSchedPlan &DeployPlan::GetDynamicSchedPlan() const {
  return this->dynamic_sched_plan_;
}
void DeployPlan::SetIsDynamicSched(const bool is_dynamic_sched) {
is_dynamic_sched_ = is_dynamic_sched;
}
const bool &DeployPlan::GetIsDynamicSched() const {
return is_dynamic_sched_;
}
void DeployPlan::SetEnableExceptionCatch(bool enable_exception_catch) {
enable_exception_catch_ = enable_exception_catch;
}
bool DeployPlan::IsEnableExceptionCatch() const {
return enable_exception_catch_;
}
// Mutable access to the model deploy info container.
DeployPlan::ModelDeployInfo &DeployPlan::GetModelDeployInfos() {
  return this->model_deploy_infos_;
}
// True for a CPU-typed device whose proxy device id is set (i.e. not -1).
bool DeployPlan::DeviceInfo::WithProxy() const {
  const bool is_cpu = (GetType() == static_cast<int32_t>(CPU));
  return is_cpu && (GetProxyDeviceId() != -1);
}
// Builds an NPU-typed DeviceInfo on the same node, using this device's proxy device id as its device id.
DeployPlan::DeviceInfo DeployPlan::DeviceInfo::ProxyDevice() const {
  const auto npu_type = static_cast<int32_t>(NPU);
  return DeployPlan::DeviceInfo(npu_type, node_id_, proxy_device_id_);
}
// Delegates to the four-argument constructor.
// CPU devices start without a proxy (-1); every other type uses its own device id as proxy id.
DeployPlan::DeviceInfo::DeviceInfo(const int32_t type, const int32_t node_id, const int32_t device_id) noexcept
    : DeviceInfo(type, node_id, device_id, (type != static_cast<int32_t>(CPU)) ? device_id : -1) {}
// Stores the identifying fields and precomputes the textual key and description.
//   key_  = "<type>_<node_id>_<device_id>"
//   desc_ = key_ + "(<proxy_device_id>)"
// The hcom device id starts out equal to device_id and can be changed later via SetHcomDeviceId.
// NOTE(review): declared noexcept although the string construction below can allocate.
DeployPlan::DeviceInfo::DeviceInfo(const int32_t type, const int32_t node_id, const int32_t device_id,
                                   const int32_t proxy_device_id) noexcept
    : type_(type),
      node_id_(node_id),
      device_id_(device_id),
      proxy_device_id_(proxy_device_id),
      hcom_device_id_(device_id) {
  std::string key = std::to_string(type);
  key += "_";
  key += std::to_string(node_id);
  key += "_";
  key += std::to_string(device_id);
  key_ = key;
  desc_ = key_ + "(" + std::to_string(proxy_device_id_) + ")";
}
int32_t DeployPlan::DeviceInfo::GetType() const {
return type_;
}
int32_t DeployPlan::DeviceInfo::GetNodeId() const {
return node_id_;
}
int32_t DeployPlan::DeviceInfo::GetDeviceId() const {
return device_id_;
}
int32_t DeployPlan::DeviceInfo::GetProxyDeviceId() const {
return proxy_device_id_;
}
int32_t DeployPlan::DeviceInfo::GetHcomDeviceId() const {
return hcom_device_id_;
}
void DeployPlan::DeviceInfo::SetHcomDeviceId(int32_t hcom_device_id) {
hcom_device_id_ = hcom_device_id;
}
int32_t DeployPlan::DeviceInfo::GetOsId() const {
return os_id_;
}
void DeployPlan::DeviceInfo::SetOsId(int32_t os_id) {
os_id_ = os_id;
}
const std::string &DeployPlan::DeviceInfo::GetKey() const {
return key_;
}
const std::string &DeployPlan::DeviceInfo::GetDesc() const {
return desc_;
}
// Status output queue indices of the dynamic-sched root model info.
const std::vector<int32_t> &DeployPlan::DynamicSchedPlan::GetStatusOutputQueueIndices() const {
  const auto &root_info = root_model_info_;
  return root_info.status_output_queue_indices;
}
// Sched output queue indices of the dynamic-sched root model info.
const std::vector<int32_t> &DeployPlan::DynamicSchedPlan::GetSchedOutputQueueIndices() const {
  const auto &root_info = root_model_info_;
  return root_info.sched_output_queue_indices;
}
// Sched input queue indices of the dynamic-sched root model info.
const std::vector<int32_t> &DeployPlan::DynamicSchedPlan::GetSchedInputQueueIndices() const {
  const auto &root_info = root_model_info_;
  return root_info.sched_input_queue_indices;
}
const std::map<int32_t, int32_t> &DeployPlan::DynamicSchedPlan::GetDatagwRequestBindings() const {
return datagw_request_bindings_;
}
const std::map<int32_t, int32_t> &DeployPlan::DynamicSchedPlan::GetEntryBindings() const {
return entry_to_dst_index_;
}
// Read-only access to model_index_info_.
const DeployPlan::DynamicSchedIndex &DeployPlan::DynamicSchedPlan::GetModelIndexInfo() const {
  return this->model_index_info_;
}
const std::map<std::string, uint32_t> &DeployPlan::DynamicSchedPlan::GetModelInstanceNum() const {
return model_instances_num_;
}
// Keeps a handle to the root model; the base planner is default-constructed.
DeployPlanner::DeployPlanner(const PneModelPtr &root_model)
    : DeployPlannerBase(),
      root_model_(root_model) {}
// Builds a deploy plan and moves it into deploy_plan.
//
// The dynamic-sched and exception-catch flags are read from the caller's plan before planning starts.
// On success the internal plan is moved out into deploy_plan and plan_id_gen_ is advanced by one.
// If initialization or relation parsing fails, deploy_plan is left unmodified.
Status DeployPlannerBase::BuildPlan(DeployPlan &deploy_plan) {
  const bool dynamic_sched = deploy_plan.GetIsDynamicSched();
  const bool exception_catch = deploy_plan.IsEnableExceptionCatch();
  deploy_plan_.is_dynamic_sched_ = dynamic_sched;
  deploy_plan_.enable_exception_catch_ = exception_catch;

  GE_CHK_STATUS_RET(Initialize(), "Failed to initialize deploy planner.");
  GE_CHK_STATUS_RET(ParseModelRelation(), "Failed to parse model relation.");

  (void)plan_id_gen_.fetch_add(1);
  deploy_plan = std::move(deploy_plan_);
  return SUCCESS;
}
// Prepares planner state before relation parsing:
//   1. lets the subclass fill model_relation_ and the submodel infos,
//   2. extends the relation with control IO and dynamic-sched endpoints,
//   3. creates and initializes the relation reader,
//   4. sets up the synthetic "__head" / "__tail" models from the root model's endpoints
//      (root inputs become head outputs, root outputs become tail inputs),
//   5. selects the queue device for head and tail.
Status DeployPlannerBase::Initialize() {
  GE_CHK_STATUS_RET(PrepareModelsAndRelation(model_relation_), "Failed to prepare");
  UpdateRelationForControlIo();
  UpdateRelationForDynamicSched();

  relation_reader_ = MakeUnique<ModelRelationReader>(model_relation_);
  GE_CHECK_NOTNULL(relation_reader_);
  GE_CHK_STATUS_RET(relation_reader_->Initialize(), "Failed to initialize model relation reader");

  const auto &root_io = model_relation_.root_model_endpoint_info;

  auto &head = head_model_queue_info_;
  head.model_name = "__head";
  head.output_endpoint_names = root_io.input_endpoint_names;
  head.external_output_queue_names = root_io.external_input_queue_names;

  auto &tail = tail_model_queue_info_;
  tail.model_name = "__tail";
  tail.input_endpoint_names = root_io.output_endpoint_names;
  tail.external_input_queue_names = root_io.external_output_queue_names;

  SelectHeadAndTailDevice(head_model_info_.queue_device_info);
  SelectHeadAndTailDevice(tail_model_info_.queue_device_info);
  return SUCCESS;
}
// Flattens the root model into a flat ModelRelation plus a name -> submodel map, validates them
// against each other, and registers every submodel as deployed on the local device.
//
// Fix: root_model_ itself is now null-checked before it is dereferenced; previously only the
// relation it owns was checked, so a null root model crashed instead of returning an error.
//
// @param model_relation  output; receives the flattened relation.
// @return SUCCESS, or the failing status from the null checks, flattening or validation.
Status DeployPlanner::PrepareModelsAndRelation(ModelRelation &model_relation) {
  GE_CHECK_NOTNULL(root_model_);
  GE_CHECK_NOTNULL(root_model_->GetModelRelation().get());
  ModelRelationFlattener flattener(root_model_);
  std::map<std::string, PneModelPtr> name_to_models;
  GE_CHK_STATUS_RET_NOLOG(flattener.Flatten(model_relation, name_to_models));
  GE_CHK_STATUS_RET_NOLOG(ValidateModelAndRelation(name_to_models, model_relation));
  for (const auto &it : name_to_models) {
    const auto &model_name = it.first;
    auto &submodel_info = MutableSubmodelInfo(model_name);
    submodel_info.model = it.second;
    // This planner places every model and its queues on the local device.
    submodel_info.device_info = kLocalDeviceInfo;
    submodel_info.queue_device_info = kLocalDeviceInfo;
    GELOGD("Model[%s] will be deployed on device[%s]", model_name.c_str(), submodel_info.device_info.GetDesc().c_str());
    submodel_info.is_head = false;
  }
  return SUCCESS;
}
// Adds a shared "__control_input" queue for submodels that have outputs but no inputs of any kind.
// The queue is appended to the relation's endpoints, to the root model's inputs, and to each
// such submodel's input endpoint list.
void DeployPlannerBase::UpdateForInputControlIo() {
  std::vector<std::string> models_without_input;
  for (const auto &entry : model_relation_.submodel_endpoint_infos) {
    const auto &endpoint_info = entry.second;
    const bool has_input =
        !endpoint_info.input_endpoint_names.empty() || !endpoint_info.external_input_queue_names.empty();
    if (has_input || endpoint_info.output_endpoint_names.empty()) {
      continue;
    }
    GELOGI("submodel [%s] needs control input", entry.first.c_str());
    models_without_input.emplace_back(entry.first);
  }
  if (models_without_input.empty()) {
    return;
  }

  const std::string control_input_queue_name = "__control_input";
  Endpoint queue_def(control_input_queue_name, EndpointType::kQueue);
  auto queue_def_utils =
      QueueNodeUtils(queue_def).SetDepth(kDepDefQueDepth).SetEnqueuePolicy("FIFO").SetNodeAction(kQueueActionControl);
  model_relation_.endpoints.emplace_back(queue_def);
  model_relation_.root_model_endpoint_info.input_endpoint_names.emplace_back(control_input_queue_name);
  for (const auto &model_name : models_without_input) {
    auto &input_names = model_relation_.submodel_endpoint_infos[model_name].input_endpoint_names;
    input_names.emplace_back(control_input_queue_name);
    GELOGD("model_name:%s, control_input_name:%s, is_control:%d.", model_name.c_str(),
           control_input_queue_name.c_str(), queue_def_utils.GetIsControl());
  }
}
// Adds one "__<model_name>_control_output" queue per model whose instances have no outputs.
// An instance qualifies when it has at least one input (regular or external) and none of its
// input endpoints is listed as an input queue of an invoked model. The new queue is appended to
// the relation's endpoints, to the root model's outputs, and to every qualifying instance.
void DeployPlannerBase::UpdateForOutputControlIo() {
  std::set<std::string> invoked_inputs;
  for (const auto &invoked : model_relation_.invoked_model_queue_infos) {
    const auto &queue_names = invoked.second.input_queue_names;
    invoked_inputs.insert(queue_names.cbegin(), queue_names.cend());
  }

  // model name -> instance names that need the control output
  std::map<std::string, std::vector<std::string>> models_without_output;
  for (const auto &entry : model_relation_.submodel_endpoint_infos) {
    const auto &endpoint_info = entry.second;
    if (!endpoint_info.output_endpoint_names.empty()) {
      continue;
    }
    const bool has_input =
        !endpoint_info.input_endpoint_names.empty() || !endpoint_info.external_input_queue_names.empty();
    if (!has_input) {
      continue;
    }
    bool is_invoked = false;
    for (const auto &endpoint_name : endpoint_info.input_endpoint_names) {
      if (invoked_inputs.count(endpoint_name) > 0U) {
        is_invoked = true;
        break;
      }
    }
    if (is_invoked) {
      continue;
    }
    GELOGI("Submodel[%s] needs control output", entry.first.c_str());
    models_without_output[endpoint_info.model_name].emplace_back(entry.first);
  }

  for (const auto &group : models_without_output) {
    const std::string control_output_queue_name = "__" + group.first + "_control_output";
    Endpoint queue_def(control_output_queue_name, EndpointType::kQueue);
    QueueNodeUtils(queue_def).SetDepth(kDepDefQueDepth).SetEnqueuePolicy("FIFO").SetNodeAction(kQueueActionControl);
    model_relation_.endpoints.emplace_back(queue_def);
    model_relation_.root_model_endpoint_info.output_endpoint_names.emplace_back(control_output_queue_name);
    for (const auto &instance_name : group.second) {
      auto &output_names = model_relation_.submodel_endpoint_infos[instance_name].output_endpoint_names;
      output_names.emplace_back(control_output_queue_name);
    }
  }
}
void DeployPlannerBase::UpdateRelationForControlIo() {
UpdateForInputControlIo();
UpdateForOutputControlIo();
}
// Checks that every model instance named in the relation has an entry in the models map.
//
// Fix: corrected the typo in the error message ("bot not found" -> "but not found").
//
// @param models          name -> submodel map produced by flattening the root model.
// @param model_relation  relation whose submodel_endpoint_infos keys are validated.
// @return SUCCESS when all instances are present, PARAM_INVALID on the first missing one.
Status DeployPlannerBase::ValidateModelAndRelation(const std::map<std::string, PneModelPtr> &models,
                                                   const ModelRelation &model_relation) {
  for (const auto &it : model_relation.submodel_endpoint_infos) {
    const auto &model_instance_name = it.first;
    if (models.find(model_instance_name) == models.cend()) {
      GELOGE(PARAM_INVALID, "model exists in ModelRelation but not found in RootModel, name = %s",
             model_instance_name.c_str());
      return PARAM_INVALID;
    }
  }
  return SUCCESS;
}
// Runs the planning passes over model_relation_ in a fixed order and fills deploy_plan_.
// Passes wrapped in GE_CHK_STATUS_RET abort the sequence on failure (presumably returning the
// failing status - confirm against the macro definition); the remaining calls return nothing.
// The order is significant: later passes consume state produced by earlier ones.
Status DeployPlannerBase::ParseModelRelation() {
// Bookkeeping on instances and ids before any queue is created.
MarkMultiDeployedModels();
GenerateDynamicSchedModelId();
// Output-side (enqueue) queues, then fusion decisions on them.
GE_CHK_STATUS_RET(AssignEnqueueQueues(), "Failed to assign enqueue queues");
GE_CHK_STATUS_RET(AssignDynamicSchedEnqueueQueues(), "Failed to assign dynamic sched enqueue queues");
GE_CHK_STATUS_RET(ResolveEnqueueFusion(), "Failed to resolve enqueue fusion");
GE_CHK_STATUS_RET(ResolveInvokedFusion(), "Failed to resolve invoked fusion");
// Data flow resolution and device adjustment.
GE_CHK_STATUS_RET(ResolveDataFlows(), "Failed to resolve flow relations");
GE_CHK_STATUS_RET(AdjustEnqueueDevices(), "Failed to adjust enqueue devices");
LogDataFlow();
// Input-side (dequeue) queues and bindings.
GE_CHK_STATUS_RET(ResolveReusableQueues(), "Failed to resolve reusable queues");
GE_CHK_STATUS_RET(AssignDequeueQueues(), "Failed to assign dequeue queues");
GE_CHK_STATUS_RET(AssignDynamicSchedDequeueQueues(), "Failed to assign dynamic sched dequeue queues");
GE_CHK_STATUS_RET(BindOutputToRemoteInputs(), "Failed to bind output groups");
GE_CHK_STATUS_RET(BindRemoteOutputGroupToInput(), "Failed to bind input groups");
// Final plan updates.
UpdateDeployPlan();
UpdateDynamicSchedDeployPlan();
GE_CHK_STATUS_RET(BuildDynamicSchedInfo(), "Failed to build dynamic sched info");
GE_CHK_STATUS_RET(SetHeadNodeInfo(), "Failed to set head node info");
return SUCCESS;
}
// Creates the output-side queue definitions:
//   - for the synthetic head model (regular outputs, then external outputs with the third
//     argument set to false),
//   - for every submodel instance's outputs,
//   - and feed endpoints for each invoked model referenced by an instance.
Status DeployPlannerBase::AssignEnqueueQueues() {
  const auto &head_name = head_model_queue_info_.model_name;
  GE_CHK_STATUS_RET_NOLOG(CreateOutputQueueDefs(head_name, head_model_queue_info_.output_endpoint_names));
  GE_CHK_STATUS_RET_NOLOG(
      CreateOutputQueueDefs(head_name, head_model_queue_info_.external_output_queue_names, false));

  for (const auto &entry : model_relation_.submodel_endpoint_infos) {
    const auto &model_instance_name = entry.first;
    const auto &endpoint_info = entry.second;
    GE_CHK_STATUS_RET_NOLOG(CreateOutputQueueDefs(model_instance_name, endpoint_info.output_endpoint_names));
    for (const auto &invoke_model_key : endpoint_info.invoke_model_keys) {
      auto invoked_model_queue_info = relation_reader_->GetInvokedModelQueueInfo(invoke_model_key);
      GE_CHECK_NOTNULL(invoked_model_queue_info, ", get invoked model queue info is null, model_name = %s",
                       model_instance_name.c_str());
      GE_CHK_STATUS_RET_NOLOG(
          CreateFeedEndpoints(model_instance_name, invoked_model_queue_info->input_queue_names, invoke_model_key));
    }
  }
  return SUCCESS;
}
// Reads the "_invoked_model_fusion_inputs" attribute from the model's root graph and parses it
// as a JSON object of string values into fusion_inputs (key -> value).
//
// Returns SUCCESS without touching fusion_inputs when the model or its root graph is null, or
// when the attribute is missing or empty. Returns FAILED when the JSON cannot be parsed or a
// value is not a string; entries converted before the failure remain in fusion_inputs.
Status DeployPlannerBase::GetInvokedModelFusionInputs(const PneModelPtr model,
                                                      std::map<std::string, std::string> &fusion_inputs) {
  if ((model == nullptr) || (model->GetRootGraph() == nullptr)) {
    return SUCCESS;
  }
  std::string raw_attr;
  (void)AttrUtils::GetStr(model->GetRootGraph(), kAttrNameInvokedModelFusionInputs, raw_attr);
  if (raw_attr.empty()) {
    return SUCCESS;
  }
  try {
    const nlohmann::json parsed = nlohmann::json::parse(raw_attr);
    for (const auto &item : parsed.items()) {
      fusion_inputs[item.key()] = item.value().get<std::string>();
    }
  } catch (const nlohmann::json::exception &e) {
    GELOGE(FAILED, "Invalid json format of invoked model fusion inputs, exception:%s", e.what());
    return FAILED;
  }
  return SUCCESS;
}
// For every submodel instance that carries the invoked-model fusion attribute, applies the
// configured input fusion to each invoked model that has an entry in that attribute.
//
// Fix: the "no need fusion" log for an invoke key without configuration printed the model
// instance name where the message says "invoke key"; it now prints the invoke key.
//
// @return SUCCESS, or the first failing status from attribute parsing, relation lookup or
//         ResolveModelInvokedFusion.
Status DeployPlannerBase::ResolveInvokedFusion() {
  for (const auto &it : model_relation_.submodel_endpoint_infos) {
    const auto &model_instance_name = it.first;
    auto &submodel_info = MutableSubmodelInfo(model_instance_name);
    std::map<std::string, std::string> invoked_model_fusion_inputs;
    GE_CHK_STATUS_RET(GetInvokedModelFusionInputs(submodel_info.model, invoked_model_fusion_inputs),
                      "Failed to get invoked model fusion inputs");
    if (invoked_model_fusion_inputs.empty()) {
      GELOGI("Model[%s] is no need fusion invoked inputs without attr[%s].",
             model_instance_name.c_str(), kAttrNameInvokedModelFusionInputs);
      continue;
    }
    for (const auto &invoke_model_key : it.second.invoke_model_keys) {
      const auto it2 = invoked_model_fusion_inputs.find(invoke_model_key);
      if (it2 == invoked_model_fusion_inputs.cend()) {
        GELOGI("Invoked model[invoke key:%s] is no need fusion invoked inputs without attr[%s].",
               invoke_model_key.c_str(), kAttrNameInvokedModelFusionInputs);
        continue;
      }
      const auto &fusion_inputs = it2->second;
      auto invoked_model_queue_info = relation_reader_->GetInvokedModelQueueInfo(invoke_model_key);
      GE_CHECK_NOTNULL(invoked_model_queue_info, ", get invoked model queue info is null, model_name = %s",
                       model_instance_name.c_str());
      GE_CHK_STATUS_RET(ResolveModelInvokedFusion(model_instance_name,
                                                  invoked_model_queue_info->input_queue_names,
                                                  invoke_model_key,
                                                  fusion_inputs),
                        "Failed to resolve model invoked fusion");
    }
  }
  return SUCCESS;
}
// Parses one fusion input token, either a single index ("3") or an inclusive range ("2~5"),
// and appends every index of the range, in ascending order, to fusion_input_index_list.
//
// Fixes over the previous implementation:
//   - negative numbers are rejected; they used to be cast to huge size_t values, and "-1" could
//     later wrap around the caller's "index + 1 <= size" bounds check;
//   - tokens with trailing non-numeric characters (e.g. "1abc") are rejected instead of being
//     silently truncated by std::stoi;
//   - the expansion loop counts in size_t, so a range ending at INT32_MAX no longer overflows
//     the signed loop variable;
//   - the temporary std::set is gone: a contiguous ascending range is already sorted and unique.
//
// @param fusion_input_str         token to parse (whitespace is stripped by the caller).
// @param fusion_input_index_list  output; parsed indices are appended.
// @return SUCCESS, or a failure status when the token is malformed, negative or has end < begin.
Status DeployPlannerBase::ParseInputIndexWithRange(const std::string &fusion_input_str,
                                                   std::vector<size_t> &fusion_input_index_list) {
  const auto range_str = ge::StringUtils::Split(fusion_input_str, '~');
  GE_CHECK_GE(range_str.size(), 1UL);
  GE_CHECK_LE(range_str.size(), 2UL);
  // A single value is treated as the range [value, value].
  const std::string range_begin_str = range_str[0];
  const std::string range_end_str = range_str[range_str.size() - 1UL];
  int32_t range_begin = 0;
  int32_t range_end = 0;
  size_t begin_parsed_len = 0UL;
  size_t end_parsed_len = 0UL;
  try {
    range_begin = std::stoi(range_begin_str, &begin_parsed_len);
    range_end = std::stoi(range_end_str, &end_parsed_len);
  } catch (const std::exception &) {
    // std::stoi throws std::invalid_argument / std::out_of_range, both derived from std::exception.
    GELOGE(FAILED, "Fusion input str[%s] is illegal.", fusion_input_str.c_str());
    return FAILED;
  }
  if ((begin_parsed_len != range_begin_str.size()) || (end_parsed_len != range_end_str.size())) {
    GELOGE(FAILED, "Fusion input str[%s] is illegal.", fusion_input_str.c_str());
    return FAILED;
  }
  GE_CHK_BOOL_RET_STATUS(range_begin >= 0, FAILED, "Fusion input str[%s] is illegal, index must be >= 0.",
                         fusion_input_str.c_str());
  GE_CHECK_GE(range_end, range_begin);
  const auto first_index = static_cast<size_t>(range_begin);
  const auto last_index = static_cast<size_t>(range_end);
  for (size_t index = first_index; index <= last_index; ++index) {
    fusion_input_index_list.emplace_back(index);
  }
  return SUCCESS;
}
// Parses a fusion inputs description into lists of input indices.
// Format as handled here: groups separated by ';', items inside a group separated by ',',
// each item parsed by ParseInputIndexWithRange. All spaces are removed before parsing.
// One index list per group is appended to fusion_inputs_list.
// Returns PARAM_INVALID for an empty description or an empty split result, otherwise the status
// of item parsing.
Status DeployPlannerBase::ParseInvokedModelFusionInputs(const std::string &fusion_inputs_str,
                                                        std::vector<std::vector<size_t>> &fusion_inputs_list) {
  const auto compact_str = StringUtils::ReplaceAll(fusion_inputs_str, " ", "");
  GE_CHK_BOOL_RET_STATUS(!compact_str.empty(), PARAM_INVALID,
                         "Fusion inputs[%s] is empty after normalized.", fusion_inputs_str.c_str());
  const auto groups = ge::StringUtils::Split(compact_str, ';');
  GE_CHK_BOOL_RET_STATUS(groups.size() != 0UL, PARAM_INVALID,
                         "Invalid format of fusion inputs:%s to separated by ';'", fusion_inputs_str.c_str());
  for (const auto &group_str : groups) {
    const auto items = ge::StringUtils::Split(group_str, ',');
    GE_CHK_BOOL_RET_STATUS(items.size() != 0UL, PARAM_INVALID,
                           "Invalid format of fusion inputs:%s to separated by ','", fusion_inputs_str.c_str());
    std::vector<size_t> group_indices;
    for (const auto &item : items) {
      GE_CHK_STATUS_RET(ParseInputIndexWithRange(item, group_indices),
                        "Failed to parse input index with range");
    }
    GELOGI("Parse fusion input string[%s] success, fusion list = %s",
           group_str.c_str(), ToString(group_indices).c_str());
    fusion_inputs_list.push_back(group_indices);
  }
  return SUCCESS;
}
// Applies configured input fusion to the feed queues of one invoked model.
// For each parsed fusion group, the first listed input is the fusion target; every further input
// in the group gets ref_index set to the target's queue index and fusion_offset set to its
// position within the group.
//
// Fixes over the previous implementation:
//   - the bounds check on the first index was "index + 1 <= size", which wraps to 0 for
//     SIZE_MAX and lets an out-of-range read through; it is now "index < size";
//   - the queue index read from feed_queue_indices is range-checked before indexing queues_;
//   - feed_queue_indices is looked up once per group instead of on every access.
//
// @param model_instance_name  instance that invokes the model.
// @param queue_names          input queue names of the invoked model.
// @param invoke_key           key identifying the invoked model within the instance.
// @param fusion_inputs        fusion description, see ParseInvokedModelFusionInputs.
// @return SUCCESS (also when there is at most one endpoint), or a failure status.
Status DeployPlannerBase::ResolveModelInvokedFusion(const std::string &model_instance_name,
                                                    const std::vector<std::string> &queue_names,
                                                    const std::string &invoke_key,
                                                    const std::string &fusion_inputs) {
  std::vector<const Endpoint *> endpoints;
  GE_CHK_STATUS_RET(relation_reader_->BatchGetEndpoints(queue_names, endpoints), "Failed to batch get endpoints");
  auto &submodel_info = MutableSubmodelInfo(model_instance_name);
  if (endpoints.size() <= 1UL) {
    GELOGI("Model[%s] input size[%zu] <= 1, no need to fusion.", model_instance_name.c_str(),
           endpoints.size());
    return SUCCESS;
  }
  std::vector<std::vector<size_t>> fusion_inputs_list;
  GE_CHK_STATUS_RET(ParseInvokedModelFusionInputs(fusion_inputs, fusion_inputs_list),
                    "Failed to parse invoked model fusion inputs");
  for (const auto &fusion_list : fusion_inputs_list) {
    GE_CHECK_GE(fusion_list.size(), 1UL);
    const auto &feed_queue_indices = submodel_info.invoked_model_queue_infos[invoke_key].feed_queue_indices;
    const size_t begin_index = fusion_list[0];
    GE_CHK_BOOL_RET_STATUS(begin_index < feed_queue_indices.size(), FAILED,
                           "Failed to check input index[%zu], must < %zu", begin_index, feed_queue_indices.size());
    const auto fusion_index = feed_queue_indices[begin_index];
    // Position 0 is the fusion target itself, so only the remaining entries are redirected.
    for (size_t pos = 1UL; pos < fusion_list.size(); ++pos) {
      const size_t input_index = fusion_list[pos];
      GE_CHK_BOOL_RET_STATUS(input_index < feed_queue_indices.size(), FAILED,
                             "Failed to check input index[%zu], must < %zu", input_index, feed_queue_indices.size());
      const auto index = feed_queue_indices[input_index];
      GE_CHK_BOOL_RET_STATUS((index >= 0) && (static_cast<size_t>(index) < deploy_plan_.queues_.size()), FAILED,
                             "Failed to check queue index[%d], must be in [0, %zu)", index,
                             deploy_plan_.queues_.size());
      auto &endpoint = deploy_plan_.queues_[static_cast<size_t>(index)];
      endpoint.ref_index = fusion_index;
      endpoint.fusion_offset = static_cast<int32_t>(pos);
      GELOGI("Input[%zu] is fused, index = %d, fusion index = %d, fusion offset = %d.",
             input_index, index, fusion_index, endpoint.fusion_offset);
    }
  }
  return SUCCESS;
}
// Resolves output fusion for the head model when OPTION_EXEC_ENABLE_FUSION is "true".
// Input placements are recorded first for every submodel instance and for the tail model;
// ResolveModelFusion then uses them to decide which head outputs can share a queue.
// Does nothing and returns SUCCESS when the option has any other value.
Status DeployPlannerBase::ResolveEnqueueFusion() {
  std::string fusion_option;
  (void)ge::GetContext().GetOption(OPTION_EXEC_ENABLE_FUSION, fusion_option);
  const bool fusion_enabled = (fusion_option == kEnableFusionTrue);
  if (!fusion_enabled) {
    GELOGI("Option[%s] value[%s] means no need to fusion.", OPTION_EXEC_ENABLE_FUSION, fusion_option.c_str());
    return SUCCESS;
  }

  for (const auto &entry : model_relation_.submodel_endpoint_infos) {
    GELOGI("Resolve model[%s] placement begin.", entry.first.c_str());
    GE_CHK_STATUS_RET_NOLOG(ResolveInputsPlacement(entry.first, entry.second));
  }
  GE_CHK_STATUS_RET_NOLOG(ResolveInputsPlacement(tail_model_queue_info_.model_name, tail_model_queue_info_));
  GE_CHK_STATUS_RET_NOLOG(ResolveModelFusion(head_model_queue_info_.model_name, head_model_queue_info_));
  return SUCCESS;
}
// Records, for each input endpoint of the given model (regular inputs followed by sched inputs),
// the key of the device that dequeues from it. Endpoints consumed by a model whose type is not
// in the fusion-capable set ({PNE_ID_NPU}) are also added to disable_fusion_queues_.
Status DeployPlannerBase::ResolveInputsPlacement(const std::string &model_instance_name,
                                                 const ModelRelation::ModelEndpointInfo &model_endpoint_info) {
  const std::set<std::string> kSupportFusionEngines = {PNE_ID_NPU};
  const auto &model_type = GetSubmodelType(model_instance_name);
  const bool fusion_unsupported = (kSupportFusionEngines.count(model_type) == 0U);
  auto &submodel_info = MutableSubmodelInfo(model_instance_name);

  std::vector<const Endpoint *> model_input_endpoints;
  GE_CHK_STATUS_RET_NOLOG(
      relation_reader_->BatchGetEndpoints(model_endpoint_info.input_endpoint_names, model_input_endpoints));
  GE_CHK_STATUS_RET_NOLOG(
      relation_reader_->BatchGetEndpoints(model_endpoint_info.sched_input_queue_names, model_input_endpoints));

  for (const auto *input_endpoint : model_input_endpoints) {
    const auto &queue_name = input_endpoint->GetName();
    if (fusion_unsupported) {
      disable_fusion_queues_.emplace(queue_name);
    }
    dequeue_placements_[queue_name].emplace(submodel_info.queue_device_info.GetKey());
  }
  return SUCCESS;
}
// Fuses consecutive output endpoints of a model into its first output queue.
// Starting from output 1, each endpoint that CanBeFused() accepts gets ref_index set to the
// first output's queue index and fusion_offset set to its output position; fusion stops at the
// first endpoint that cannot be fused.
//
// Fixes over the previous implementation:
//   - output_queue_indices[0] / [i] and queues_[index] were read without any bounds check;
//     an inconsistent plan now returns FAILED instead of reading out of range;
//   - the "no need to fusion" log reported the output endpoint count as "input size".
//
// @param model_instance_name  model whose outputs are considered.
// @param model_endpoint_info  endpoint info supplying the output endpoint names.
// @return SUCCESS (also when there is at most one output), or a failure status.
Status DeployPlannerBase::ResolveModelFusion(const std::string &model_instance_name,
                                             const ModelRelation::ModelEndpointInfo &model_endpoint_info) {
  std::vector<const Endpoint *> model_output_endpoints;
  GE_CHK_STATUS_RET_NOLOG(
      relation_reader_->BatchGetEndpoints(model_endpoint_info.output_endpoint_names, model_output_endpoints));
  auto &submodel_info = MutableSubmodelInfo(model_instance_name);
  if (model_output_endpoints.size() <= 1UL) {
    GELOGI("Model[%s] output size[%zu] <= 1, no need to fusion.", model_instance_name.c_str(),
           model_output_endpoints.size());
    return SUCCESS;
  }
  const auto &output_queue_indices = submodel_info.output_queue_indices;
  GE_CHK_BOOL_RET_STATUS(!output_queue_indices.empty(), FAILED,
                         "Model[%s] has no output queue index to fuse into.", model_instance_name.c_str());
  const auto fusion_index = output_queue_indices[0];
  const auto fusion_name = model_output_endpoints[0]->GetName();
  for (size_t i = 1UL; i < model_output_endpoints.size(); ++i) {
    const auto &queue_name = model_output_endpoints[i]->GetName();
    if (!CanBeFused(fusion_name, queue_name)) {
      GELOGI("Endpoint[%s] cannot be fused.", queue_name.c_str());
      break;
    }
    // Checked per access so that a plan which stops fusing early behaves exactly as before.
    GE_CHK_BOOL_RET_STATUS(i < output_queue_indices.size(), FAILED,
                           "Model[%s] output[%zu] has no queue index, queue index num = %zu.",
                           model_instance_name.c_str(), i, output_queue_indices.size());
    const auto index = output_queue_indices[i];
    GE_CHK_BOOL_RET_STATUS((index >= 0) && (static_cast<size_t>(index) < deploy_plan_.queues_.size()), FAILED,
                           "Failed to check queue index[%d], must be in [0, %zu)", index,
                           deploy_plan_.queues_.size());
    auto &endpoint = deploy_plan_.queues_[static_cast<size_t>(index)];
    endpoint.ref_index = fusion_index;
    endpoint.fusion_offset = static_cast<int32_t>(i);
    GELOGI("Endpoint[%s] is fused, fusion endpoint name = %s, index = %d, fusion index = %d, fusion offset = %d.",
           queue_name.c_str(), fusion_name.c_str(), index, fusion_index, endpoint.fusion_offset);
  }
  return SUCCESS;
}
// An endpoint can be fused with the fusion target when it is not in disable_fusion_queues_ and
// both endpoints have equal dequeue placement sets.
// Note: operator[] is used on purpose to keep the original behaviour of creating an empty
// placement entry for names that have none yet.
bool DeployPlannerBase::CanBeFused(const std::string &fusion_name, const std::string &endpoint_name) {
  const bool fusion_disabled = (disable_fusion_queues_.find(endpoint_name) != disable_fusion_queues_.end());
  if (fusion_disabled) {
    GELOGI("Endpoint[%s] cannot be fused.", endpoint_name.c_str());
    return false;
  }
  const auto &target_placement = dequeue_placements_[fusion_name];
  const auto &candidate_placement = dequeue_placements_[endpoint_name];
  return target_placement == candidate_placement;
}
// Groups model instances by model name and queue device, and records:
//   - instance_to_model_name_ : instance name -> model name,
//   - model_deploy_locations_ : model name -> (instance name, queue device) pairs,
//   - model_name_to_id_       : model name -> id, assigned from 1 in model-name order,
//   - process_id of each instance of a multi-deployed model, counted from 0 per device.
// A model counts as multi-deployed when it sits on more than one device, or has more than one
// instance on a device.
void DeployPlannerBase::MarkMultiDeployedModels() {
  // model name -> device key -> instance names
  std::map<std::string, std::map<std::string, std::vector<std::string>>> instances_by_model;
  for (const auto &entry : model_relation_.submodel_endpoint_infos) {
    const auto &instance_name = entry.first;
    const auto &model_name = entry.second.model_name;
    const auto &device_info = MutableSubmodelInfo(instance_name).queue_device_info;
    instances_by_model[model_name][device_info.GetKey()].emplace_back(instance_name);
    instance_to_model_name_[instance_name] = model_name;
    model_deploy_locations_[model_name].emplace_back(std::make_pair(instance_name, device_info));
  }

  int32_t next_model_id = 1;
  for (const auto &model_entry : instances_by_model) {
    const auto &model_name = model_entry.first;
    const auto &instances_by_device = model_entry.second;
    model_name_to_id_[model_name] = next_model_id;
    ++next_model_id;

    const bool on_multiple_devices = instances_by_device.size() > 1U;
    const bool multiple_on_first_device = instances_by_device.begin()->second.size() > 1U;
    if (!on_multiple_devices && !multiple_on_first_device) {
      continue;
    }
    GELOGI("Submodel[%s] is multiple deployed.", model_name.c_str());
    for (const auto &device_entry : instances_by_device) {
      const auto &device_key = device_entry.first;
      int32_t next_process_id = 0;
      for (const auto &instance_name : device_entry.second) {
        auto &submodel_info = MutableSubmodelInfo(instance_name);
        submodel_info.process_id = next_process_id;
        ++next_process_id;
        GELOGD("Submodel[%s] is deploy to device[%s], instance = %s, process_id = %d.",
               model_name.c_str(), device_key.c_str(), instance_name.c_str(), submodel_info.process_id);
      }
    }
  }
}
// Two devices can be connected with a queue when they are on the same node and at least one of
// the following holds: their types differ, their device ids are equal, or both their OS ids and
// hcom device ids are equal.
bool DeployPlannerBase::CanConnectWithQ(const DeployPlan::DeviceInfo &src_device_info,
                                        const DeployPlan::DeviceInfo &dst_device_info) {
  GELOGI("Check can connect with queue, src device[%s], dst device[%s].",
         src_device_info.GetDesc().c_str(), dst_device_info.GetDesc().c_str());
  if (src_device_info.GetNodeId() != dst_device_info.GetNodeId()) {
    return false;
  }
  if (src_device_info.GetType() != dst_device_info.GetType()) {
    return true;
  }
  if (src_device_info.GetDeviceId() == dst_device_info.GetDeviceId()) {
    return true;
  }
  return (src_device_info.GetOsId() == dst_device_info.GetOsId()) &&
         (src_device_info.GetHcomDeviceId() == dst_device_info.GetHcomDeviceId());
}
// Two devices can be connected directly with a local queue when their keys are equal, or when
// their type, OS id and hcom device id are all equal.
bool DeployPlannerBase::CanConnectWithLocalQ(const DeployPlan::DeviceInfo &src_device_info,
                                             const DeployPlan::DeviceInfo &dst_device_info) {
  GELOGI("Check can directly connect with queue, src device[%s], dst device[%s].",
         src_device_info.GetDesc().c_str(), dst_device_info.GetDesc().c_str());
  if (src_device_info.GetKey() == dst_device_info.GetKey()) {
    return true;
  }
  const bool same_type = (src_device_info.GetType() == dst_device_info.GetType());
  const bool same_os = (src_device_info.GetOsId() == dst_device_info.GetOsId());
  const bool same_hcom_device = (src_device_info.GetHcomDeviceId() == dst_device_info.GetHcomDeviceId());
  return same_type && same_os && same_hcom_device;
}
// True when either the source or the destination instance has a model whose root graph carries
// ATTR_NAME_DATA_FLOW_UDF_INVOKED_NN set to true. The source is checked first.
bool DeployPlannerBase::IsContainInvokedModel(const std::string &src_model_instance_name,
                                              const std::string &dst_model_instance_name) {
  GELOGI("Try find if containing invoked nn model, src model instance name[%s], dst model instance name[%s].",
         src_model_instance_name.c_str(), dst_model_instance_name.c_str());
  const std::string *const candidates[] = {&src_model_instance_name, &dst_model_instance_name};
  for (const std::string *const candidate : candidates) {
    const auto &submodel_info = MutableSubmodelInfo(*candidate);
    if (submodel_info.model == nullptr) {
      continue;
    }
    bool invoked_nn = false;
    (void)AttrUtils::GetBool(submodel_info.model->GetRootGraph(),
                             ATTR_NAME_DATA_FLOW_UDF_INVOKED_NN, invoked_nn);
    if (invoked_nn) {
      return true;
    }
  }
  return false;
}
// Appends the destination instance to the plan's trimming-edge list of the source instance.
void DeployPlannerBase::AddTrimmingEdgesModelInstance(const std::string &src_model_instance_name,
                                                      const std::string &dst_model_instance_name) {
  auto &trimming_targets = deploy_plan_.trimming_edges_model_instance_names_[src_model_instance_name];
  trimming_targets.emplace_back(dst_model_instance_name);
  GELOGI("Added trimming edges model instance, src model instance name[%s], dst model instance name[%s].",
         src_model_instance_name.c_str(), dst_model_instance_name.c_str());
}
// Decides whether the binding between two model instances should be skipped.
//
// Returns false (do not skip) when either instance or model location is unknown, when the two
// models have different instance counts, when the source model has at most one instance, or
// when, with no invoked model involved, some pair of same-position instances cannot be
// connected with a queue.
// Otherwise the binding is skipped exactly when the two instances sit at different positions in
// their models' location lists. When the positions match and no invoked model is involved, the
// pair is also recorded as a trimming edge.
bool DeployPlannerBase::CheckSkipBinding(const std::string &src_model_instance_name,
                                         const std::string &dst_model_instance_name) {
  const auto src_name_it = instance_to_model_name_.find(src_model_instance_name);
  const auto dst_name_it = instance_to_model_name_.find(dst_model_instance_name);
  if ((src_name_it == instance_to_model_name_.cend()) || (dst_name_it == instance_to_model_name_.cend())) {
    GELOGI("Cannot find model name according instance, src[%s], dst[%s].",
           src_model_instance_name.c_str(), dst_model_instance_name.c_str());
    return false;
  }
  const auto &src_model_name = src_name_it->second;
  const auto &dst_model_name = dst_name_it->second;

  const auto src_location_it = model_deploy_locations_.find(src_model_name);
  const auto dst_location_it = model_deploy_locations_.find(dst_model_name);
  if ((src_location_it == model_deploy_locations_.cend()) || (dst_location_it == model_deploy_locations_.cend())) {
    GELOGI("Failed to find model location, src model_name = %s, dst model_name = %s.",
           src_model_name.c_str(), dst_model_name.c_str());
    return false;
  }
  const auto &src_locations = src_location_it->second;
  const auto &dst_locations = dst_location_it->second;
  if (src_locations.size() != dst_locations.size()) {
    GELOGI("Model deployed instance num is different, src[%s] is %zu, dst[%s] is %zu.",
           src_model_name.c_str(), src_locations.size(), dst_model_name.c_str(), dst_locations.size());
    return false;
  }
  if (src_locations.size() <= 1U) {
    GELOGI("Model is not muilti deployed, model name = %s.", src_model_name.c_str());
    return false;
  }

  const bool has_invoked = IsContainInvokedModel(src_model_instance_name, dst_model_instance_name);
  // Positions default to 0 when an instance name is not present in its location list.
  size_t src_position = 0U;
  size_t dst_position = 0U;
  for (size_t pos = 0U; pos < src_locations.size(); ++pos) {
    const auto &src_location = src_locations[pos];
    const auto &dst_location = dst_locations[pos];
    if (!has_invoked && !CanConnectWithQ(src_location.second, dst_location.second)) {
      GELOGI("Model instances cannot connect with queue.");
      return false;
    }
    if (src_location.first == src_model_instance_name) {
      src_position = pos;
    }
    if (dst_location.first == dst_model_instance_name) {
      dst_position = pos;
    }
  }

  const bool skip_binding = (src_position != dst_position);
  if (!skip_binding && !has_invoked) {
    AddTrimmingEdgesModelInstance(src_model_instance_name, dst_model_instance_name);
  }
  GELOGI("Model instances deployed on same devices, skip binding = %d, src instance[%s] index = %zu, "
         "dst instance[%s] index = %zu.", static_cast<int32_t>(skip_binding), src_model_instance_name.c_str(),
         src_position, dst_model_instance_name.c_str(), dst_position);
  return skip_binding;
}
// Resolves the regular and dynamic inputs of every submodel listed in the model relation, then does the
// same for the tail model queue info. Stops at, and returns, the first non-SUCCESS status.
Status DeployPlannerBase::ResolveDataFlows() {
  for (const auto &name_and_endpoint_info : model_relation_.submodel_endpoint_infos) {
    GE_CHK_STATUS_RET_NOLOG(ResolveModelInputs(name_and_endpoint_info.first, name_and_endpoint_info.second));
    GE_CHK_STATUS_RET_NOLOG(ResolveModelDynamicInputs(name_and_endpoint_info.first, name_and_endpoint_info.second));
  }
  const auto &tail_model_name = tail_model_queue_info_.model_name;
  GE_CHK_STATUS_RET_NOLOG(ResolveModelInputs(tail_model_name, tail_model_queue_info_));
  GE_CHK_STATUS_RET_NOLOG(ResolveModelDynamicInputs(tail_model_name, tail_model_queue_info_));
  return SUCCESS;
}
// Binds every input endpoint of one model instance to the source endpoints that enqueue into it.
//
// Endpoints are gathered in this order: regular inputs (queue id = input position), external inputs
// (queue id = -1), then the output queues of each invoked model (queue id = position within that invoked
// model, tagged with the invoke key). For each non-event endpoint, every known source endpoint gets a
// destination queue appended to endpoint_pairs_, unless CheckSkipBinding() says the pair is skipped.
//
// Returns PARAM_INVALID when an endpoint has no source endpoint, INTERNAL_ERROR when the collected queue
// ids and endpoints differ in count, or the failing status of any helper.
Status DeployPlannerBase::ResolveModelInputs(const std::string &model_instance_name,
                                             const ModelRelation::ModelEndpointInfo &model_endpoint_info) {
  std::vector<const Endpoint *> model_input_endpoints;
  GE_CHK_STATUS_RET_NOLOG(
      relation_reader_->BatchGetEndpoints(model_endpoint_info.input_endpoint_names, model_input_endpoints));
  GE_CHK_STATUS_RET_NOLOG(
      relation_reader_->BatchGetEndpoints(model_endpoint_info.external_input_queue_names, model_input_endpoints));

  std::vector<ModelQueueIndex> model_queue_ids;
  model_queue_ids.reserve(model_input_endpoints.size());
  for (size_t pos = 0UL; pos < model_endpoint_info.input_endpoint_names.size(); ++pos) {
    model_queue_ids.emplace_back(ModelQueueIndex{model_endpoint_info.model_name, "", static_cast<int32_t>(pos)});
  }
  // Every endpoint beyond the regular inputs is an external input at this point; they all share id -1.
  model_queue_ids.resize(model_input_endpoints.size(), ModelQueueIndex{model_endpoint_info.model_name, "", -1});

  for (const auto &invoke_model_key : model_endpoint_info.invoke_model_keys) {
    auto invoked_model_queue_info = relation_reader_->GetInvokedModelQueueInfo(invoke_model_key);
    GE_ASSERT_NOTNULL(invoked_model_queue_info,
                      ", failed to get invoked model queue info, model_instance_name=%s, invoke_model_key=%s.",
                      model_instance_name.c_str(), invoke_model_key.c_str());
    const auto &feed_queue_names = invoked_model_queue_info->output_queue_names;
    GE_CHK_STATUS_RET_NOLOG(relation_reader_->BatchGetEndpoints(feed_queue_names, model_input_endpoints));
    for (size_t pos = 0UL; pos < feed_queue_names.size(); ++pos) {
      model_queue_ids.emplace_back(
          ModelQueueIndex{model_endpoint_info.model_name, invoke_model_key, static_cast<int32_t>(pos)});
    }
  }
  GE_CHK_BOOL_RET_STATUS(model_queue_ids.size() == model_input_endpoints.size(), INTERNAL_ERROR,
                         "model_queue_ids.size=%zu is not same as model_input_endpoints.size=%zu, model=%s",
                         model_queue_ids.size(), model_input_endpoints.size(), model_instance_name.c_str());

  auto &submodel_info = MutableSubmodelInfo(model_instance_name);
  for (size_t pos = 0UL; pos < model_input_endpoints.size(); ++pos) {
    const auto &queue_id = model_queue_ids[pos];
    const auto endpoint = model_input_endpoints[pos];
    const auto &endpoint_name = endpoint->GetName();
    if (endpoint->GetEndpointType() == EndpointType::kEvent) {
      GELOGD("Endpoint name = %s is event def. Skip bind endpoints.", endpoint_name.c_str());
      continue;
    }
    const auto &producer_indices = src_endpoint_indices_[endpoint_name];
    if (producer_indices.empty()) {
      GELOGE(PARAM_INVALID, "Failed to find enqueue operation for queue [%s]", endpoint_name.c_str());
      return PARAM_INVALID;
    }
    for (auto producer_index : producer_indices) {
      const auto &producer = deploy_plan_.queues_[static_cast<size_t>(producer_index)];
      if (CheckSkipBinding(producer.model_instance_name, model_instance_name)) {
        GELOGI("Skip bind endpoints: name = %s, from %s to %s:%d@%s", endpoint_name.c_str(),
               producer.model_instance_name.c_str(), model_endpoint_info.model_name.c_str(), queue_id.id,
               submodel_info.queue_device_info.GetDesc().c_str());
        continue;
      }
      auto &consumer_groups = endpoint_pairs_[producer_index];
      auto consumer_queue = BuildQueueInfo(*endpoint, model_instance_name);
      GE_CHK_STATUS_RET(AdjustDequeueDevice(consumer_queue, producer_indices), "Failed to adjust dequeue device");
      consumer_queue.name = GetEndpointFullName(consumer_queue, queue_id);
      relation_dst_to_src_[consumer_queue.name].emplace(producer_index);
      GELOGD("Bind endpoints: name = %s, from %s to %s:%d@%s, invoke_key=%s, queue device info=%s.",
             endpoint_name.c_str(), producer.model_instance_name.c_str(), model_endpoint_info.model_name.c_str(),
             queue_id.id, submodel_info.device_info.GetDesc().c_str(), queue_id.invoke_key.c_str(),
             consumer_queue.device_info.GetDesc().c_str());
      consumer_groups[queue_id].emplace_back(std::move(consumer_queue));
    }
  }
  return SUCCESS;
}
// Chooses the device on which a destination (dequeue) endpoint is placed.
//
// Nothing changes unless the destination device reports WithProxy() and does not report SupportFlowgw().
// In that case, if the destination model is not multi-deployed and all source endpoints on the same node
// use exactly one device of type NPU, the destination is moved onto that device; otherwise it is moved
// to its proxy device. Always returns SUCCESS.
Status DeployPlannerBase::AdjustDequeueDevice(DeployPlan::QueueInfo &dst_endpoint,
                                              const std::vector<int32_t> &src_endpoint_indices) {
  if ((!dst_endpoint.device_info.WithProxy()) || dst_endpoint.device_info.SupportFlowgw()) {
    return SUCCESS;
  }
  // Count how often each device on the destination's node appears among the sources.
  std::map<DeployPlan::DeviceInfo, size_t> same_node_devices;
  for (auto producer_index : src_endpoint_indices) {
    const auto &producer = deploy_plan_.queues_[static_cast<size_t>(producer_index)];
    if (producer.device_info.GetNodeId() == dst_endpoint.device_info.GetNodeId()) {
      ++same_node_devices[producer.device_info];
    }
  }
  const bool follow_single_npu = (!IsMultiDeployed(dst_endpoint.model_instance_name)) &&
                                 (same_node_devices.size() == 1U) &&
                                 (same_node_devices.begin()->first.GetType() == static_cast<int32_t>(NPU));
  if (!follow_single_npu) {
    dst_endpoint.device_info = dst_endpoint.device_info.ProxyDevice();
    return SUCCESS;
  }
  const auto &npu_device = same_node_devices.begin()->first;
  GELOGI("Adjust queue[%s] device from [%s] to [%s]",
         dst_endpoint.name.c_str(),
         dst_endpoint.device_info.GetDesc().c_str(), npu_device.GetDesc().c_str());
  dst_endpoint.device_info = npu_device;
  return SUCCESS;
}
// Chooses the device on which a source (enqueue) endpoint is placed.
//
// Nothing changes unless the source device reports WithProxy() and does not report SupportFlowgw().
// In that case, if the source model is not multi-deployed and all destination queues on the same node
// use exactly one device of type NPU, the source is moved onto that device; otherwise it is moved to
// its proxy device. Always returns SUCCESS.
Status DeployPlannerBase::AdjustEnqueueDevice(
    DeployPlan::QueueInfo &src_endpoint,
    const std::map<ModelQueueIndex, std::vector<DeployPlan::QueueInfo>> &dst_endpoints) const {
  if ((!src_endpoint.device_info.WithProxy()) || src_endpoint.device_info.SupportFlowgw()) {
    return SUCCESS;
  }
  // Count how often each device on the source's node appears among the destinations.
  std::map<DeployPlan::DeviceInfo, size_t> same_node_devices;
  for (const auto &loc_and_consumers : dst_endpoints) {
    for (const auto &consumer : loc_and_consumers.second) {
      if (src_endpoint.device_info.GetNodeId() == consumer.device_info.GetNodeId()) {
        ++same_node_devices[consumer.device_info];
      }
    }
  }
  const bool follow_single_npu = (!IsMultiDeployed(src_endpoint.model_instance_name)) &&
                                 (same_node_devices.size() == 1U) &&
                                 (same_node_devices.begin()->first.GetType() == static_cast<int32_t>(NPU));
  if (!follow_single_npu) {
    src_endpoint.device_info = src_endpoint.device_info.ProxyDevice();
    return SUCCESS;
  }
  const auto &npu_device = same_node_devices.begin()->first;
  GELOGI("Adjust queue[%s] device from [%s] to [%s]",
         src_endpoint.name.c_str(),
         src_endpoint.device_info.GetDesc().c_str(), npu_device.GetDesc().c_str());
  src_endpoint.device_info = npu_device;
  return SUCCESS;
}
// Runs AdjustEnqueueDevice() on the source queue of every regular endpoint pair, then on the source
// queue of every endpoint pair of the dynamic-sched plan. Returns the first failing status, else SUCCESS.
Status DeployPlannerBase::AdjustEnqueueDevices() {
  for (const auto &src_and_dsts : endpoint_pairs_) {
    auto &producer = deploy_plan_.queues_[src_and_dsts.first];
    GE_CHK_STATUS_RET(AdjustEnqueueDevice(producer, src_and_dsts.second), "Failed to adjust enqueue device");
  }
  for (const auto &src_and_dsts : deploy_plan_.dynamic_sched_plan_.endpoint_pairs_) {
    auto &producer = deploy_plan_.queues_[src_and_dsts.first];
    GE_CHK_STATUS_RET(AdjustEnqueueDevice(producer, src_and_dsts.second), "Failed to adjust dynamic enqueue device");
  }
  return SUCCESS;
}
void DeployPlannerBase::LogDataFlow() const {
if (!IsLogEnable(GE_MODULE_NAME, DLOG_DEBUG)) {
return;
}
for (const auto &endpoint_pair : endpoint_pairs_) {
const auto src_endpoint_idx = endpoint_pair.first;
const auto &src_endpoint_info = deploy_plan_.queues_[src_endpoint_idx];
std::map<ModelQueueIndex, std::vector<std::string>> group_by_dst_loc;
for (const auto &dst_loc_and_queue_info : endpoint_pair.second) {
for (const auto &dst_queue_info : dst_loc_and_queue_info.second) {
group_by_dst_loc[dst_loc_and_queue_info.first].emplace_back(dst_queue_info.device_info.GetDesc());
}
}
GELOGD("Bindings for queue [%s@%s] are:", src_endpoint_info.name.c_str(),
src_endpoint_info.device_info.GetDesc().c_str());
for (const auto &it : group_by_dst_loc) {
GELOGD(" %s:%d@%s", it.first.model_name.c_str(), it.first.id, ToString(it.second).c_str());
}
}
}
// Collects into reusable_queue_indices_ the source queues that can be shared directly with their single
// destination.
//
// A source queue is rejected when it is not owned, feeds more than one model queue index, feeds more than
// one destination queue, its destination has a number of recorded sources other than one, the destination
// is on another node, or both ends are on NPU devices with different device ids. When the device types
// differ and the destination is an NPU, the source queue is first moved onto the destination device.
// Always returns SUCCESS.
Status DeployPlannerBase::ResolveReusableQueues() {
  for (const auto &src_and_dsts : endpoint_pairs_) {
    const auto src_idx = src_and_dsts.first;
    auto &producer = deploy_plan_.queues_[src_idx];
    const auto &producer_name = producer.name;
    if (!producer.owned) {
      GELOGD("Queue[%s@%s] is external.", producer_name.c_str(), producer.device_info.GetDesc().c_str());
      continue;
    }
    const auto &dst_groups = src_and_dsts.second;
    if (dst_groups.size() != 1U) {
      GELOGD("Queue[%s@%s] has one-to-many relation to models", producer_name.c_str(),
             producer.device_info.GetDesc().c_str());
      continue;
    }
    const auto &consumers = dst_groups.begin()->second;
    if (consumers.size() != 1U) {
      GELOGD("Queue[%s@%s] has multi-device dest endpoints", producer_name.c_str(),
             producer.device_info.GetDesc().c_str());
      continue;
    }
    const auto &consumer = consumers.front();
    const auto &consumer_name = consumer.name;
    const auto sources_iter = relation_dst_to_src_.find(consumer_name);
    if ((sources_iter != relation_dst_to_src_.end()) && (sources_iter->second.size() != 1U)) {
      GELOGD("Queue[%s@%s] has many-to-one src endpoints.", consumer_name.c_str(),
             consumer.device_info.GetDesc().c_str());
      continue;
    }
    const auto &consumer_device = consumer.device_info;
    if (producer.device_info.GetNodeId() != consumer_device.GetNodeId()) {
      GELOGD("Queue[%s@%d] has diff node[%d] dest endpoints",
             producer_name.c_str(), producer.device_info.GetNodeId(), consumer_device.GetNodeId());
      continue;
    }
    const bool same_type = (producer.device_info.GetType() == consumer_device.GetType());
    if (same_type &&
        (producer.device_info.GetType() == static_cast<int32_t>(NPU)) &&
        (producer.device_info.GetDeviceId() != consumer_device.GetDeviceId())) {
      GELOGD("Queue[%s@%s] on npu has diff device endpoints, device = [%s]", producer_name.c_str(),
             producer.device_info.GetDesc().c_str(), consumer_device.GetDesc().c_str());
      continue;
    }
    if ((!same_type) && (consumer_device.GetType() == static_cast<int32_t>(NPU))) {
      GELOGI("Queue[%s@%s] reuse npu device[%s] on same node.",
             producer_name.c_str(), producer.device_info.GetDesc().c_str(), consumer_device.GetDesc().c_str());
      producer.device_info = consumer_device;
    }
    GELOGD("Queue[%s@%s] is reusable, index = %d", producer_name.c_str(), producer.device_info.GetDesc().c_str(),
           src_idx);
    (void)reusable_queue_indices_.emplace(src_idx);
  }
  return SUCCESS;
}
// Tells whether the source endpoint fans out: true when it feeds a number of model queue indices other
// than one, or its only model queue index holds a number of destination queues other than one.
// Returns false when the endpoint has no entry in endpoint_pairs_.
bool DeployPlannerBase::IsOutputMultiConnected(const int32_t src_endpoint_idx) {
  const auto pair_iter = endpoint_pairs_.find(src_endpoint_idx);
  if (pair_iter == endpoint_pairs_.cend()) {
    return false;
  }
  const auto &producer = deploy_plan_.queues_[src_endpoint_idx];
  const auto &dst_groups = pair_iter->second;
  if (dst_groups.size() != 1U) {
    GELOGD("Queue[%s@%s] has one-to-many relation to models", producer.name.c_str(),
           producer.device_info.GetDesc().c_str());
    return true;
  }
  const auto &consumers = dst_groups.begin()->second;
  if (consumers.size() != 1U) {
    GELOGD("Queue[%s@%s] has multi-device dest endpoints", producer.name.c_str(),
           producer.device_info.GetDesc().c_str());
    return true;
  }
  return false;
}
// Tells whether more than one source endpoint is recorded in relation_dst_to_src_ for the name of the
// given destination endpoint.
bool DeployPlannerBase::IsInputMultiConnected(const int32_t dst_endpoint_idx) {
  const auto &consumer_name = deploy_plan_.queues_[dst_endpoint_idx].name;
  const auto sources_iter = relation_dst_to_src_.find(consumer_name);
  if (sources_iter == relation_dst_to_src_.cend()) {
    return false;
  }
  return sources_iter->second.size() > 1U;
}
// Assigns a destination endpoint index to every (source, destination queue) pair and publishes the
// resulting input queue indices on each submodel.
//
// A source listed in reusable_queue_indices_ is used directly as the destination. Otherwise an input
// endpoint is fetched or created, dequeue fusion is resolved and the relations between the two endpoints
// are prepared. Afterwards each submodel receives its control inputs, its data inputs in ascending input
// index (invoked-model inputs go to that invoke key's fetch queues), and finally its external input.
// Returns the first failing status, else SUCCESS.
Status DeployPlannerBase::AssignDequeueQueues() {
  // model instance -> invoke key -> input index -> endpoint index
  std::map<std::string, std::map<std::string, std::map<int32_t, int32_t>>> data_inputs;
  // model instance -> endpoint index of the external input (a later assignment overwrites an earlier one)
  std::map<std::string, int32_t> external_inputs;
  // model instance -> endpoint indices of control inputs
  std::map<std::string, std::set<int32_t>> control_inputs;

  for (const auto &src_and_dsts : endpoint_pairs_) {
    const auto src_idx = src_and_dsts.first;
    for (const auto &loc_and_consumers : src_and_dsts.second) {
      const auto &queue_loc = loc_and_consumers.first;
      const auto &consumers = loc_and_consumers.second;
      for (size_t inst = 0; inst < consumers.size(); ++inst) {
        const auto &consumer = consumers[inst];
        int32_t dst_idx = -1;
        const auto &instance_name = consumer.model_instance_name;
        if (reusable_queue_indices_.count(src_idx) > 0UL) {
          GELOGD("Reuse src queue, queue name = %s, queue index = %d",
                 deploy_plan_.queues_[src_idx].name.c_str(), src_idx);
          dst_idx = src_idx;
        } else {
          GE_CHK_STATUS_RET_NOLOG(GetOrCreateInputEndpoint(queue_loc, consumer, dst_idx));
          GE_CHK_STATUS_RET_NOLOG(ResolveDequeueFusion(src_idx, dst_idx));
          InputGroupAttr group_attr = {};
          group_attr.instance_num = static_cast<int32_t>(consumers.size());
          group_attr.instance_idx = static_cast<int32_t>(inst);
          GE_CHK_STATUS_RET(PrepareRelations(src_idx, dst_idx, queue_loc, consumer, group_attr),
                            "Failed to prepare relations");
          GELOGI("Prepare relation success, src = %s, index = %d, dst = %s, index = %d",
                 ToEndpointDesc(src_idx).c_str(), src_idx,
                 ToEndpointDesc(dst_idx).c_str(), dst_idx);
        }
        deploy_plan_.src_to_dst_endpoints_[src_idx][queue_loc].emplace_back(dst_idx);
        if (consumer.queue_action == DeployPlan::QueueAction::kControl) {
          control_inputs[instance_name].emplace(dst_idx);
        } else if (queue_loc.id >= 0) {
          data_inputs[instance_name][queue_loc.invoke_key][queue_loc.id] = dst_idx;
        } else {
          external_inputs[instance_name] = dst_idx;
        }
      }
    }
  }

  for (const auto &name_and_controls : control_inputs) {
    auto &submodel_info = MutableSubmodelInfo(name_and_controls.first);
    const auto &indices = name_and_controls.second;
    submodel_info.control_input_queue_indices.assign(indices.cbegin(), indices.cend());
  }
  for (const auto &name_and_groups : data_inputs) {
    auto &submodel_info = MutableSubmodelInfo(name_and_groups.first);
    for (const auto &key_and_inputs : name_and_groups.second) {
      const auto &invoke_key = key_and_inputs.first;
      for (const auto &input_and_endpoint : key_and_inputs.second) {
        if (invoke_key.empty()) {
          submodel_info.input_queue_indices.emplace_back(input_and_endpoint.second);
        } else {
          submodel_info.invoked_model_queue_infos[invoke_key].fetch_queue_indices.emplace_back(
              input_and_endpoint.second);
        }
      }
    }
  }
  for (const auto &name_and_external : external_inputs) {
    MutableSubmodelInfo(name_and_external.first).input_queue_indices.emplace_back(name_and_external.second);
  }
  return SUCCESS;
}
// Tells whether the model behind model_instance_name has more than one deploy location.
// Returns false when the instance or the model's locations are unknown.
bool DeployPlannerBase::IsMultiDeployed(const std::string &model_instance_name) const {
  const auto name_iter = instance_to_model_name_.find(model_instance_name);
  if (name_iter == instance_to_model_name_.cend()) {
    return false;
  }
  const auto locations_iter = model_deploy_locations_.find(name_iter->second);
  if (locations_iter == model_deploy_locations_.cend()) {
    return false;
  }
  return locations_iter->second.size() > 1U;
}
void DeployPlannerBase::AddInputGroups(const int32_t dst_endpoint_idx,
const int32_t src_tag_idx,
const InputGroupAttr &input_group_attr) {
auto &input_group = input_groups_[dst_endpoint_idx];
auto it = std::find(input_group.begin(), input_group.end(), src_tag_idx);
if (it == input_group.end()) {
input_groups_[dst_endpoint_idx].emplace_back(src_tag_idx);
input_groups_attr_[dst_endpoint_idx] = input_group_attr;
}
}
// Registers the relation "<src>_to_<dst><suffix>" in relations_.
// Returns true when the relation was already registered before this call, false when this call added it.
bool DeployPlannerBase::CheckAndAddRelation(const int32_t src_endpoint_idx,
                                            const int32_t dst_endpoint_idx,
                                            const std::string &suffix) {
  std::string relation_key = std::to_string(src_endpoint_idx);
  relation_key += "_to_";
  relation_key += std::to_string(dst_endpoint_idx);
  relation_key += suffix;
  // emplace() reports whether the key was newly inserted; the relation pre-existed when it was not.
  const bool relation_added = !relations_.emplace(relation_key).second;
  GELOGD("Check and add relation[%s] success, relation added = %d.",
         relation_key.c_str(), static_cast<int32_t>(relation_added));
  return relation_added;
}
// Fills entity_pair with two queue infos sharing the tag name "<mapping name>_<endpoint_idx>": the first
// is a copy of the queue at endpoint_idx, the second a copy of mapping_queue_info.
void DeployPlannerBase::GenTagEntityPair(int32_t endpoint_idx,
                                         const DeployPlan::QueueInfo &mapping_queue_info,
                                         std::pair<DeployPlan::QueueInfo, DeployPlan::QueueInfo> &entity_pair) {
  const auto shared_tag_name = mapping_queue_info.name + "_" + std::to_string(endpoint_idx);
  auto &src_side = entity_pair.first;
  src_side = deploy_plan_.queues_[endpoint_idx];
  src_side.name = shared_tag_name;
  auto &dst_side = entity_pair.second;
  dst_side = mapping_queue_info;
  dst_side.name = shared_tag_name;
}
// Returns in tag_pair the indices of the two group entries (source side, mapping side) that tag the link
// between the endpoint and the mapping queue's device.
//
// The pair is cached per (endpoint index, device key) when neither model is multi-deployed or when
// use_balanced is false; in that case an existing pair is returned without creating new entries.
// Otherwise two new group entries are always created. Returns the failing status of CreateGroupEntry().
Status DeployPlannerBase::GetOrCreateMappingTagPairEntry(const int32_t endpoint_idx,
                                                         const DeployPlan::QueueInfo &mapping_queue_info,
                                                         std::pair<int32_t, int32_t> &tag_pair,
                                                         bool use_balanced) {
  const auto &endpoint_queue = deploy_plan_.queues_[endpoint_idx];
  const bool src_multi = IsMultiDeployed(endpoint_queue.model_instance_name);
  const bool dst_multi = IsMultiDeployed(mapping_queue_info.model_instance_name);
  const bool cacheable = (!(src_multi || dst_multi)) || (!use_balanced);
  auto cache_key = std::make_pair(endpoint_idx, mapping_queue_info.device_info.GetKey());
  if (cacheable) {
    const auto cached = endpoint_device_tags_mapping_.find(cache_key);
    if (cached != endpoint_device_tags_mapping_.cend()) {
      tag_pair = cached->second;
      return SUCCESS;
    }
  }

  std::pair<DeployPlan::QueueInfo, DeployPlan::QueueInfo> tag_entities;
  GenTagEntityPair(endpoint_idx, mapping_queue_info, tag_entities);
  GE_CHK_STATUS_RET(CreateGroupEntry(tag_entities.first, tag_pair.first), "Failed to create group entity.");
  GE_CHK_STATUS_RET(CreateGroupEntry(tag_entities.second, tag_pair.second), "Failed to create group entity.");
  if (cacheable) {
    endpoint_device_tags_mapping_[cache_key] = tag_pair;
  }
  GELOGI("Endpoint[%d] add mapping tag pair[%d,%d] success, mapping device = %s",
         endpoint_idx, tag_pair.first, tag_pair.second, mapping_queue_info.device_info.GetDesc().c_str());
  return SUCCESS;
}
// Returns in mapping_idx the group entry that mirrors the queue at endpoint_idx on the device of
// mapping_queue_info.
//
// When neither model is multi-deployed the entry is cached per (endpoint index, device key) and an
// existing one is returned as is. Otherwise a new group queue entry is created from a copy of the
// endpoint's queue info with the mapping device substituted.
Status DeployPlannerBase::GetOrCreateMappingEntry(const int32_t endpoint_idx,
                                                  const DeployPlan::QueueInfo &mapping_queue_info,
                                                  int32_t &mapping_idx) {
  const auto &endpoint_queue = deploy_plan_.queues_[endpoint_idx];
  const bool src_multi = IsMultiDeployed(endpoint_queue.model_instance_name);
  const bool dst_multi = IsMultiDeployed(mapping_queue_info.model_instance_name);
  const bool cacheable = !(src_multi || dst_multi);
  auto cache_key = std::make_pair(endpoint_idx, mapping_queue_info.device_info.GetKey());
  if (cacheable) {
    const auto cached = endpoint_device_mapping_.find(cache_key);
    if (cached != endpoint_device_mapping_.cend()) {
      mapping_idx = cached->second;
      return SUCCESS;
    }
  }

  // Copy taken before any entry is created; the reference above is not used past this point.
  auto mirrored_queue = endpoint_queue;
  mirrored_queue.device_info = mapping_queue_info.device_info;
  int32_t created_queue_idx = 0;
  const auto device_desc = mapping_queue_info.device_info.GetDesc();
  GE_CHK_STATUS_RET(CreateGroupQueueEntry(mirrored_queue, created_queue_idx, mapping_idx),
                    "Failed to create group entity.");
  if (cacheable) {
    endpoint_device_mapping_[cache_key] = mapping_idx;
  }
  GELOGI("Endpoint[%d] add mapping endpoint[%d] success, mapping device = %s",
         endpoint_idx, created_queue_idx, device_desc.c_str());
  return SUCCESS;
}
// Prepares the group entries that link a source endpoint to a destination endpoint on a different node.
//
// Steps, in order:
//   1. Optionally re-home the source: when the source device reports WithProxy(), or the source queue is
//      not owned, a mapping entry is created and the working source index becomes that entry's ref_index.
//   2. Create (or fetch) the tag pair between the working source index and the destination device (the
//      destination's proxy device when it reports WithProxy()).
//   3. When the destination device reports WithProxy(), create a mapping entry on the proxy device, add it
//      to the input group of the original destination, and make its ref_index the working destination.
//   4. Add the source-side tag to the input group of the working destination.
// Output groups are keyed by "<device key>_<process id>" of the destination queue. Every created output
// entry is also recorded in dynamic_sched_plan_.entry_to_dst_index_.
// Returns the failing status of a helper, else SUCCESS.
Status DeployPlannerBase::PrepareDiffNodeRelation(const int32_t src_endpoint_idx,
const int32_t dst_endpoint_idx,
const ModelQueueIndex &model_queue_loc,
const DeployPlan::QueueInfo &queue_info,
const InputGroupAttr &input_group_attr) {
auto deploy_location_key = queue_info.device_info.GetKey() + "_" + std::to_string(queue_info.process_id);
// Taken by value. NOTE(review): presumably because the helpers below may append to deploy_plan_.queues_
// and invalidate references into it - confirm.
const auto src_queue_info = deploy_plan_.queues_[src_endpoint_idx];
// Working indices; they may be redirected to the queues referenced by newly created mapping entries.
auto src_queue_index = src_endpoint_idx;
auto dst_queue_index = dst_endpoint_idx;
if (src_queue_info.device_info.WithProxy()) {
// Source has a proxy device: map the source onto its proxy device and continue from the mapped queue.
auto proxy_queue_info = queue_info;
proxy_queue_info.device_info = src_queue_info.device_info.ProxyDevice();
int32_t mapping_index = 0;
GE_CHK_STATUS_RET(GetOrCreateMappingEntry(src_endpoint_idx, proxy_queue_info, mapping_index),
"Failed to create mapping entity.");
(void)output_groups_[src_endpoint_idx][model_queue_loc].emplace(deploy_location_key, mapping_index);
src_queue_index = deploy_plan_.group_entries_[mapping_index].ref_index;
GELOGI("Prepare relation, src index = %d, src name = %s, mapping index = %d, "
"model name = %s, input index = %d",
src_endpoint_idx, ToEndpointDesc(src_endpoint_idx).c_str(), mapping_index,
model_queue_loc.model_name.c_str(), model_queue_loc.id);
deploy_plan_.dynamic_sched_plan_.entry_to_dst_index_[mapping_index] = dst_queue_index;
GELOGI("DynamicSched, Step1, add group entry index=%d, dest endpoint idx=%d.", mapping_index, dst_queue_index);
} else if (!src_queue_info.owned) {
// Source queue is not owned: map it onto its own device and mark the mapped queue as owned.
int32_t mapping_index = 0;
auto inner_queue_info = queue_info;
inner_queue_info.device_info = src_queue_info.device_info;
GE_CHK_STATUS_RET(GetOrCreateMappingEntry(src_endpoint_idx, inner_queue_info, mapping_index),
"Failed to create mapping entity.");
(void)output_groups_[src_endpoint_idx][model_queue_loc].emplace(deploy_location_key, mapping_index);
src_queue_index = deploy_plan_.group_entries_[mapping_index].ref_index;
deploy_plan_.queues_[src_queue_index].owned = true;
GELOGI("Prepare relation, src index = %d, src name = %s, mapping index = %d, "
"model name = %s, input index = %d",
src_endpoint_idx, ToEndpointDesc(src_endpoint_idx).c_str(), mapping_index,
model_queue_loc.model_name.c_str(), model_queue_loc.id);
deploy_plan_.dynamic_sched_plan_.entry_to_dst_index_[mapping_index] = dst_queue_index;
}
// The tag pair targets the destination's proxy device when the destination has one.
auto dst_queue_info = queue_info;
if (queue_info.device_info.WithProxy()) {
dst_queue_info.device_info = queue_info.device_info.ProxyDevice();
}
std::pair<int32_t, int32_t> tag_pair;
GE_CHK_STATUS_RET(GetOrCreateMappingTagPairEntry(src_queue_index, dst_queue_info, tag_pair),
"Failed to create mapping tag pair entity.");
// tag_pair.second goes to the output group of the (possibly redirected) source.
(void)output_groups_[src_queue_index][model_queue_loc].emplace(deploy_location_key, tag_pair.second);
GELOGI("Prepare relation, src index = %d, src name = %s, entity index = %d, model name = %s, input index = %d",
src_queue_index, ToEndpointDesc(src_queue_index).c_str(), tag_pair.second,
model_queue_loc.model_name.c_str(), model_queue_loc.id);
deploy_plan_.dynamic_sched_plan_.entry_to_dst_index_[tag_pair.second] = dst_queue_index;
GELOGI("DynamicSched, Step1, add group entry index=%d, dest endpoint idx=%d.", tag_pair.second, dst_queue_index);
if (queue_info.device_info.WithProxy()) {
// Destination has a proxy device: the original destination reads from a mapping entry on the proxy
// device, and the tag below is attached to the queue that entry references instead.
int32_t mapping_index = 0;
GE_CHK_STATUS_RET(GetOrCreateMappingEntry(src_queue_index, dst_queue_info, mapping_index),
"Failed to create mapping entity.");
dst_queue_index = deploy_plan_.group_entries_[mapping_index].ref_index;
AddInputGroups(dst_endpoint_idx, mapping_index, input_group_attr);
GELOGI("Prepare relation, mapping index = %d, dst index = %d, dst name = %s",
mapping_index, dst_endpoint_idx, ToEndpointDesc(dst_endpoint_idx).c_str());
}
// tag_pair.first goes to the input group of the (possibly redirected) destination.
AddInputGroups(dst_queue_index, tag_pair.first, input_group_attr);
GELOGI("Prepare relation, src tag index = %d, dst index = %d, dst name = %s",
tag_pair.first, dst_queue_index, ToEndpointDesc(dst_queue_index).c_str());
return SUCCESS;
}
// Prepares the group entries that link a source endpoint to a destination endpoint on the same node.
//
// Cases, checked in order:
//   1. The two devices can connect with a local queue: delegate to PrepareQueuesRelation().
//   2. The source device reports WithProxy(): mapping entry created from the source endpoint towards the
//      destination queue info.
//   3. The destination device reports WithProxy(): mapping entry created from the destination endpoint
//      towards the source queue info.
//   4. Otherwise: a tag pair links the two; its second entry is the output, its first entry the input.
// Output groups are keyed by "<device key>_<process id>" of the destination queue.
// Returns the failing status of a helper, else SUCCESS.
Status DeployPlannerBase::PrepareSameNodeRelation(const int32_t src_endpoint_idx,
                                                  const int32_t dst_endpoint_idx,
                                                  const ModelQueueIndex &model_queue_loc,
                                                  const DeployPlan::QueueInfo &queue_info,
                                                  const InputGroupAttr &input_group_attr) {
  const auto location_key = queue_info.device_info.GetKey() + "_" + std::to_string(queue_info.process_id);
  // Copied by value, as in every branch below the helpers receive or outlive this snapshot.
  const auto src_info = deploy_plan_.queues_[src_endpoint_idx];

  if (CanConnectWithLocalQ(src_info.device_info, queue_info.device_info)) {
    GE_CHK_STATUS_RET(
        PrepareQueuesRelation(src_endpoint_idx, dst_endpoint_idx, model_queue_loc, queue_info, input_group_attr),
        "Failed to prepare relation of queues");
    return SUCCESS;
  }

  const bool src_with_proxy = src_info.device_info.WithProxy();
  if (src_with_proxy || queue_info.device_info.WithProxy()) {
    // Cases 2 and 3 share everything except which endpoint the mapping entry is created from.
    int32_t mapping_entry = 0;
    GE_CHK_STATUS_RET(src_with_proxy ? GetOrCreateMappingEntry(src_endpoint_idx, queue_info, mapping_entry)
                                     : GetOrCreateMappingEntry(dst_endpoint_idx, src_info, mapping_entry),
                      "Failed to create mapping entity.");
    (void)output_groups_[src_endpoint_idx][model_queue_loc].emplace(location_key, mapping_entry);
    AddInputGroups(dst_endpoint_idx, mapping_entry, input_group_attr);
    deploy_plan_.dynamic_sched_plan_.entry_to_dst_index_[mapping_entry] = dst_endpoint_idx;
    GELOGI("Prepare relation, src index = %d, src name = %s, mapping index = %d, dst index = %d, dst name = %s, "
           "model name = %s, input index = %d",
           src_endpoint_idx, ToEndpointDesc(src_endpoint_idx).c_str(), mapping_entry,
           dst_endpoint_idx, ToEndpointDesc(dst_endpoint_idx).c_str(),
           model_queue_loc.model_name.c_str(), model_queue_loc.id);
    return SUCCESS;
  }

  std::pair<int32_t, int32_t> tags;
  GE_CHK_STATUS_RET(GetOrCreateMappingTagPairEntry(src_endpoint_idx, queue_info, tags),
                    "Failed to create mapping tag pair entity.");
  (void)output_groups_[src_endpoint_idx][model_queue_loc].emplace(location_key, tags.second);
  AddInputGroups(dst_endpoint_idx, tags.first, input_group_attr);
  deploy_plan_.dynamic_sched_plan_.entry_to_dst_index_[tags.second] = dst_endpoint_idx;
  GELOGI("Prepare relation, src index = %d, src name = %s, src tag index = %d, "
         "dst index = %d, dst name = %s, dst tag index = %d, "
         "model name = %s, input index = %d",
         src_endpoint_idx, ToEndpointDesc(src_endpoint_idx).c_str(), tags.first,
         dst_endpoint_idx, ToEndpointDesc(dst_endpoint_idx).c_str(), tags.second,
         model_queue_loc.model_name.c_str(), model_queue_loc.id);
  return SUCCESS;
}
// Prepares the group entries between two endpoints that can be linked through queues.
//
// Cases:
//   - source fans out AND destination has several sources: a mapping entry is added to both the source's
//     output group and the destination's input group;
//   - only the destination has several sources: a ref entry to the source joins the destination's input
//     group;
//   - otherwise: a ref entry to the destination joins the source's output group.
// Entries placed in an output group are also recorded in dynamic_sched_plan_.entry_to_dst_index_.
// Returns the failing status of a helper, else SUCCESS.
Status DeployPlannerBase::PrepareQueuesRelation(const int32_t src_endpoint_idx,
                                                const int32_t dst_endpoint_idx,
                                                const ModelQueueIndex &model_queue_loc,
                                                const DeployPlan::QueueInfo &queue_info,
                                                const InputGroupAttr &input_group_attr) {
  const auto location_key = queue_info.device_info.GetKey() + "_" + std::to_string(queue_info.process_id);
  const bool src_fans_out = IsOutputMultiConnected(src_endpoint_idx);
  const bool dst_fans_in = IsInputMultiConnected(dst_endpoint_idx);

  if (src_fans_out && dst_fans_in) {
    int32_t mapping_entry = 0;
    GE_CHK_STATUS_RET(GetOrCreateMappingEntry(src_endpoint_idx, queue_info, mapping_entry),
                      "Failed to create mapping entity.");
    (void)output_groups_[src_endpoint_idx][model_queue_loc].emplace(location_key, mapping_entry);
    AddInputGroups(dst_endpoint_idx, mapping_entry, input_group_attr);
    GELOGI("Prepare relation, src index = %d, src name = %s, mapping index = %d, dst index = %d, dst name = %s, "
           "model name = %s, input index = %d",
           src_endpoint_idx, ToEndpointDesc(src_endpoint_idx).c_str(), mapping_entry,
           dst_endpoint_idx, ToEndpointDesc(dst_endpoint_idx).c_str(),
           model_queue_loc.model_name.c_str(), model_queue_loc.id);
    deploy_plan_.dynamic_sched_plan_.entry_to_dst_index_[mapping_entry] = dst_endpoint_idx;
    GELOGI("DynamicSched, Step1, add group entry index=%d, dest endpoint idx=%d.", mapping_entry, dst_endpoint_idx);
    return SUCCESS;
  }

  int32_t ref_entry = 0;
  if (dst_fans_in) {
    GE_CHK_STATUS_RET(CreateGroupRefEntry(queue_info, src_endpoint_idx, ref_entry),
                      "Failed to create group entity.");
    AddInputGroups(dst_endpoint_idx, ref_entry, input_group_attr);
    GELOGI("Prepare relation, ref entity index = %d, dst index = %d, dst name = %s",
           ref_entry, dst_endpoint_idx, ToEndpointDesc(dst_endpoint_idx).c_str());
    return SUCCESS;
  }

  GE_CHK_STATUS_RET(CreateGroupRefEntry(queue_info, dst_endpoint_idx, ref_entry),
                    "Failed to create group entity.");
  (void)output_groups_[src_endpoint_idx][model_queue_loc].emplace(location_key, ref_entry);
  GELOGI("Prepare relation, src index = %d, src name = %s, ref entity index = %d, model name = %s, input index = %d",
         src_endpoint_idx, ToEndpointDesc(src_endpoint_idx).c_str(), ref_entry,
         model_queue_loc.model_name.c_str(), model_queue_loc.id);
  deploy_plan_.dynamic_sched_plan_.entry_to_dst_index_[ref_entry] = dst_endpoint_idx;
  GELOGI("DynamicSched, Step1, add group entry index=%d, dest endpoint idx=%d.", ref_entry, dst_endpoint_idx);
  return SUCCESS;
}
// Entry point for linking one source endpoint to one destination endpoint.
//
// Propagates the fusion offset, resolves both endpoints to the queues they reference (ref_index when it
// is non-negative), and returns early when that relation has already been registered. Otherwise it
// dispatches to the same-node or different-node preparation, depending on whether the resolved source
// queue and queue_info report the same node id. Returns the failing status of a helper, else SUCCESS.
Status DeployPlannerBase::PrepareRelations(const int32_t src_endpoint_idx,
                                           const int32_t dst_endpoint_idx,
                                           const ModelQueueIndex &model_queue_loc,
                                           const DeployPlan::QueueInfo &queue_info,
                                           const InputGroupAttr &input_group_attr) {
  UpdateFusionOffset(src_endpoint_idx, dst_endpoint_idx);

  const int32_t src_ref = deploy_plan_.queues_[src_endpoint_idx].ref_index;
  const int32_t dst_ref = deploy_plan_.queues_[dst_endpoint_idx].ref_index;
  const int32_t real_src = (src_ref < 0) ? src_endpoint_idx : src_ref;
  const int32_t real_dst = (dst_ref < 0) ? dst_endpoint_idx : dst_ref;
  const bool already_related = CheckAndAddRelation(real_src, real_dst);
  if (already_related) {
    return SUCCESS;
  }

  const auto src_node_id = deploy_plan_.queues_[real_src].device_info.GetNodeId();
  const auto dst_node_id = queue_info.device_info.GetNodeId();
  if (src_node_id != dst_node_id) {
    GELOGI("Begin to prepare relation of diff node, src node_id = %d, dst node_id = %d",
           src_node_id, dst_node_id);
    GE_CHK_STATUS_RET(PrepareDiffNodeRelation(real_src, real_dst, model_queue_loc, queue_info, input_group_attr),
                      "Failed to prepare relation of diff node");
  } else {
    GELOGI("Begin to prepare relation of same node, node_id = %d", src_node_id);
    GE_CHK_STATUS_RET(PrepareSameNodeRelation(real_src, real_dst, model_queue_loc, queue_info, input_group_attr),
                      "Failed to prepare relation of same node");
  }
  GELOGI("%s add input queue %s", ToEndpointDesc(real_dst).c_str(), ToEndpointDesc(real_src).c_str());
  return SUCCESS;
}
// Copies the fusion offset of queue src_index onto queue dst_index. When
// dst_index is registered in deploy_plan_.groups_, every queue index listed for
// that group receives the same offset as well.
void DeployPlannerBase::UpdateFusionOffset(int32_t src_index, int32_t dst_index) {
  const auto group_it = deploy_plan_.groups_.find(dst_index);
  if (group_it != deploy_plan_.groups_.end()) {
    for (const auto member_index : group_it->second) {
      auto &member = deploy_plan_.queues_[member_index];
      member.fusion_offset = deploy_plan_.queues_[src_index].fusion_offset;
      GELOGI("Update queue[%d] fusion offset, offset = %d", member_index, member.fusion_offset);
    }
  }
  auto &dst_queue = deploy_plan_.queues_[dst_index];
  dst_queue.fusion_offset = deploy_plan_.queues_[src_index].fusion_offset;
  GELOGI("Update queue[%d] fusion offset, offset = %d", dst_index, dst_queue.fusion_offset);
}
// Records a binding from endpoint src_index to endpoint dst_index.
//
// If the destination endpoint has a non-negative ref_index, the binding is
// recorded against the referenced endpoint (real_index) instead.
// When skip_if_dst_exists is true, the resolved destination is looked up in
// deploy_plan_.dst_to_src_bindings_: an existing entry means nothing is added,
// otherwise the new source is remembered there for later calls.
//
// Fix: the bookkeeping entry and the binding itself are now keyed by the
// resolved index. Previously the lookup used real_index while the store used
// dst_index, so for a ref endpoint the "already added" check could never match
// an earlier call, and the emplaced binding disagreed with the logged one.
void DeployPlannerBase::AddEndpointBindings(int32_t src_index, int32_t dst_index, bool skip_if_dst_exists) {
  GELOGI("Begin to add bind relation[%d -> %d].", src_index, dst_index);
  auto real_index = dst_index;
  if (deploy_plan_.queues_[dst_index].ref_index >= 0) {
    real_index = deploy_plan_.queues_[dst_index].ref_index;
    GELOGI("Dst[%d] is ref endpoint, ref index = %d.", dst_index, real_index);
  }
  if (skip_if_dst_exists) {
    const auto it = deploy_plan_.dst_to_src_bindings_.find(real_index);
    if (it != deploy_plan_.dst_to_src_bindings_.end()) {
      GELOGI("Bind relation[%d -> %d] has been added already.", it->second, real_index);
      return;
    }
    // Must use the same key as the lookup above, otherwise the de-duplication never triggers.
    deploy_plan_.dst_to_src_bindings_[real_index] = src_index;
  }
  deploy_plan_.queue_bindings_.emplace_back(src_index, real_index);
  GELOGI("Add bind relation[%d -> %d] success.", src_index, real_index);
}
// For every entry of input_groups_, creates group endpoint(s) from the collected
// group entries and binds them to the input endpoint.
// Entries whose name is listed in no_group_endpoint_names_ each get a dedicated
// single-entry group (bound with skip_if_dst_exists = false); all remaining
// entries share one group named after the input endpoint.
// Returns SUCCESS, or the first failing status of CreateAndBindGroup.
Status DeployPlannerBase::BindRemoteOutputGroupToInput() {
  for (const auto &input_group : input_groups_) {
    const auto dst_endpoint_index = input_group.first;
    // Taken by value: CreateAndBindGroup ends up appending to deploy_plan_.queues_,
    // so a reference into that container could be invalidated while it is still needed.
    const auto dst_endpoint = deploy_plan_.queues_[dst_endpoint_index];
    const auto &attr = input_groups_attr_[dst_endpoint_index];

    DeployPlan::QueueInfo group_info{};
    group_info.device_info = dst_endpoint.device_info;
    group_info.model_instance_name = dst_endpoint.model_instance_name;
    group_info.instance_num = attr.instance_num;
    group_info.instance_idx = attr.instance_idx;

    std::vector<int32_t> shared_group_entries;
    for (const auto entry_index : input_group.second) {
      const auto &entry = deploy_plan_.group_entries_[entry_index];
      const bool needs_own_group =
          (no_group_endpoint_names_.find(entry.name) != no_group_endpoint_names_.cend());
      if (!needs_own_group) {
        shared_group_entries.emplace_back(entry_index);
        continue;
      }
      group_info.name = dst_endpoint.name + "_" + std::to_string(entry_index);
      GE_CHK_STATUS_RET_NOLOG(CreateAndBindGroup(group_info, {entry_index}, dst_endpoint_index, false));
    }

    if (!shared_group_entries.empty()) {
      group_info.name = dst_endpoint.name;
      GE_CHK_STATUS_RET_NOLOG(CreateAndBindGroup(group_info, shared_group_entries, dst_endpoint_index));
    }
  }
  return SUCCESS;
}
// Obtains (or creates) the group endpoint for the given group entries and adds
// a binding from that group to dst_endpoint_index.
// skip_if_dst_exists is forwarded unchanged to AddEndpointBindings.
// Returns SUCCESS, or the failing status of CreateGroupInfo.
Status DeployPlannerBase::CreateAndBindGroup(const DeployPlan::QueueInfo &group_info,
                                             const std::vector<int32_t> &group_entry_index,
                                             const int32_t dst_endpoint_index,
                                             const bool skip_if_dst_exists) {
  int32_t src_group_index = -1;
  GE_CHK_STATUS_RET_NOLOG(CreateGroupInfo(group_info, group_entry_index, src_group_index));

  // The group acts as the source side of the binding.
  AddEndpointBindings(src_group_index, dst_endpoint_index, skip_if_dst_exists);
  GELOGD("Input group binding added, peer = %s, local = %s@%s",
         ToString(ToEndpointDescs(deploy_plan_.groups_[src_group_index], true)).c_str(), group_info.name.c_str(),
         deploy_plan_.queues_[dst_endpoint_index].device_info.GetDesc().c_str());
  return SUCCESS;
}
// For every output endpoint in output_groups_ and every peer model input it
// feeds, creates (or reuses) a group made of the peer input entries and binds
// the output endpoint to that group.
// Returns SUCCESS, or the failing status of CreateGroupInfo.
Status DeployPlannerBase::BindOutputToRemoteInputs() {
  for (auto &output_group : output_groups_) {
    const auto src_endpoint_index = output_group.first;
    for (auto &peer_inputs_of_model : output_group.second) {
      const auto &model_queue_loc = peer_inputs_of_model.first;

      // Flatten the (device, index) pairs into the list of entry indices, keeping the container's order.
      std::vector<int32_t> grouped_inputs_order_by_device;
      for (const auto &device_and_index : peer_inputs_of_model.second) {
        grouped_inputs_order_by_device.emplace_back(device_and_index.second);
      }

      // Taken by value: CreateGroupInfo may append to deploy_plan_.queues_.
      const auto src_endpoint = deploy_plan_.queues_[src_endpoint_index];
      DeployPlan::QueueInfo group_info{};
      group_info.name = src_endpoint.name;
      group_info.device_info = src_endpoint.device_info;
      group_info.model_instance_name = src_endpoint.model_instance_name;

      int32_t group_index = -1;
      GE_CHK_STATUS_RET(CreateGroupInfo(group_info, grouped_inputs_order_by_device, group_index));
      AddEndpointBindings(src_endpoint_index, group_index);
      GELOGD("Output group binding added, local = %s@%s, peer model = %s:%d, peer input indices = %s.",
             group_info.name.c_str(), deploy_plan_.queues_[src_endpoint_index].device_info.GetDesc().c_str(),
             model_queue_loc.model_name.c_str(), model_queue_loc.id,
             ToString(ToEndpointDescs(deploy_plan_.groups_[group_index], true)).c_str());
    }
  }
  return SUCCESS;
}
// Fills deploy_plan_.root_model_info_ from the head and tail model infos:
// the head model's (control) output queues become the root model's (control)
// input queues, and the tail model's (control) input queues become the root
// model's (control) output queues. The moved-from members of head_model_info_
// and tail_model_info_ must not be relied on afterwards.
void DeployPlannerBase::UpdateDeployPlan() {
  auto &root_info = deploy_plan_.root_model_info_;
  // Root inputs come from the head model.
  root_info.input_queue_indices = std::move(head_model_info_.output_queue_indices);
  root_info.control_input_queue_indices = std::move(head_model_info_.control_output_queue_indices);
  // Root outputs come from the tail model.
  root_info.output_queue_indices = std::move(tail_model_info_.input_queue_indices);
  root_info.control_output_queue_indices = std::move(tail_model_info_.control_input_queue_indices);
  root_info.queue_device_info = std::move(head_model_info_.queue_device_info);
}
// Returns a mutable reference to the submodel info registered under `name`.
// The head and tail model names map to head_model_info_ / tail_model_info_;
// any other name is looked up in deploy_plan_.submodels_ via operator[].
DeployPlan::SubmodelInfo &DeployPlannerBase::MutableSubmodelInfo(const std::string &name) {
  if (name == head_model_queue_info_.model_name) {
    return head_model_info_;
  }
  if (name == tail_model_queue_info_.model_name) {
    return tail_model_info_;
  }
  // NOTE(review): operator[] presumably creates a default entry for an unknown name - confirm the container type.
  return deploy_plan_.submodels_[name];
}
// Tells whether `name` equals the model name of the head or of the tail model queue info.
bool DeployPlannerBase::IsHeadOrTail(const std::string &name) const {
  if (name == head_model_queue_info_.model_name) {
    return true;
  }
  return name == tail_model_queue_info_.model_name;
}
const std::string &DeployPlannerBase::GetSubmodelType(const std::string &name) {
if (name == head_model_queue_info_.model_name || name == tail_model_queue_info_.model_name) {
return PNE_ID_CPU;
}
if (deploy_plan_.submodels_[name].model == nullptr) {
return PNE_ID_CPU;
}
return deploy_plan_.submodels_[name].model->GetModelType();
}
// Builds the printable description "<name>@<device desc>" of one endpoint.
// The index addresses deploy_plan_.group_entries_ when is_group_entry is true,
// deploy_plan_.queues_ otherwise.
std::string DeployPlannerBase::ToEndpointDesc(const int32_t endpoint_indices, const bool is_group_entry) const {
  const auto &info =
      is_group_entry ? deploy_plan_.group_entries_[endpoint_indices] : deploy_plan_.queues_[endpoint_indices];
  std::string description = info.name;
  description += "@";
  description += info.device_info.GetDesc();
  return description;
}
// Maps every index in endpoint_indices to its description (see ToEndpointDesc),
// preserving the input order.
std::vector<std::string> DeployPlannerBase::ToEndpointDescs(const std::vector<int32_t> &endpoint_indices,
                                                            const bool is_group_entry) const {
  std::vector<std::string> descriptions;
  for (const int32_t index : endpoint_indices) {
    descriptions.emplace_back(ToEndpointDesc(index, is_group_entry));
  }
  return descriptions;
}
// Builds a DeployPlan::QueueInfo for the endpoint definition `queue_def` owned
// by the model instance `model_instance_name`.
// - device/process information is taken from the owning submodel info;
// - names longer than kMaxQueueNameLen are replaced by a generated short name;
// - depth, enqueue policy and the queue action are read from queue_def;
// - the device is switched to its proxy device when the queue is a status queue
//   or IsNeedDeviceQueue() holds for the submodel, provided the device has a proxy.
DeployPlan::QueueInfo DeployPlannerBase::BuildQueueInfo(const Endpoint &queue_def,
                                                        const std::string &model_instance_name) {
  DeployPlan::QueueInfo info{};
  const auto &owner = MutableSubmodelInfo(model_instance_name);
  info.device_info = owner.queue_device_info;
  info.model_instance_name = model_instance_name;

  const auto &raw_name = queue_def.GetName();
  if (raw_name.length() <= kMaxQueueNameLen) {
    info.name = raw_name;
  } else {
    info.name = GenShortName(raw_name);
  }

  info.process_id = owner.process_id;
  info.depth = QueueNodeUtils::GetDepth(queue_def);
  info.enqueue_policy = QueueNodeUtils::GetEnqueuePolicy(queue_def);

  // Classify the queue; only a status queue forces the proxy device by itself.
  bool is_status_queue = false;
  if (QueueNodeUtils::GetIsControl(queue_def)) {
    info.queue_action = DeployPlan::QueueAction::kControl;
  } else if (QueueNodeUtils::GetIsStatus(queue_def)) {
    is_status_queue = true;
    info.queue_action = DeployPlan::QueueAction::kStatus;
  } else if (QueueNodeUtils::GetIsSched(queue_def)) {
    info.queue_action = DeployPlan::QueueAction::kSched;
  } else {
    info.queue_action = DeployPlan::QueueAction::kDefault;
  }

  const bool use_proxy_device = is_status_queue || IsNeedDeviceQueue(owner);
  if (use_proxy_device && info.device_info.WithProxy()) {
    info.device_info = info.device_info.ProxyDevice();
  }

  info.is_dummy = (queue_def.GetEndpointType() == EndpointType::kDummyQueue);
  info.model_id = deploy_plan_.dynamic_sched_plan_.submodels_id_[model_instance_name];
  GELOGI("[%s] queue depth = [%u], policy = [%s], model_id=[%u], queue device info=%s, use_proxy_device=%d.",
         queue_def.GetName().c_str(), info.depth, info.enqueue_policy.c_str(), info.model_id,
         info.device_info.GetDesc().c_str(), static_cast<int32_t>(use_proxy_device));
  return info;
}
// Appends queue_info to the endpoint list and reports its index through queue_idx.
// queue_idx is only written when the append succeeded.
Status DeployPlannerBase::CreateEndpointInfo(const DeployPlan::QueueInfo &queue_info, int32_t &queue_idx) {
  // The new element lands at the position equal to the size before the append.
  const auto index_of_new_endpoint = deploy_plan_.queues_.size();
  GE_CHK_STATUS_RET(CreateEndpointInfo(queue_info), "Create endpoint info failed.");
  queue_idx = static_cast<int32_t>(index_of_new_endpoint);
  GELOGI("Create endpoint success, index = %d", queue_idx);
  return SUCCESS;
}
// Appends queue_info to deploy_plan_.queues_.
// The current size is checked against INT32_MAX first, because endpoint
// indices are handled as int32_t elsewhere in this class.
Status DeployPlannerBase::CreateEndpointInfo(const DeployPlan::QueueInfo &queue_info) {
  auto &endpoints = deploy_plan_.queues_;
  const auto queue_size = endpoints.size();
  GE_CHECK_LE(queue_size, static_cast<size_t>(INT32_MAX));
  endpoints.emplace_back(queue_info);
  return SUCCESS;
}
// Creates a new endpoint from queue_info plus a group entry that refers to it.
// queue_index receives the index of the new endpoint, entry_index the index of
// the new group entry (whose ref_index is set to queue_index).
Status DeployPlannerBase::CreateGroupQueueEntry(const DeployPlan::QueueInfo &queue_info,
                                                int32_t &queue_index,
                                                int32_t &entry_index) {
  GE_CHK_STATUS_RET(CreateEndpointInfo(queue_info, queue_index), "Failed to create endpoint.");

  DeployPlan::QueueInfo group_entry = queue_info;
  group_entry.ref_index = queue_index;
  GE_CHK_STATUS_RET(CreateGroupEntry(group_entry, entry_index), "Failed to create group entity.");
  return SUCCESS;
}
// Creates a group entry that is a copy of queue_info with ref_index pointing at
// the already existing endpoint endpoint_index. entry_index receives the index
// of the new group entry.
Status DeployPlannerBase::CreateGroupRefEntry(const DeployPlan::QueueInfo &queue_info,
                                              int32_t endpoint_index,
                                              int32_t &entry_index) {
  DeployPlan::QueueInfo group_entry = queue_info;
  group_entry.ref_index = endpoint_index;
  GE_CHK_STATUS_RET(CreateGroupEntry(group_entry, entry_index), "Failed to create group entity.");
  return SUCCESS;
}
// Appends queue_info to deploy_plan_.group_entries_ and returns the position of
// the new element through entry_index. The size is checked against INT32_MAX
// before appending so that the index fits into int32_t.
Status DeployPlannerBase::CreateGroupEntry(const DeployPlan::QueueInfo &queue_info, int32_t &entry_index) {
  auto &entries = deploy_plan_.group_entries_;
  const auto entry_size = entries.size();
  GE_CHECK_LE(entry_size, static_cast<size_t>(INT32_MAX));
  entry_index = static_cast<int32_t>(entry_size);
  entries.emplace_back(queue_info);
  return SUCCESS;
}
// Returns, through group_index, the endpoint index of the group made of
// grouped_indices. Groups are identified by the string form of their index
// list: an already known list reuses the existing group, otherwise a new
// endpoint is created from queue_info and registered as a group.
Status DeployPlannerBase::CreateGroupInfo(const DeployPlan::QueueInfo &queue_info,
                                          const std::vector<int32_t> &grouped_indices,
                                          int32_t &group_index) {
  const auto lookup_key = ToString(grouped_indices);
  const auto known_group = deploy_plan_.groups_key_to_idx_.find(lookup_key);
  if (known_group != deploy_plan_.groups_key_to_idx_.end()) {
    // Same member list seen before: reuse it instead of creating a duplicate group.
    group_index = known_group->second;
    return SUCCESS;
  }

  GE_CHK_STATUS_RET(CreateEndpointInfo(queue_info, group_index));
  deploy_plan_.groups_[group_index] = grouped_indices;
  deploy_plan_.groups_key_to_idx_[lookup_key] = group_index;
  GELOGD("Group created, name = %s, group_index = %d, endpoint_indices = %s, endpoint_descs = %s",
         queue_info.name.c_str(), group_index, ToString(grouped_indices).c_str(),
         ToString(ToEndpointDescs(grouped_indices, true)).c_str());
  return SUCCESS;
}
// Resolves dequeue fusion for the destination endpoint dst_endpoint_idx fed by
// src_endpoint_idx. Only destinations whose submodel type is PNE_ID_NPU are
// considered. The first destination seen for a (resolved source, destination
// model instance) pair is remembered in dequeue_ref_indices_; every later
// destination of the same pair gets its ref_index set to that first one.
// Always returns SUCCESS.
Status DeployPlannerBase::ResolveDequeueFusion(int32_t src_endpoint_idx, int32_t dst_endpoint_idx) {
  GELOGD("Begin to resolve dequeue fusion, src index = %d, dst index %d.", src_endpoint_idx, dst_endpoint_idx);
  const std::set<std::string> kSupportFusionEngines = {PNE_ID_NPU};
  auto &endpoint = deploy_plan_.queues_[static_cast<size_t>(dst_endpoint_idx)];
  const auto &dst_model_instance_name = endpoint.model_instance_name;
  const auto &model_type = GetSubmodelType(dst_model_instance_name);
  if (kSupportFusionEngines.count(model_type) == 0U) {
    GELOGI("Dequeue fusion is unsupported for engine type[%s].", model_type.c_str());
    return SUCCESS;
  }

  // A source that refers to another endpoint is tracked under the referenced index.
  int32_t real_src_idx = src_endpoint_idx;
  const int32_t src_ref_idx = deploy_plan_.queues_[src_endpoint_idx].ref_index;
  if (src_ref_idx >= 0) {
    real_src_idx = src_ref_idx;
    GELOGI("Src[%d] is ref endpoint, ref index = %d.", src_endpoint_idx, real_src_idx);
  }

  const auto src_it = dequeue_ref_indices_.find(real_src_idx);
  if (src_it != dequeue_ref_indices_.end()) {
    const auto &dst_by_model = src_it->second;
    const auto first_dst_it = dst_by_model.find(dst_model_instance_name);
    if (first_dst_it != dst_by_model.end()) {
      // This model instance already dequeues from the source: fuse onto the first endpoint.
      endpoint.ref_index = first_dst_it->second;
      GELOGI("Fusion dequeue endpoint[%d] success, ref index = %d.", dst_endpoint_idx, endpoint.ref_index);
      return SUCCESS;
    }
  }

  // First destination of this model instance for the source: just remember it.
  GELOGI("Relation added, dst model_name = %s, src index = %d, dst index = %d.", dst_model_instance_name.c_str(),
         real_src_idx, dst_endpoint_idx);
  dequeue_ref_indices_[real_src_idx][dst_model_instance_name] = dst_endpoint_idx;
  return SUCCESS;
}
// Returns, through endpoint_index, the input endpoint identified by
// (model queue index, device key, process id). The endpoint is created from
// queue_info and cached in input_endpoint_indices_ on first use.
Status DeployPlannerBase::GetOrCreateInputEndpoint(const ModelQueueIndex &model_queue_index,
                                                   const DeployPlan::QueueInfo &queue_info, int32_t &endpoint_index) {
  const auto cache_key = std::make_tuple(model_queue_index, queue_info.device_info.GetKey(), queue_info.process_id);
  const auto cached = input_endpoint_indices_.find(cache_key);
  if (cached != input_endpoint_indices_.cend()) {
    endpoint_index = cached->second;
    return SUCCESS;
  }

  GE_CHK_STATUS_RET_NOLOG(CreateEndpointInfo(queue_info, endpoint_index));
  GELOGD("Input endpoint created, queue name = %s, device = %s, index = %d", queue_info.name.c_str(),
         queue_info.device_info.GetDesc().c_str(), endpoint_index);
  input_endpoint_indices_[cache_key] = endpoint_index;
  return SUCCESS;
}
// Creates one output endpoint per name in queue_names for the given model
// instance and registers it in src_endpoint_indices_ under the endpoint name.
// When is_owned is true the endpoint index is additionally appended to the
// submodel's control output list (control queues) or regular output list.
// Returns SUCCESS, or the first failing status of the endpoint lookup/creation.
Status DeployPlannerBase::CreateOutputQueueDefs(const std::string &model_instance_name,
                                                const std::vector<std::string> &queue_names, const bool is_owned) {
  std::vector<const Endpoint *> endpoints;
  GE_CHK_STATUS_RET_NOLOG(relation_reader_->BatchGetEndpoints(queue_names, endpoints));

  for (size_t output_idx = 0U; output_idx < endpoints.size(); ++output_idx) {
    const Endpoint *const endpoint = endpoints[output_idx];
    auto queue_info = BuildQueueInfo(*endpoint, model_instance_name);
    queue_info.owned = is_owned;

    int32_t endpoint_index = -1;
    GE_CHK_STATUS_RET_NOLOG(CreateEndpointInfo(queue_info, endpoint_index));
    src_endpoint_indices_[endpoint->GetName()].emplace_back(endpoint_index);

    if (queue_info.owned) {
      auto &submodel_info = MutableSubmodelInfo(queue_info.model_instance_name);
      const bool is_control = (queue_info.queue_action == DeployPlan::QueueAction::kControl);
      auto &target_indices =
          is_control ? submodel_info.control_output_queue_indices : submodel_info.output_queue_indices;
      target_indices.emplace_back(endpoint_index);
    }
    GELOGD("Output endpoint created, model = %s, output_index = %zu, queue name = %s, queue index = %d",
           model_instance_name.c_str(), output_idx, endpoint->GetName().c_str(), endpoint_index);
  }
  return SUCCESS;
}
// Creates the feed endpoints of an invoked model. For every endpoint of type
// EndpointType::kQueue named in queue_names, an owned endpoint is created,
// registered in src_endpoint_indices_ and appended to the feed queue list
// stored under invoke_key in the submodel info. Other endpoint types are skipped.
Status DeployPlannerBase::CreateFeedEndpoints(const std::string &model_instance_name,
                                              const std::vector<std::string> &queue_names,
                                              const std::string &invoke_key) {
  std::vector<const Endpoint *> endpoints;
  GE_CHK_STATUS_RET_NOLOG(relation_reader_->BatchGetEndpoints(queue_names, endpoints));

  const size_t endpoint_num = endpoints.size();
  for (size_t output_idx = 0UL; output_idx < endpoint_num; ++output_idx) {
    const Endpoint *const endpoint = endpoints[output_idx];
    if (endpoint->GetEndpointType() != EndpointType::kQueue) {
      continue;
    }

    auto queue_info = BuildQueueInfo(*endpoint, model_instance_name);
    queue_info.owned = true;
    int32_t endpoint_index = -1;
    GE_CHK_STATUS_RET_NOLOG(CreateEndpointInfo(queue_info, endpoint_index));
    src_endpoint_indices_[endpoint->GetName()].emplace_back(endpoint_index);

    auto &feed_indices =
        MutableSubmodelInfo(queue_info.model_instance_name).invoked_model_queue_infos[invoke_key].feed_queue_indices;
    feed_indices.emplace_back(endpoint_index);
    GELOGD("Feed endpoint created, model = %s, invoke_key = %s, feed_index = %zu, queue name = %s, queue index = %d",
           model_instance_name.c_str(), invoke_key.c_str(), output_idx, endpoint->GetName().c_str(), endpoint_index);
  }
  return SUCCESS;
}
std::string DeployPlannerBase::GenShortName(const std::string &name) {
auto &short_name = short_names_[name];
if (short_name.empty()) {
short_name = "deploy_planner.auto_generated:" + std::to_string(endpoint_name_id_gen_++);
GELOGD("endpoint name too long, change from %s to %s", name.c_str(), short_name.c_str());
}
return short_name;
}
// Composes the full endpoint name
//   <model instance>:<queue id>_FROM_<endpoint name>@<device desc>_T<plan id>
// and falls back to a generated short name when the result is longer than
// kMaxQueueNameLen.
std::string DeployPlannerBase::GetEndpointFullName(const DeployPlan::QueueInfo &endpoint_info,
                                                   const ModelQueueIndex &model_queue_index) {
  std::stringstream name_stream;
  name_stream << endpoint_info.model_instance_name << ":" << model_queue_index.id;
  name_stream << "_FROM_" << endpoint_info.name << "@" << endpoint_info.device_info.GetDesc();
  name_stream << "_T" << std::to_string(plan_id_gen_);

  const std::string full_name = name_stream.str();
  if (full_name.length() > kMaxQueueNameLen) {
    return GenShortName(full_name);
  }
  return full_name;
}
// Keeps the root model whose relation is to be flattened; the argument is moved into root_model_.
ModelRelationFlattener::ModelRelationFlattener(PneModelPtr root_model)
    : root_model_(std::move(root_model)) {
}
// Flattens the model relation of root_model_.
// The root endpoint info, invoked-model queue infos and endpoints of the root
// relation are taken over, then every submodel is flattened starting at depth 0.
// On success the accumulated relation is moved into flattened_model_relation
// and the collected leaf models into name_to_models.
// Fails when the root relation or a submodel is missing, or when flattening a
// submodel fails.
Status ModelRelationFlattener::Flatten(ModelRelation &flattened_model_relation,
                                       std::map<std::string, PneModelPtr> &name_to_models) {
  const auto &model_relation = root_model_->GetModelRelation();
  GE_CHECK_NOTNULL(model_relation, ", FlowModel's ModelRelation is nullptr, model_name = %s",
                   root_model_->GetModelName().c_str());

  flattened_model_relation_.root_model_endpoint_info = model_relation->root_model_endpoint_info;
  flattened_model_relation_.invoked_model_queue_infos = model_relation->invoked_model_queue_infos;
  // No name replacement at the root level: every root endpoint is kept.
  MergeEndpoints({}, model_relation->endpoints);

  for (auto &endpoint_info_entry : model_relation->submodel_endpoint_infos) {
    const auto &model_name = endpoint_info_entry.first;
    const auto submodel = root_model_->GetSubmodel(model_name);
    GE_CHECK_NOTNULL(submodel, ", Failed to get submodel, submodel_name = %s", model_name.c_str());
    GE_CHK_STATUS_RET(FlattenSubmodel(endpoint_info_entry.second, submodel, 0), "Failed to flatten submodel %s",
                      model_name.c_str());
  }

  flattened_model_relation = std::move(flattened_model_relation_);
  name_to_models = std::move(leaf_models_);
  return SUCCESS;
}
// Returns true when at least one direct submodel of root_model has submodels
// of its own, i.e. the model hierarchy is deeper than one level.
bool ModelRelationFlattener::NeedFlatten(const PneModelPtr &root_model) {
  bool has_nested_submodel = false;
  for (const auto &name_and_model : root_model->GetSubmodels()) {
    if (!name_and_model.second->GetSubmodels().empty()) {
      has_nested_submodel = true;
      break;
    }
  }
  return has_nested_submodel;
}
// Flattens root_model in place when it contains nested submodels.
// Steps:
//   1. nothing to do when NeedFlatten() is false;
//   2. when the model has no relation yet, one is built from its root graph
//      with ModelRelationBuilder::BuildForSingleModel and attached to the model;
//   3. the relation is flattened and both the relation and the submodels of
//      root_model are replaced by the flattened results.
// Returns SUCCESS, or the first failing status of the steps above.
Status ModelRelationFlattener::Flatten(const PneModelPtr &root_model) {
  GE_CHECK_NOTNULL(root_model);
  if (!NeedFlatten(root_model)) {
    GELOGD("model is no need flatten, model %s", root_model->GetModelName().c_str());
    return SUCCESS;
  }

  const bool relation_missing = (root_model->GetModelRelation() == nullptr);
  if (relation_missing) {
    GELOGD("model need flatten but relation is null, need build relation, model %s",
           root_model->GetModelName().c_str());
    const auto &root_graph = root_model->GetRootGraph();
    GE_CHECK_NOTNULL(root_graph, ", need build model relation, but root graph is null, model %s",
                     root_model->GetModelName().c_str());
    auto model_relation = MakeShared<ModelRelation>();
    GE_CHECK_NOTNULL(model_relation, ", need build model relation, but make shared failed, model %s",
                     root_model->GetModelName().c_str());
    GE_CHK_STATUS_RET(ModelRelationBuilder().BuildForSingleModel(*root_graph, *model_relation),
                      "Failed to build ModelRelation from root graph: %s.", root_graph->GetName().c_str());
    root_model->SetModelRelation(model_relation);
    GELOGD("make model relation success, model %s", root_model->GetModelName().c_str());
  }

  auto flattened_model_relation = MakeShared<ModelRelation>();
  GE_CHECK_NOTNULL(flattened_model_relation, ", Failed to make flatten model relation for model %s",
                   root_model->GetModelName().c_str());

  std::map<std::string, PneModelPtr> flattened_submodels;
  ModelRelationFlattener flattener(root_model);
  GE_CHK_STATUS_RET_NOLOG(flattener.Flatten(*flattened_model_relation, flattened_submodels));

  // Only replace the model's relation/submodels once flattening fully succeeded.
  root_model->SetModelRelation(flattened_model_relation);
  root_model->SetSubmodels(flattened_submodels);
  GELOGD("model flatten end, model %s", root_model->GetModelName().c_str());
  return SUCCESS;
}
// Recursively flattens pne_model into flattened_model_relation_.
// A model without submodels is a leaf: it is recorded in leaf_models_ and its
// endpoint info (as seen by the parent) is added to the flattened relation.
// Otherwise the model's own relation is merged: its root endpoint names are
// mapped to the parent's endpoint names, the remaining endpoints are appended,
// and every child is flattened with depth + 1.
// Returns UNSUPPORTED when depth reaches max_depth_ for a non-leaf model.
Status ModelRelationFlattener::FlattenSubmodel(const ModelRelation::ModelEndpointInfo &parent_model_queue_info,
                                               const PneModelPtr &pne_model, const int32_t depth) {
  const bool is_leaf = pne_model->GetSubmodels().empty();
  if (is_leaf) {
    const auto &leaf_name = pne_model->GetModelName();
    (void)leaf_models_.emplace(leaf_name, pne_model);
    (void)flattened_model_relation_.submodel_endpoint_infos.emplace(leaf_name, parent_model_queue_info);
    GELOGD("Leaf submodel %s(%s) flattened to parent model %s", pne_model->GetModelName().c_str(),
           pne_model->GetModelType().c_str(), parent_model_queue_info.model_name.c_str());
    return SUCCESS;
  }

  if (depth >= max_depth_) {
    GELOGE(UNSUPPORTED, "Depth limit(%d) reached", max_depth_);
    return UNSUPPORTED;
  }

  GELOGD("To flatten submodel %s(%s) to parent model %s", pne_model->GetModelName().c_str(),
         pne_model->GetModelType().c_str(), parent_model_queue_info.model_name.c_str());
  const auto &model_relation = pne_model->GetModelRelation();
  GE_CHECK_NOTNULL(model_relation);
  GE_CHK_STATUS_RET_NOLOG(CheckConsistency(parent_model_queue_info, model_relation->root_model_endpoint_info));

  // Inner root endpoint name -> endpoint name used by the parent for the same port.
  const auto name_refs = BuildNameRefs(parent_model_queue_info, model_relation->root_model_endpoint_info);
  MergeEndpoints(name_refs, model_relation->endpoints);
  flattened_model_relation_.invoked_model_queue_infos.insert(model_relation->invoked_model_queue_infos.begin(),
                                                             model_relation->invoked_model_queue_infos.end());

  for (auto &child_entry : model_relation->submodel_endpoint_infos) {
    auto &child_endpoint_info = child_entry.second;
    ReplaceQueueNames(name_refs, child_endpoint_info.input_endpoint_names);
    ReplaceQueueNames(name_refs, child_endpoint_info.output_endpoint_names);
    auto submodel = pne_model->GetSubmodel(child_endpoint_info.model_name);
    GE_CHECK_NOTNULL(submodel, "Failed to get submodel, parent_model = %s, submodel_name = %s",
                     pne_model->GetModelName().c_str(), child_endpoint_info.model_name.c_str());
    GE_CHK_STATUS_RET(FlattenSubmodel(child_endpoint_info, submodel, depth + 1), "Failed to flatten submodel %s",
                      child_endpoint_info.model_name.c_str());
  }
  return SUCCESS;
}
// Rewrites `names` in place: every element that appears as a key in name_refs
// is replaced by the mapped value; other elements are left untouched.
void ModelRelationFlattener::ReplaceQueueNames(const std::map<std::string, std::string> &name_refs,
                                               std::vector<std::string> &names) {
  for (auto &queue_name : names) {
    const auto replacement = name_refs.find(queue_name);
    if (replacement == name_refs.cend()) {
      continue;
    }
    queue_name = replacement->second;
  }
}
// Appends to the flattened relation every endpoint whose name is NOT a key of
// name_refs. Endpoints that are keys of name_refs are skipped, since their
// names are replaced by the mapped ones.
void ModelRelationFlattener::MergeEndpoints(const map<std::string, std::string> &name_refs,
                                            const vector<Endpoint> &endpoints) {
  auto &merged_endpoints = flattened_model_relation_.endpoints;
  for (const auto &endpoint : endpoints) {
    const bool is_replaced = (name_refs.count(endpoint.GetName()) > 0U);
    if (is_replaced) {
      continue;
    }
    merged_endpoints.emplace_back(endpoint);
  }
}
// Builds the mapping from the endpoint names of a submodel's own root endpoint
// info to the names the parent uses at the same position (inputs first, then
// outputs). The parent lists are indexed with the positions of the root lists,
// so both must have matching sizes (see CheckConsistency).
std::map<std::string, std::string> ModelRelationFlattener::BuildNameRefs(
    const ModelRelation::ModelEndpointInfo &parent_model_queue_info,
    const ModelRelation::ModelEndpointInfo &root_model_queue_info) {
  std::map<std::string, std::string> inner_to_outer;

  const auto &inner_inputs = root_model_queue_info.input_endpoint_names;
  for (size_t pos = 0U; pos < inner_inputs.size(); ++pos) {
    inner_to_outer[inner_inputs[pos]] = parent_model_queue_info.input_endpoint_names[pos];
  }

  const auto &inner_outputs = root_model_queue_info.output_endpoint_names;
  for (size_t pos = 0U; pos < inner_outputs.size(); ++pos) {
    inner_to_outer[inner_outputs[pos]] = parent_model_queue_info.output_endpoint_names[pos];
  }
  return inner_to_outer;
}
// Verifies that a submodel's root endpoint info has as many input and output
// endpoint names as the parent declares for it.
// Returns PARAM_INVALID on the first mismatch, SUCCESS otherwise.
Status ModelRelationFlattener::CheckConsistency(const ModelRelation::ModelEndpointInfo &parent_model_queue_info,
                                                const ModelRelation::ModelEndpointInfo &root_model_queue_info) {
  const size_t root_input_num = root_model_queue_info.input_endpoint_names.size();
  const size_t parent_input_num = parent_model_queue_info.input_endpoint_names.size();
  if (root_input_num != parent_input_num) {
    GELOGE(PARAM_INVALID, "input queue size(%zu) mismatches that of parent's (%zu)",
           root_input_num, parent_input_num);
    return PARAM_INVALID;
  }

  const size_t root_output_num = root_model_queue_info.output_endpoint_names.size();
  const size_t parent_output_num = parent_model_queue_info.output_endpoint_names.size();
  if (root_output_num != parent_output_num) {
    GELOGE(PARAM_INVALID, "output queue size(%zu) mismatches that of parent's (%zu)",
           root_output_num, parent_output_num);
    return PARAM_INVALID;
  }
  return SUCCESS;
}
// Read-only access to the dynamic-sched flag stored in the deploy plan.
const bool &DeployPlannerBase::GetIsDynamicSched() const {
  const bool &is_dynamic_sched = deploy_plan_.is_dynamic_sched_;
  return is_dynamic_sched;
}
// Assigns the ids used by the dynamic-sched plan.
// "__head" and "__tail" both get id 0; each submodel instance gets the next id
// (starting at 1) in iteration order of model_relation_.submodel_endpoint_infos.
// Afterwards, for every instance, the number of instances sharing its model
// name is stored in model_instances_num_.
void DeployPlannerBase::GenerateDynamicSchedModelId() {
  auto &dynamic_plan = deploy_plan_.dynamic_sched_plan_;
  uint32_t id = 0U;
  dynamic_plan.submodels_id_["__head"] = id;
  dynamic_plan.submodels_id_["__tail"] = id;

  // model name -> names of all its instances
  std::map<std::string, std::vector<std::string>> model_instances;
  for (const auto &instance_entry : model_relation_.submodel_endpoint_infos) {
    const auto &model_instance_name = instance_entry.first;
    ++id;
    dynamic_plan.submodels_id_[model_instance_name] = id;
    model_instances[instance_entry.second.model_name].emplace_back(model_instance_name);
    GELOGI("Submodel instance [%s], model_uuid=%u.", model_instance_name.c_str(), id);
  }

  for (const auto &instance_entry : model_relation_.submodel_endpoint_infos) {
    const auto &model_instance_name = instance_entry.first;
    dynamic_plan.model_instances_num_[model_instance_name] =
        model_instances[instance_entry.second.model_name].size();
    GELOGI("Submodel instance [%s] num=%u.", model_instance_name.c_str(),
           dynamic_plan.model_instances_num_[model_instance_name]);
  }
}
// Adds the status report queue to the model relation when dynamic sched or
// exception catching is enabled; does nothing when both are disabled.
// A queue endpoint named "__status_output" is appended to the relation and
// registered as status output of every submodel and of the root model. Finally
// the sched/status queue names of the head and tail model queue infos are
// derived from the root model endpoint info.
void DeployPlannerBase::UpdateRelationForDynamicSched() {
  if (!(GetIsDynamicSched() || deploy_plan_.IsEnableExceptionCatch())) {
    GELOGI("DynamicSched flag close and exception catch is disable, don't add status queues.");
    return;
  }
  GELOGD("DynamicSched flag=%d, exception catch flag=%d.", static_cast<int32_t>(GetIsDynamicSched()),
         static_cast<int32_t>(deploy_plan_.IsEnableExceptionCatch()));

  const std::string status_queue_name = "__status_output";
  Endpoint queue_def(status_queue_name, EndpointType::kQueue);
  QueueNodeUtils(queue_def).SetDepth(kDepDefQueDepth).SetEnqueuePolicy("FIFO").
      SetNodeAction(kQueueActionStatus);
  model_relation_.endpoints.emplace_back(queue_def);

  for (auto &submodel_entry : model_relation_.submodel_endpoint_infos) {
    submodel_entry.second.status_output_queue_names.emplace_back(status_queue_name);
  }
  auto &root_info = model_relation_.root_model_endpoint_info;
  root_info.status_output_queue_names.emplace_back(status_queue_name);
  GELOGI("DynamicSched add status report queue, name=%s.", status_queue_name.c_str());

  // Head mirrors the root's inputs as its outputs; tail mirrors the root's outputs as its inputs.
  head_model_queue_info_.sched_output_queue_names = root_info.sched_input_queue_names;
  head_model_queue_info_.status_output_queue_names = root_info.status_input_queue_names;
  tail_model_queue_info_.sched_input_queue_names = root_info.sched_output_queue_names;
  tail_model_queue_info_.status_input_queue_names = root_info.status_output_queue_names;
}
// Creates the dynamic-sched output queue definitions: the sched output queues
// of the head model first, then the status and sched output queues of every
// submodel instance. Stops at, and returns, the first failing status.
Status DeployPlannerBase::AssignDynamicSchedEnqueueQueues() {
  GE_CHK_STATUS_RET_NOLOG(
      CreateDynamicSchedOutputQueueDefs(head_model_queue_info_.model_name,
                                        head_model_queue_info_.sched_output_queue_names));

  for (const auto &submodel_entry : model_relation_.submodel_endpoint_infos) {
    const auto &instance_name = submodel_entry.first;
    const auto &endpoint_info = submodel_entry.second;
    GE_CHK_STATUS_RET_NOLOG(CreateDynamicSchedOutputQueueDefs(instance_name,
                                                              endpoint_info.status_output_queue_names));
    GE_CHK_STATUS_RET_NOLOG(CreateDynamicSchedOutputQueueDefs(instance_name,
                                                              endpoint_info.sched_output_queue_names));
  }
  return SUCCESS;
}
// Adds one route to the dynamic-sched model index info for the binding
// src_q_idx -> group dst_q_idx.
// entry_index is looked up in entry_to_dst_index_ to find the destination
// endpoint of the group entry; when it is not registered only a log line is
// written. Otherwise the source description is stored under
// [model id of the source][src_q_idx], and a route (real entry index,
// destination endpoint index, destination description, redundant flag) is
// appended to the group dst_q_idx.
void DeployPlannerBase::DynamicSchedGroupFormat(const int32_t &real_entry_index,
                                                const int32_t &entry_index,
                                                const DeployPlan::QueueInfo *src_queue_info,
                                                const int32_t &src_q_idx,
                                                const int32_t &dst_q_idx) {
  auto &dynamic_plan = deploy_plan_.dynamic_sched_plan_;
  auto &modelInfo = dynamic_plan.model_index_info_;
  const auto iter = dynamic_plan.entry_to_dst_index_.find(entry_index);
  if (iter == dynamic_plan.entry_to_dst_index_.end()) {
    GELOGI("DynamicSched, Step2, can't find entry index=%d from entry bindings.", entry_index);
    return;
  }

  const auto &dst_endpoint_info = deploy_plan_.queues_[iter->second];

  // Id registered for the destination's model name; stays 0 when no mapping is known.
  auto mul_submodel_id = 0;
  const auto name_it = instance_to_model_name_.find(dst_endpoint_info.model_instance_name);
  if (name_it != instance_to_model_name_.end()) {
    const auto id_it = model_name_to_id_.find(name_it->second);
    if (id_it != model_name_to_id_.end()) {
      mul_submodel_id = id_it->second;
    }
  }

  const auto model_id = src_queue_info->model_id;
  auto &src_record = modelInfo[model_id][src_q_idx];
  src_record.first.device_info = src_queue_info->device_info;
  src_record.first.submodel_instance_name = src_queue_info->model_instance_name;
  src_record.first.is_normal = true;
  auto &group_record = src_record.second[dst_q_idx];
  group_record.model_id = mul_submodel_id;

  DeployPlan::ExtendedIndexInfo index_info;
  index_info.device_info = dst_endpoint_info.device_info;
  index_info.submodel_instance_name = dst_endpoint_info.model_instance_name;
  index_info.is_normal = true;

  const bool is_redundant = MutableSubmodelInfo(dst_endpoint_info.model_instance_name).is_redundant;
  DeployPlan::DynamicGroupRouteInfo group_route_info = {
      real_entry_index,
      iter->second,
      index_info,
      is_redundant
  };
  group_record.routes.push_back(group_route_info);
  GELOGI("DynamicSched, Step2, add src index bind info: src endpoint index=%d, logic group id=%d, "
         "group entry endpoint index=%d, group dst endpoint index=%d, src model instance name=%s, "
         "model_id=%u, dst model instance name=%s, redundant flag=%d.",
         src_q_idx, dst_q_idx, real_entry_index, iter->second,
         src_queue_info->model_instance_name.c_str(), model_id,
         dst_endpoint_info.model_instance_name.c_str(), static_cast<int32_t>(is_redundant));
}
// Inserts into device_infos the device of every queue whose index is listed in queue_indexs.
void DeployPlannerBase::AddDependentDevice(std::set<DeployPlan::DeviceInfo> &device_infos,
                                           const std::vector<int32_t> &queue_indexs) {
  for (const int32_t queue_index : queue_indexs) {
    const auto &queue = deploy_plan_.queues_[queue_index];
    device_infos.emplace(queue.device_info);
  }
}
// For every model instance listed in the plan's model deploy infos, collects
// the devices of all queues the instance uses (data, control and status
// queues, inputs as well as outputs) into the instance's device set.
void DeployPlannerBase::BuildModelDeployInfos() {
  for (auto &model_entry : deploy_plan_.GetModelDeployInfos()) {
    for (auto &instance_entry : model_entry.second) {
      auto &dependent_devices = instance_entry.second;
      const auto &submodel_info = MutableSubmodelInfo(instance_entry.first);
      // data and control queues
      AddDependentDevice(dependent_devices, submodel_info.input_queue_indices);
      AddDependentDevice(dependent_devices, submodel_info.control_input_queue_indices);
      AddDependentDevice(dependent_devices, submodel_info.output_queue_indices);
      AddDependentDevice(dependent_devices, submodel_info.control_output_queue_indices);
      // status queues
      AddDependentDevice(dependent_devices, submodel_info.status_input_queue_indices);
      AddDependentDevice(dependent_devices, submodel_info.status_output_queue_indices);
    }
  }
}
// Builds the dynamic-sched routing information from the queue bindings.
// Only bindings from a non-group endpoint to a group endpoint are processed;
// for each entry of such a group a route is recorded via
// DynamicSchedGroupFormat. Finally the model deploy infos are built.
// Fails when the source queue info cannot be fetched or when a group endpoint
// has no registered member list.
Status DeployPlannerBase::BuildDynamicSchedInfo() {
  for (auto &binding : deploy_plan_.GetQueueBindings()) {
    auto src_q_idx = binding.first;
    auto dst_q_idx = binding.second;
    const DeployPlan::QueueInfo *src_queue_info = nullptr;
    GE_CHK_STATUS_RET_NOLOG(deploy_plan_.GetQueueInfo(src_q_idx, src_queue_info));

    const bool is_queue_to_group = (!deploy_plan_.IsGroupEndpoint(src_q_idx)) && deploy_plan_.IsGroupEndpoint(dst_q_idx);
    if (!is_queue_to_group) {
      continue;
    }

    auto queue_list_iter = deploy_plan_.GetGroups().find(dst_q_idx);
    GE_CHK_BOOL_RET_STATUS(queue_list_iter != deploy_plan_.GetGroups().end(),
                           FAILED, "DynamicSched, Step2, Get group[%d] info failed.", dst_q_idx);

    // Entries without a referenced endpoint are numbered after all regular endpoints.
    const auto group_entries_index_start = static_cast<int32_t>(deploy_plan_.GetQueueInfoList().size());
    for (auto entry_index : queue_list_iter->second) {
      auto &entry_info = deploy_plan_.GetGroupEntryInfoList()[entry_index];
      const int32_t real_entry_index =
          (entry_info.ref_index >= 0) ? entry_info.ref_index : (group_entries_index_start + entry_index);
      DynamicSchedGroupFormat(real_entry_index, entry_index, src_queue_info, src_q_idx, dst_q_idx);
    }
  }
  BuildModelDeployInfos();
  return SUCCESS;
}
// Marks every submodel instance as head (is_head) when at least one of its
// input endpoint names is also an input endpoint name of the root model.
// Always returns SUCCESS.
Status DeployPlannerBase::SetHeadNodeInfo() {
  const auto &root_input_names = model_relation_.root_model_endpoint_info.input_endpoint_names;
  for (const auto &submodel_entry : model_relation_.submodel_endpoint_infos) {
    const auto &model_instance_name = submodel_entry.first;
    auto &submodel_info = MutableSubmodelInfo(model_instance_name);
    submodel_info.is_head = HasIntersection(submodel_entry.second.input_endpoint_names, root_input_names);
    GELOGI("SetHeadNodeInfo, name = %s set head %d", model_instance_name.c_str(), submodel_info.is_head);
  }
  return SUCCESS;
}
// Resolves the dynamic-sched inputs (sched queues followed by status queues) of one model
// instance. For every input endpoint, each source endpoint index recorded under the same endpoint
// name in dynamic_sched_plan_.src_endpoint_indices_ gets a destination QueueInfo appended to
// dynamic_sched_plan_.endpoint_pairs_[src][model_queue_index], and the source index is recorded in
// relation_dst_to_src_ under the destination queue's full name.
//
// @param model_instance_name  instance the inputs belong to
// @param model_endpoint_info  endpoint description providing the sched/status input queue names
// @return the failing status of BatchGetEndpoints / AdjustDequeueDevice, INTERNAL_ERROR if the
//         number of built queue indexes differs from the number of endpoints, PARAM_INVALID if an
//         input endpoint has no recorded source endpoint, SUCCESS otherwise.
Status DeployPlannerBase::ResolveModelDynamicInputs(const std::string &model_instance_name,
                                                    const ModelRelation::ModelEndpointInfo &model_endpoint_info) {
  // Sched endpoints first, then status endpoints; model_queue_indexs below follows the same order.
  std::vector<const Endpoint *> model_input_endpoints;
  GE_CHK_STATUS_RET_NOLOG(
      relation_reader_->BatchGetEndpoints(model_endpoint_info.sched_input_queue_names, model_input_endpoints));
  GE_CHK_STATUS_RET_NOLOG(
      relation_reader_->BatchGetEndpoints(model_endpoint_info.status_input_queue_names, model_input_endpoints));

  std::vector<ModelQueueIndex> model_queue_indexs;
  model_queue_indexs.reserve(model_input_endpoints.size());
  for (size_t input_index = 0UL; input_index < model_endpoint_info.sched_input_queue_names.size(); ++input_index) {
    ModelQueueIndex input_queue_index{model_endpoint_info.model_name + "_sched", "",
                                      static_cast<int32_t>(input_index)};
    model_queue_indexs.emplace_back(std::move(input_queue_index));
  }
  for (size_t input_index = 0UL; input_index < model_endpoint_info.status_input_queue_names.size(); ++input_index) {
    ModelQueueIndex input_queue_index{model_endpoint_info.model_name + "_status", "",
                                      static_cast<int32_t>(input_index)};
    model_queue_indexs.emplace_back(std::move(input_queue_index));
  }
  GE_CHK_BOOL_RET_STATUS(model_queue_indexs.size() == model_input_endpoints.size(), INTERNAL_ERROR,
                         "model_queue_indexs.size=%zu is not same as model_input_endpoints.size=%zu, model=%s",
                         model_queue_indexs.size(), model_input_endpoints.size(), model_instance_name.c_str());

  auto &submodel_info = MutableSubmodelInfo(model_instance_name);
  auto &src_endpoint_map = deploy_plan_.dynamic_sched_plan_.src_endpoint_indices_;
  for (size_t index = 0UL; index < model_input_endpoints.size(); ++index) {
    const auto &model_queue_id = model_queue_indexs[index];
    const auto endpoint = model_input_endpoints[index];
    const auto &endpoint_name = endpoint->GetName();

    // Look up with find() rather than operator[]: an unknown endpoint name must be reported as an
    // error without leaving a default-constructed (empty) entry behind in the plan.
    const auto src_iter = src_endpoint_map.find(endpoint_name);
    if ((src_iter == src_endpoint_map.end()) || src_iter->second.empty()) {
      GELOGE(PARAM_INVALID, "Failed to find enqueue operation for queue [%s]", endpoint_name.c_str());
      return PARAM_INVALID;
    }
    const auto &src_endpoint_indices = src_iter->second;

    for (auto src_endpoint_index : src_endpoint_indices) {
      const auto &src_endpoint = deploy_plan_.queues_[static_cast<size_t>(src_endpoint_index)];
      auto &dst_endpoint_groups = deploy_plan_.dynamic_sched_plan_.endpoint_pairs_[src_endpoint_index];
      // A fresh QueueInfo is built per source because it is moved into the pair table below.
      auto queue_info = BuildQueueInfo(*endpoint, model_instance_name);
      GE_CHK_STATUS_RET(AdjustDequeueDevice(queue_info, src_endpoint_indices), "Failed to adjust dequeue device");
      queue_info.name = GetEndpointFullName(queue_info, model_queue_id);
      relation_dst_to_src_[queue_info.name].emplace(src_endpoint_index);
      dst_endpoint_groups[model_queue_id].emplace_back(std::move(queue_info));
      GELOGD("DynamicSched Bind endpoints: name = %s, from %s to %s:%d@%s, invoke_key=%s.", endpoint_name.c_str(),
             src_endpoint.model_instance_name.c_str(), model_endpoint_info.model_name.c_str(), model_queue_id.id,
             submodel_info.device_info.GetDesc().c_str(), model_queue_id.invoke_key.c_str());
    }
  }
  return SUCCESS;
}
// Determines the destination endpoint index for one (source endpoint, destination queue) pair and
// records it on the owning submodel.
//
// If the source index is in reusable_queue_indices_, the source queue itself is used as the
// destination. Otherwise an input endpoint is obtained via GetOrCreateInputEndpoint and connected
// to the source via CreateDynamicSchedTags.
//
// The resulting index is appended to the submodel's status_input_queue_indices when the queue
// action is kStatus; otherwise it is appended to sched_input_queue_indices and additionally stored
// in dynamic_sched_plan_.datagw_request_bindings_[src_endpoint_idx].
//
// @return the failing status of endpoint creation / tag creation, SUCCESS otherwise.
Status DeployPlannerBase::AssignDynamicSchedDequeueQueue(const DeployPlan::QueueInfo &queue_info,
                                                         const ModelQueueIndex &model_queue_loc,
                                                         const int32_t &src_endpoint_idx) {
  int32_t dst_endpoint_idx = -1;
  const bool reuse_src_queue = (reusable_queue_indices_.count(src_endpoint_idx) > 0UL);
  if (!reuse_src_queue) {
    GE_CHK_STATUS_RET_NOLOG(GetOrCreateInputEndpoint(model_queue_loc, queue_info, dst_endpoint_idx));
    GE_CHK_STATUS_RET_NOLOG(CreateDynamicSchedTags(src_endpoint_idx, dst_endpoint_idx, queue_info));
    GELOGI("DynamicSched, Endpoint binding added, src = %s, dst = %s", ToEndpointDesc(src_endpoint_idx).c_str(),
           ToEndpointDesc(dst_endpoint_idx).c_str());
  } else {
    GELOGI("DynamicSched, Reuse src queue, queue name = %s, queue index = %d",
           deploy_plan_.queues_[src_endpoint_idx].name.c_str(), src_endpoint_idx);
    dst_endpoint_idx = src_endpoint_idx;
  }

  const auto &model_instance_name = queue_info.model_instance_name;
  auto &submodel_info = MutableSubmodelInfo(model_instance_name);
  if (queue_info.queue_action == DeployPlan::QueueAction::kStatus) {
    submodel_info.status_input_queue_indices.push_back(dst_endpoint_idx);
    GELOGI("DynamicSched, add status input indices, model instance name=%s, input indice=%d.",
           model_instance_name.c_str(), dst_endpoint_idx);
    return SUCCESS;
  }

  // Every non-status queue is treated as a sched input and also registered as a datagw request binding.
  submodel_info.sched_input_queue_indices.push_back(dst_endpoint_idx);
  GELOGI("DynamicSched, add sched input indices, model instance name=%s, input indice=%d.",
         model_instance_name.c_str(), dst_endpoint_idx);
  deploy_plan_.dynamic_sched_plan_.datagw_request_bindings_[src_endpoint_idx] = dst_endpoint_idx;
  GELOGI("DynamicSched, datagw request bindings, datagw input=%d, sched app output=%d.",
         dst_endpoint_idx, src_endpoint_idx);
  return SUCCESS;
}
// Runs AssignDynamicSchedDequeueQueue for every destination QueueInfo stored in
// dynamic_sched_plan_.endpoint_pairs_, i.e. for each source endpoint index, each model queue
// location under it, and each queue info listed for that location.
//
// Stops at, and returns, the first non-SUCCESS status; returns SUCCESS if all assignments succeed.
Status DeployPlannerBase::AssignDynamicSchedDequeueQueues() {
  for (const auto &src_and_dsts : deploy_plan_.dynamic_sched_plan_.endpoint_pairs_) {
    const auto src_endpoint_idx = src_and_dsts.first;
    for (const auto &loc_and_infos : src_and_dsts.second) {
      const auto &model_queue_loc = loc_and_infos.first;
      for (const auto &queue_info : loc_and_infos.second) {
        GE_CHK_STATUS_RET_NOLOG(AssignDynamicSchedDequeueQueue(queue_info, model_queue_loc, src_endpoint_idx));
      }
    }
  }
  return SUCCESS;
}
// Creates a single-entry group containing dst_idx and binds that group to the queue at src_idx
// (the recorded binding is "group_index -> src_idx"). The group copies its name, device info and
// model instance name from the queue at src_idx.
//
// NOTE(review): despite its name, src_idx is the receiving side of the binding here - confirm
// the parameter naming against the callers.
//
// @param src_idx      index into deploy_plan_.queues_ of the queue the group is bound to
// @param dst_idx      the only entry placed in the new group
// @param group_index  [out] index of the created group
// @return the failing status of CreateGroupInfo, SUCCESS otherwise.
Status DeployPlannerBase::DynamicSchedBindGroup2Queue(const int32_t src_idx,
                                                      const int32_t dst_idx,
                                                      int32_t &group_index) {
  const auto &bound_queue = deploy_plan_.queues_[src_idx];
  DeployPlan::QueueInfo group_info{};
  group_info.model_instance_name = bound_queue.model_instance_name;
  group_info.device_info = bound_queue.device_info;
  group_info.name = bound_queue.name;
  GE_CHK_STATUS_RET(CreateGroupInfo(group_info, {dst_idx}, group_index));
  deploy_plan_.queue_bindings_.emplace_back(group_index, src_idx);
  return SUCCESS;
}
// Creates a single-entry group containing dst_idx and binds the queue at src_idx to that group
// (the recorded binding is "src_idx -> group_index"). The group copies its name, device info and
// model instance name from the queue at src_idx.
//
// @param src_idx      index into deploy_plan_.queues_ of the queue feeding the group
// @param dst_idx      the only entry placed in the new group
// @param group_index  [out] index of the created group
// @return the failing status of CreateGroupInfo, SUCCESS otherwise.
Status DeployPlannerBase::DynamicSchedBindQueue2Group(const int32_t src_idx,
                                                      const int32_t dst_idx,
                                                      int32_t &group_index) {
  const auto &feeding_queue = deploy_plan_.queues_[src_idx];
  DeployPlan::QueueInfo group_info{};
  group_info.model_instance_name = feeding_queue.model_instance_name;
  group_info.device_info = feeding_queue.device_info;
  group_info.name = feeding_queue.name;
  GE_CHK_STATUS_RET(CreateGroupInfo(group_info, {dst_idx}, group_index));
  deploy_plan_.queue_bindings_.emplace_back(src_idx, group_index);
  return SUCCESS;
}
// Records a direct queue binding "src_endpoint_idx -> dst_endpoint_idx" in the deploy plan.
// The multi-connection state of both ends is logged first; a many-to-many combination only
// produces a warning and the binding is still added.
void DeployPlannerBase::BindDynamicSchedDevQueue(const int32_t src_endpoint_idx,
                                                 const int32_t dst_endpoint_idx) {
  const auto src_fans_out = IsOutputMultiConnected(src_endpoint_idx);
  const auto dst_fans_in = IsInputMultiConnected(dst_endpoint_idx);
  GELOGI("DynamicSched, Src endpoint[%s] is one to many = %d, dst endpoint[%s] is many to one = %d",
         ToEndpointDesc(src_endpoint_idx).c_str(), src_fans_out, ToEndpointDesc(dst_endpoint_idx).c_str(),
         dst_fans_in);
  if (src_fans_out && dst_fans_in) {
    // Unexpected topology, reported but deliberately not treated as an error here.
    GELOGW("DynamicSched, shouldn't many to many relation.");
  }

  deploy_plan_.queue_bindings_.emplace_back(src_endpoint_idx, dst_endpoint_idx);
  GELOGI("DynamicSched, Add bind relation[%d -> %d] success, src endpoint[%s], dst endpoint[%s].",
         src_endpoint_idx, dst_endpoint_idx, ToEndpointDesc(src_endpoint_idx).c_str(),
         ToEndpointDesc(dst_endpoint_idx).c_str());
}
// Inserts proxy endpoints for whichever side of a connection reports WithProxy().
//
// Source side: a copy of the source queue info is re-homed onto the source's proxy device, a new
// endpoint is created for it, "src -> proxy" is bound, and src_endpoint_idx is replaced by the
// proxy index.
// Destination side: the same is done with the destination queue info, "proxy -> dst" is bound,
// entry_info.device_info is set to the proxy device, and dst_endpoint_idx is replaced by the
// proxy index.
//
// @param src_device_info   device of the source endpoint
// @param dst_device_info   device of the destination endpoint
// @param entry_info        [in,out] device_info is overwritten when the destination has a proxy
// @param src_endpoint_idx  [in,out] source endpoint index, updated to the proxy when one is added
// @param dst_endpoint_idx  [in,out] destination endpoint index, updated likewise
// @return the failing status of CreateEndpointInfo, SUCCESS otherwise.
Status DeployPlannerBase::BindDynamicSchedHostQueue(const DeployPlan::DeviceInfo &src_device_info,
                                                    const DeployPlan::DeviceInfo &dst_device_info,
                                                    DeployPlan::QueueInfo &entry_info,
                                                    int32_t &src_endpoint_idx,
                                                    int32_t &dst_endpoint_idx) {
  if (src_device_info.WithProxy()) {
    // Taken by value on purpose: a new endpoint is created from this copy right below.
    auto proxy_queue_info = deploy_plan_.queues_[src_endpoint_idx];
    proxy_queue_info.device_info = src_device_info.ProxyDevice();
    int32_t src_proxy_idx = -1;
    GE_CHK_STATUS_RET_NOLOG(CreateEndpointInfo(proxy_queue_info, src_proxy_idx));
    deploy_plan_.queue_bindings_.emplace_back(src_endpoint_idx, src_proxy_idx);
    GELOGI("DynamicSched, Mul-server add host bind relation[%d -> %d] success, src endpoint[%s], dst endpoint[%s].",
           src_endpoint_idx, src_proxy_idx, ToEndpointDesc(src_endpoint_idx).c_str(),
           ToEndpointDesc(src_proxy_idx).c_str());
    src_endpoint_idx = src_proxy_idx;
  }

  if (dst_device_info.WithProxy()) {
    auto proxy_queue_info = deploy_plan_.queues_[dst_endpoint_idx];
    proxy_queue_info.device_info = dst_device_info.ProxyDevice();
    entry_info.device_info = proxy_queue_info.device_info;
    int32_t dst_proxy_idx = -1;
    GE_CHK_STATUS_RET_NOLOG(CreateEndpointInfo(proxy_queue_info, dst_proxy_idx));
    deploy_plan_.queue_bindings_.emplace_back(dst_proxy_idx, dst_endpoint_idx);
    GELOGI("DynamicSched, Mul-server add host bind relation[%d -> %d] success, src endpoint[%s], dst endpoint[%s].",
           dst_proxy_idx, dst_endpoint_idx, ToEndpointDesc(dst_proxy_idx).c_str(),
           ToEndpointDesc(dst_endpoint_idx).c_str());
    dst_endpoint_idx = dst_proxy_idx;
  }
  return SUCCESS;
}
// Connects a source endpoint to a destination endpoint for dynamic scheduling.
//
// Flow, in order:
//  1. If CheckAndAddRelation(src, dst) returns true, nothing more is done.
//     NOTE(review): presumably "true" means the relation was already recorded - confirm.
//  2. If both ends share the same device key, or sit on the same node with different device
//     types, a direct queue binding is added (BindDynamicSchedDevQueue).
//  3. Otherwise, when the ends are on different nodes and either has a proxy, proxy endpoints are
//     inserted (BindDynamicSchedHostQueue), which may replace the indices used below.
//  4. A mapping tag pair is obtained for the (possibly proxied) destination, the source is bound
//     to a group holding the output tag, and - unless CheckAndAddRelation with
//     kDynamicSchedRelationSuffix returns true - a group holding the input tag is bound to the
//     destination.
//
// @param src_endpoint_idx  index of the source endpoint in deploy_plan_.queues_
// @param dst_endpoint_idx  index of the destination endpoint
// @param queue_info        destination queue description (device info and queue action are read)
// @return the first failing status of the helpers, SUCCESS otherwise.
Status DeployPlannerBase::CreateDynamicSchedTags(const int32_t src_endpoint_idx,
                                                 const int32_t dst_endpoint_idx,
                                                 const DeployPlan::QueueInfo &queue_info) {
  if (CheckAndAddRelation(src_endpoint_idx, dst_endpoint_idx)) {
    return SUCCESS;
  }

  const auto &src_device_info = deploy_plan_.queues_[src_endpoint_idx].device_info;
  const auto &dst_device_info = queue_info.device_info;
  const auto is_same_node = (src_device_info.GetNodeId() == dst_device_info.GetNodeId());
  const auto is_same_type = (src_device_info.GetType() == dst_device_info.GetType());
  const bool bind_directly =
      (src_device_info.GetKey() == dst_device_info.GetKey()) || (is_same_node && !is_same_type);
  if (bind_directly) {
    BindDynamicSchedDevQueue(src_endpoint_idx, dst_endpoint_idx);
    return SUCCESS;
  }

  // From here on the indices may be redirected to proxy endpoints.
  auto proxy_src_endpoint_idx = src_endpoint_idx;
  auto proxy_dst_endpoint_idx = dst_endpoint_idx;
  auto entry_info = queue_info;
  if ((!is_same_node) && (src_device_info.WithProxy() || dst_device_info.WithProxy())) {
    GE_CHK_STATUS_RET_NOLOG(BindDynamicSchedHostQueue(src_device_info, dst_device_info, entry_info,
                                                      proxy_src_endpoint_idx, proxy_dst_endpoint_idx));
  }

  // The tag entry is described by the (possibly proxied) source queue; this assignment replaces
  // whatever entry_info held before, including the device info set by the proxy step.
  entry_info = deploy_plan_.queues_[proxy_src_endpoint_idx];
  if (queue_info.queue_action == DeployPlan::QueueAction::kStatus) {
    // entry_info was just copied from the source queue, so this appends that queue's instance name.
    entry_info.name += entry_info.model_instance_name;
  }

  std::pair<int32_t, int32_t> tag_pair;
  GE_CHK_STATUS_RET(GetOrCreateMappingTagPairEntry(proxy_dst_endpoint_idx, entry_info, tag_pair, false),
                    "Failed to create mapping tag pair entity.");
  GELOGI("DynamicSched, src endpoint [%d] [%s] add output tag [%d] [%s], dst endpoint [%d] [%s]"
         " add input tag [%d] [%s].", proxy_src_endpoint_idx, ToEndpointDesc(proxy_src_endpoint_idx).c_str(),
         tag_pair.first, ToEndpointDesc(tag_pair.first, true).c_str(), proxy_dst_endpoint_idx,
         ToEndpointDesc(proxy_dst_endpoint_idx).c_str(), tag_pair.second, ToEndpointDesc(tag_pair.second, true).c_str());

  // Output side: source queue -> group containing the output tag.
  int32_t group_index = -1;
  GE_CHK_STATUS_RET(DynamicSchedBindQueue2Group(proxy_src_endpoint_idx, tag_pair.first, group_index));
  GELOGI("DynamicSched, Output group binding added, local = %s, peer = %s.",
         deploy_plan_.queues_[proxy_src_endpoint_idx].device_info.GetDesc().c_str(),
         ToString(ToEndpointDescs(deploy_plan_.groups_[group_index], true)).c_str());

  // Input side: group containing the input tag -> destination queue. When the relation check
  // returns true no new group is created and group_index still refers to the output group, which
  // is then what the log line below prints.
  if (!CheckAndAddRelation(tag_pair.second, proxy_dst_endpoint_idx, kDynamicSchedRelationSuffix)) {
    GE_CHK_STATUS_RET(DynamicSchedBindGroup2Queue(proxy_dst_endpoint_idx, tag_pair.second, group_index));
  }
  GELOGI("DynamicSched, Input group binding added, peer = %s, local = %s.",
         ToString(ToEndpointDescs(deploy_plan_.groups_[group_index], true)).c_str(),
         deploy_plan_.queues_[proxy_dst_endpoint_idx].device_info.GetDesc().c_str());
  return SUCCESS;
}
// Creates one endpoint per name in queue_names for the given model instance and records each new
// endpoint index in dynamic_sched_plan_.src_endpoint_indices_ under the endpoint's name.
// When the queue is owned, the index is also appended to the submodel's
// status_output_queue_indices (queue action kStatus) or sched_output_queue_indices (any other
// action).
//
// @param model_instance_name  instance the output queues belong to
// @param queue_names          names resolved through relation_reader_->BatchGetEndpoints
// @param is_owned             stored in QueueInfo::owned of every created queue
// @return the failing status of BatchGetEndpoints / CreateEndpointInfo, SUCCESS otherwise.
Status DeployPlannerBase::CreateDynamicSchedOutputQueueDefs(const std::string &model_instance_name,
                                                            const std::vector<std::string> &queue_names,
                                                            const bool is_owned) {
  std::vector<const Endpoint *> endpoints;
  GE_CHK_STATUS_RET_NOLOG(relation_reader_->BatchGetEndpoints(queue_names, endpoints));

  const size_t endpoint_count = endpoints.size();
  for (size_t output_idx = 0U; output_idx < endpoint_count; ++output_idx) {
    const auto *const endpoint = endpoints[output_idx];
    auto queue_info = BuildQueueInfo(*endpoint, model_instance_name);
    queue_info.owned = is_owned;

    int32_t endpoint_index = -1;
    GE_CHK_STATUS_RET_NOLOG(CreateEndpointInfo(queue_info, endpoint_index));
    deploy_plan_.dynamic_sched_plan_.src_endpoint_indices_[endpoint->GetName()].emplace_back(endpoint_index);

    if (queue_info.owned) {
      auto &submodel_info = MutableSubmodelInfo(queue_info.model_instance_name);
      const bool is_status_queue = (queue_info.queue_action == DeployPlan::QueueAction::kStatus);
      if (is_status_queue) {
        submodel_info.status_output_queue_indices.emplace_back(endpoint_index);
      } else {
        submodel_info.sched_output_queue_indices.emplace_back(endpoint_index);
      }
    }
    GELOGD("DynamicSched Output endpoint created, model = %s, output_index = %zu, queue name = %s, queue index = %d",
           model_instance_name.c_str(), output_idx, endpoint->GetName().c_str(), endpoint_index);
  }
  return SUCCESS;
}
// Fills the dynamic-sched root model info from the head/tail model infos by moving their queue
// index lists:
//   root status/sched inputs  <- head model status/sched outputs
//   root status/sched outputs <- tail model status/sched inputs
// The four source lists in head_model_info_ / tail_model_info_ are left in a moved-from state.
void DeployPlannerBase::UpdateDynamicSchedDeployPlan() {
  auto &root_info = deploy_plan_.dynamic_sched_plan_.root_model_info_;

  root_info.status_input_queue_indices = std::move(head_model_info_.status_output_queue_indices);
  root_info.sched_input_queue_indices = std::move(head_model_info_.sched_output_queue_indices);

  root_info.status_output_queue_indices = std::move(tail_model_info_.status_input_queue_indices);
  root_info.sched_output_queue_indices = std::move(tail_model_info_.sched_input_queue_indices);
}
}