/**
* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "dynamic_stream_allocator.h"
#include "common/checker.h"
#include "graph_metadef/common/ge_common/util.h"
#include "framework/common/framework_types_internal.h"
#include "graph/build/stream/assign_attached_notify_pass.h"
#include "graph/build/stream/assign_attached_stream_pass.h"
#include "graph/debug/ge_attr_define.h"
#include "graph/ge_context.h"
#include "graph/utils/constant_utils.h"
#include "graph/utils/graph_utils.h"
#include "graph/utils/op_type_utils.h"
#include "base/err_msg.h"
namespace ge {
namespace {
// Engine ids that mark a subgraph as belonging to an AI CPU engine.
const std::set<std::string> kAicpuEngineIds = {"DNN_VM_AICPU", "DNN_VM_AICPU_FFTS_PLUS", "DNN_VM_AICPU_ASCEND",
                                               "DNN_VM_AICPU_ASCEND_FFTS_PLUS"};
// Node types that make AssignIndependentAicpuNode() give the containing AI CPU subgraph an engine stream.
const std::set<std::string> kIndependentAicpu = {DROPOUTGENMASK, DROPOUTDOMASKV3, GETNEXT, DYNAMICGETNEXT,
                                                 DYNAMICGETNEXTV2};
// Engines whose subgraphs are given a stream up front by AssignEnginesOwningStream().
const std::set<std::string> kEnginesOwningStream = {"AIcoreEngine", "VectorEngine", "DNN_HCCL", "DSAEngine"};
// Engines that HasOnlyNoTaskInput() accepts as "no task" predecessors.
const std::set<std::string> kEnginesNoTask = {"DNN_VM_GE_LOCAL"};
// Node types that IsForcedAssignMainStream() always places on the main stream.
const std::set<std::string> kNodesForcedInMainStream = {NETOUTPUT, FILECONSTANT};

// Comparator used with std::sort to order nodes by the id stored in their OpDesc.
// Nodes without an OpDesc are ordered before every node that has one, so the relation
// stays a strict weak ordering even if such a node is present (returning false for every
// comparison that involves a null OpDesc would make it "equivalent" to all other nodes).
bool TopoOrderCompare(const NodePtr &n0, const NodePtr &n1) {
  const auto desc0 = n0->GetOpDesc();
  const auto desc1 = n1->GetOpDesc();
  if ((desc0 == nullptr) || (desc1 == nullptr)) {
    return (desc0 == nullptr) && (desc1 != nullptr);
  }
  return (desc0->GetId() < desc1->GetId());
}
}  // namespace
// Entry point of the allocator: reads the ac_parallel_enable option, assigns streams to the
// root graph (and its unknown-shape subgraphs), then inserts the events that synchronise
// nodes placed on different streams.
// Returns SUCCESS, or the failure reported by the first step that fails.
Status DynamicStreamAllocator::AssignStreamsForDynamicShapeGraph(const ComputeGraphPtr &root_graph,
                                                                 const Graph2SubGraphInfoList &subgraph_map) {
  GE_CHECK_NOTNULL(root_graph);

  // The option decides which AI CPU assignment strategy AssignStreamsForGraph() uses.
  GE_ASSERT_SUCCESS(GetAcParallelEnableConfig());

  // Streams must be final before events are derived from cross-stream edges.
  GE_ASSERT_SUCCESS(AssignStreams(root_graph, subgraph_map));
  GE_ASSERT_SUCCESS(InsertEvents(root_graph));

  GELOGI("Graph: %s, stream num: %lld, event num: %lld.", root_graph->GetName().c_str(), stream_num_, event_num_);
  return SUCCESS;
}
// Runs the attached-stream and attached-notify passes on compute_graph and on each of its
// subgraphs whose unknown-shape flag is set.
//   compute_graph : graph to process; must not be null.
//   stream_num    : in/out stream counter handed to AssignAttachedStreamPass.
//   event_num     : unused.
//   notify_num    : in/out notify counter; updated with the count produced by the notify pass.
//   notify_types  : resized to notify_num (new entries are RT_NOTIFY_DEFAULT) and then handed
//                   to AssignAttachedNotifyPass.
// Returns SUCCESS, or the failure of the first pass that fails.
Status DynamicStreamAllocator::AssignAttachedResource(const ComputeGraphPtr &compute_graph, int64_t &stream_num,
                                                      int64_t &event_num, int64_t &notify_num,
                                                      std::vector<uint32_t> &notify_types) {
  (void)event_num;
  GE_CHECK_NOTNULL(compute_graph);

  // The graph itself is always processed; subgraphs only when flagged as unknown shape.
  std::vector<ComputeGraphPtr> dyn_graphs{compute_graph};
  for (const auto &sub_graph : compute_graph->GetAllSubgraphs()) {
    if (sub_graph->GetGraphUnknownFlag()) {
      dyn_graphs.push_back(sub_graph);
    }
  }

  AssignAttachedStreamPass attach_stream_pass;
  AssignAttachedNotifyPass attached_notify_pass;
  notify_types.resize(static_cast<size_t>(notify_num), RT_NOTIFY_DEFAULT);
  // The notify pass works on a 32-bit counter; the result is written back at the end.
  uint32_t cur_notify_num = static_cast<uint32_t>(notify_num);
  for (const auto &dyn_graph : dyn_graphs) {
    GE_ASSERT_SUCCESS(attach_stream_pass.Run(dyn_graph, stream_num));
    GE_ASSERT_SUCCESS(attached_notify_pass.Run(dyn_graph, cur_notify_num, notify_types));
  }
  notify_num = cur_notify_num;
  return SUCCESS;
}
// Reads the AC_PARALLEL_ENABLE option from the context and stores whether it equals "1" in
// ac_parallel_enable_. Accepted values are "", "0" and "1"; anything else is reported as an
// error and PARAM_INVALID is returned.
Status DynamicStreamAllocator::GetAcParallelEnableConfig() {
  std::string option_value;
  // The result of GetOption is deliberately ignored: option_value then stays empty, which is valid.
  (void)GetContext().GetOption(AC_PARALLEL_ENABLE, option_value);

  const std::set<std::string> accepted_values = {"", "0", "1"};
  const bool is_accepted = (accepted_values.find(option_value) != accepted_values.end());
  if (is_accepted) {
    GELOGI("ac_parallel_enable: %s.", option_value.c_str());
    ac_parallel_enable_ = (option_value == "1");
    return SUCCESS;
  }

  REPORT_PREDEFINED_ERR_MSG("E10001", std::vector<const char_t *>({"parameter", "value", "reason"}),
                            std::vector<const char_t *>({"ac_parallel_enable", option_value.c_str(),
                                                         "The value can only be empty, 0 and 1."}));
  GELOGE(PARAM_INVALID, "[Check][Param] ac_parallel_enable: %s is invalid, only can be empty, 0 and 1.",
         option_value.c_str());
  return PARAM_INVALID;
}
// Assigns streams to the root graph first and then to each subgraph flagged as unknown shape,
// compacts the resulting stream ids with RefreshContinuousStreams() and finally runs the
// custom stream pass on the root graph.
Status DynamicStreamAllocator::AssignStreams(const ComputeGraphPtr &root_graph,
                                             const Graph2SubGraphInfoList &subgraph_map) {
  // Engine configurations are fetched once and shared by every graph processed below.
  const auto all_engine_confs = StreamUtils::GetEngineConfs();

  GE_ASSERT_SUCCESS(AssignStreamsForGraph(root_graph, subgraph_map, all_engine_confs),
                    "Assign stream for graph: %s failed.", root_graph->GetName().c_str());

  for (const auto &child_graph : root_graph->GetAllSubgraphs()) {
    if (!child_graph->GetGraphUnknownFlag()) {
      continue;
    }
    GE_ASSERT_SUCCESS(AssignStreamsForGraph(child_graph, subgraph_map, all_engine_confs),
                      "Assign stream for graph: %s failed.", child_graph->GetName().c_str());
  }

  GE_ASSERT_SUCCESS(RefreshContinuousStreams(root_graph));
  GE_ASSERT_SUCCESS(StreamUtils::RunCustomStreamPass(root_graph, stream_num_));
  return SUCCESS;
}
// Runs the per-graph stream assignment pipeline:
//   1. convert the partition info of `graph` into SubgraphPtr objects;
//   2. give streams to subgraphs of stream-owning engines;
//   3. handle AI CPU subgraphs (strategy selected by ac_parallel_enable_);
//   4. let unassigned subgraphs reuse a neighbour's stream;
//   5. assign streams to whatever still needs one;
//   6. write the stream ids onto the nodes and apply stream-label overrides.
Status DynamicStreamAllocator::AssignStreamsForGraph(const ComputeGraphPtr &graph,
                                                     const Graph2SubGraphInfoList &subgraph_map,
                                                     const std::map<std::string, EngineConfPtr> &engine_confs) {
  GE_CHECK_NOTNULL(graph);

  std::vector<SubgraphPtr> partitioned;
  std::map<std::string, int32_t> parallel_limits;
  GE_ASSERT_SUCCESS(StreamUtils::ConvertSubgraphs(graph, subgraph_map, engine_confs, parallel_limits, partitioned));

  GE_ASSERT_SUCCESS(AssignEnginesOwningStream(partitioned));

  // Lookup tables from End / PlaceHolder nodes to the subgraph they are associated with.
  const auto end_to_subgraph = StreamUtils::InitEndSubgraphMap(partitioned);
  const auto pld_to_subgraph = StreamUtils::InitPldSubgraphMap(partitioned);

  if (!ac_parallel_enable_) {
    GE_ASSERT_SUCCESS(AssignIndependentAicpuNode(partitioned));
  } else {
    GE_ASSERT_SUCCESS(AssignAicpuCanParallel(partitioned, end_to_subgraph, pld_to_subgraph));
  }

  GE_ASSERT_SUCCESS(AssignWithReuse(partitioned, end_to_subgraph, pld_to_subgraph));
  GE_ASSERT_SUCCESS(AssignRemainSubgraphNeedAssignStream(partitioned));
  GE_ASSERT_SUCCESS(SetSubgraphStreamToNodes(graph, partitioned));
  GE_ASSERT_SUCCESS(ReassignStreamByStreamLabel(graph));
  return SUCCESS;
}
// Gives `subgraph` the stream that belongs to its engine. Each engine id maps to exactly one
// stream in engine_streams_; the first subgraph of an engine allocates a new id from
// next_stream_, later subgraphs of the same engine share it.
void DynamicStreamAllocator::AssignStreamForSubgraph(const SubgraphPtr &subgraph) {
  const auto &engine_id = subgraph->engine_conf.id;
  const auto known_engine = engine_streams_.find(engine_id);
  if (known_engine != engine_streams_.end()) {
    subgraph->stream_id = known_engine->second;
  } else {
    // First subgraph seen for this engine: consume the next free stream id.
    subgraph->stream_id = next_stream_;
    engine_streams_.emplace(engine_id, next_stream_);
    ++next_stream_;
  }
  GELOGI("Assign stream_id: %lld for engine: %s, subgraph: %s.", subgraph->stream_id, engine_id.c_str(),
         subgraph->name.c_str());
}
// Assigns the engine stream to every subgraph whose engine id is listed in
// kEnginesOwningStream; all other subgraphs are left untouched. Always returns SUCCESS.
Status DynamicStreamAllocator::AssignEnginesOwningStream(const std::vector<SubgraphPtr> &subgraphs) {
  for (const auto &candidate : subgraphs) {
    const bool owns_stream = (kEnginesOwningStream.find(candidate->engine_conf.id) != kEnginesOwningStream.end());
    if (owns_stream) {
      AssignStreamForSubgraph(candidate);
    }
  }
  return SUCCESS;
}
// AI CPU strategy used when ac_parallel_enable_ is set. An AI CPU subgraph gets the AI CPU
// engine stream when at least one of the following holds:
//   - none of its PlaceHolder entries has a peer End node (has_no_input);
//   - HasOnlyNoTaskInput() is true;
//   - CanReusePredSubgraph() is false.
// Other AI CPU subgraphs are left unassigned here. Always returns SUCCESS.
Status DynamicStreamAllocator::AssignAicpuCanParallel(
    const std::vector<SubgraphPtr> &subgraphs, const std::unordered_map<NodePtr, SubgraphPtr> &end_subgraph_map,
    const std::unordered_map<NodePtr, SubgraphPtr> &pld_subgraph_map) {
  for (const auto &aicpu_candidate : subgraphs) {
    if (kAicpuEngineIds.find(aicpu_candidate->engine_conf.id) == kAicpuEngineIds.end()) {
      continue;
    }

    // All three properties are evaluated unconditionally because each one is logged.
    const auto &pld_to_end = aicpu_candidate->subgraph_info.GetPld2EndMap();
    const bool has_no_input =
        std::none_of(pld_to_end.begin(), pld_to_end.end(),
                     [](const std::pair<NodePtr, NodePtr> &entry) { return (entry.second != nullptr); });
    const bool has_only_no_task_input = HasOnlyNoTaskInput(aicpu_candidate, end_subgraph_map);
    const bool can_reuse_pred_subgraph = CanReusePredSubgraph(aicpu_candidate, end_subgraph_map, pld_subgraph_map);
    GELOGI("Subgraph: %s, has_no_input: %d, has_only_no_task_input: %d, can_reuse_pred_subgraph: %d.",
           aicpu_candidate->name.c_str(), static_cast<int32_t>(has_no_input),
           static_cast<int32_t>(has_only_no_task_input), static_cast<int32_t>(can_reuse_pred_subgraph));

    const bool needs_own_stream = has_no_input || has_only_no_task_input || (!can_reuse_pred_subgraph);
    if (needs_own_stream) {
      AssignStreamForSubgraph(aicpu_candidate);
    }
  }
  return SUCCESS;
}
// Returns true when every input of `subgraph` comes from a predecessor subgraph that
//   - can be resolved through end_subgraph_map (and is not null),
//   - belongs to an engine listed in kEnginesNoTask, and
//   - has no PlaceHolder entry with a peer End node itself.
// A subgraph with an empty Pld2End map also yields true.
bool DynamicStreamAllocator::HasOnlyNoTaskInput(
    const SubgraphPtr &subgraph, const std::unordered_map<NodePtr, SubgraphPtr> &end_subgraph_map) const {
  for (const auto &input_edge : subgraph->subgraph_info.GetPld2EndMap()) {
    const auto producer = end_subgraph_map.find(input_edge.second);
    const bool producer_known = (producer != end_subgraph_map.end()) && (producer->second != nullptr);
    if (!producer_known) {
      return false;
    }

    const auto &producer_subgraph = producer->second;
    if (kEnginesNoTask.find(producer_subgraph->engine_conf.id) == kEnginesNoTask.end()) {
      return false;
    }

    // The producer must not have any connected input of its own.
    for (const auto &producer_input : producer_subgraph->subgraph_info.GetPld2EndMap()) {
      if (producer_input.second != nullptr) {
        return false;
      }
    }
  }
  return true;
}
bool DynamicStreamAllocator::CanReusePredSubgraph(
const SubgraphPtr &subgraph, const std::unordered_map<NodePtr, SubgraphPtr> &end_subgraph_map,
const std::unordered_map<NodePtr, SubgraphPtr> &pld_subgraph_map) const {
for (const auto &pld_2_end : subgraph->subgraph_info.GetPld2EndMap()) {
const auto iter = end_subgraph_map.find(pld_2_end.second);
if ((iter == end_subgraph_map.end()) || (iter->second == nullptr)) {
continue;
}
const auto &pred_subgraph = iter->second;
if (CanAicpuReusePredSubgraph(subgraph, pred_subgraph, pld_subgraph_map)) {
return true;
}
}
return false;
}
// Decides whether the AI CPU subgraph `subgraph` may reuse the stream of its predecessor
// `pred_subgraph`. Reuse is rejected when
//   - the predecessor's engine is not listed in kEnginesOwningStream,
//   - the predecessor's engine is independent (StreamUtils::IsEngineIndependent), or
//   - the predecessor has another successor (a "brother" of `subgraph`) that runs on the same
//     engine as the predecessor.
// Successors that cannot be resolved through pld_subgraph_map, or that resolve to a null
// subgraph, are ignored.
bool DynamicStreamAllocator::CanAicpuReusePredSubgraph(
    const SubgraphPtr &subgraph, const SubgraphPtr &pred_subgraph,
    const std::unordered_map<NodePtr, SubgraphPtr> &pld_subgraph_map) const {
  if ((kEnginesOwningStream.count(pred_subgraph->engine_conf.id) == 0U) ||
      StreamUtils::IsEngineIndependent(*pred_subgraph)) {
    return false;
  }
  const auto is_higher_priority_brother = [&subgraph, &pred_subgraph,
                                           &pld_subgraph_map](const std::pair<NodePtr, NodePtr> &pair) {
    const auto iter = pld_subgraph_map.find(pair.second);
    // Guard against null map values, consistent with the other subgraph-map lookups in this file.
    if ((iter == pld_subgraph_map.end()) || (iter->second == nullptr)) {
      return false;
    }
    const auto &pred_subgraph_succ = iter->second;
    return ((pred_subgraph_succ != subgraph) && (pred_subgraph_succ->engine_conf.id == pred_subgraph->engine_conf.id));
  };
  const auto &end_to_pld = pred_subgraph->subgraph_info.GetEnd2PldMap();
  return std::none_of(end_to_pld.begin(), end_to_pld.end(), is_higher_priority_brother);
}
// AI CPU strategy used when ac_parallel_enable_ is not set. An AI CPU subgraph is given the
// AI CPU engine stream only if it directly contains at least one node whose type is listed in
// kIndependentAicpu. Returns an error if an AI CPU subgraph has no compute graph.
Status DynamicStreamAllocator::AssignIndependentAicpuNode(const std::vector<SubgraphPtr> &subgraphs) {
  for (const auto &subgraph : subgraphs) {
    if (kAicpuEngineIds.count(subgraph->engine_conf.id) == 0U) {
      continue;
    }
    const auto &graph = subgraph->subgraph_info.GetSubGraph();
    GE_CHECK_NOTNULL(graph);

    // Only the existence of such a node matters, so stop at the first match instead of
    // collecting every matching node.
    bool has_independent_node = false;
    for (const auto &node : graph->GetDirectNode()) {
      if (kIndependentAicpu.count(node->GetType()) != 0U) {
        has_independent_node = true;
        break;
      }
    }
    if (has_independent_node) {
      AssignStreamForSubgraph(subgraph);
    }
  }
  return SUCCESS;
}
// For every subgraph that has no stream yet, tries to borrow the stream of a neighbouring
// subgraph: predecessors are consulted first, successors only if no predecessor qualifies.
// If the chosen neighbour itself reuses another subgraph, that other subgraph is used as the
// donor (one level only). Subgraphs without a suitable, already-assigned donor stay unassigned.
Status DynamicStreamAllocator::AssignWithReuse(const std::vector<SubgraphPtr> &subgraphs,
                                               const std::unordered_map<NodePtr, SubgraphPtr> &end_subgraph_map,
                                               const std::unordered_map<NodePtr, SubgraphPtr> &pld_subgraph_map) const {
  for (const auto &current : subgraphs) {
    GE_CHECK_NOTNULL(current->subgraph_info.GetSubGraph());
    if (StreamUtils::HasAssignedStream(*current)) {
      continue;
    }

    SubgraphPtr donor = GetPredReusableSubgraph(current, end_subgraph_map);
    if (donor == nullptr) {
      donor = GetSuccReusableSubgraph(current, pld_subgraph_map);
      if (donor == nullptr) {
        GELOGI("Cannot find reusable subgraph of subgraph: %s.", current->name.c_str());
        continue;
      }
    }

    // Prefer the subgraph the neighbour took its stream from, when there is one.
    const SubgraphPtr donor_origin = donor->reused_subgraph;
    if (donor_origin != nullptr) {
      donor = donor_origin;
    }

    if (StreamUtils::HasAssignedStream(*donor)) {
      current->stream_id = donor->stream_id;
      current->reused_subgraph = donor;
      GELOGI("Subgraph: %s of engine: %s reuses stream id: %lld of subgraph: %s of engine: %s.",
             current->name.c_str(), current->engine_conf.id.c_str(), current->stream_id, donor->name.c_str(),
             donor->engine_conf.id.c_str());
    } else {
      GELOGI("Reusable subgraph: %s of %s has not assigned stream.", donor->name.c_str(), current->name.c_str());
    }
  }
  return SUCCESS;
}
// Collects the distinct predecessor subgraphs of `subgraph` (resolved through
// end_subgraph_map) that pass CouldReuse() and returns the one selected by
// StreamUtils::GetTopPrioritySubgraph().
SubgraphPtr DynamicStreamAllocator::GetPredReusableSubgraph(
    const SubgraphPtr &subgraph, const std::unordered_map<NodePtr, SubgraphPtr> &end_subgraph_map) const {
  std::set<SubgraphPtr> candidates;
  for (const auto &input_edge : subgraph->subgraph_info.GetPld2EndMap()) {
    const auto producer = end_subgraph_map.find(input_edge.second);
    if (producer == end_subgraph_map.end()) {
      continue;
    }
    const SubgraphPtr &pred = producer->second;
    // Skip unresolved predecessors and those already accepted, so CouldReuse() is not repeated for them.
    if ((pred == nullptr) || (candidates.count(pred) != 0U)) {
      continue;
    }
    if (CouldReuse(subgraph, pred)) {
      candidates.emplace(pred);
    }
  }
  return StreamUtils::GetTopPrioritySubgraph(candidates);
}
// Collects the distinct successor subgraphs of `subgraph` (resolved through
// pld_subgraph_map) that pass CouldReuse() and returns the one selected by
// StreamUtils::GetTopPrioritySubgraph().
SubgraphPtr DynamicStreamAllocator::GetSuccReusableSubgraph(
    const SubgraphPtr &subgraph, const std::unordered_map<NodePtr, SubgraphPtr> &pld_subgraph_map) const {
  std::set<SubgraphPtr> candidates;
  for (const auto &output_edge : subgraph->subgraph_info.GetEnd2PldMap()) {
    const auto consumer = pld_subgraph_map.find(output_edge.second);
    if (consumer == pld_subgraph_map.end()) {
      continue;
    }
    const SubgraphPtr &succ = consumer->second;
    // Skip unresolved successors and those already accepted, so CouldReuse() is not repeated for them.
    if ((succ == nullptr) || (candidates.count(succ) != 0U)) {
      continue;
    }
    if (CouldReuse(subgraph, succ)) {
      candidates.emplace(succ);
    }
  }
  return StreamUtils::GetTopPrioritySubgraph(candidates);
}
// Tells whether `subgraph` may reuse the stream of `peer_subgraph`.
//   - An AI CPU subgraph never reuses a peer whose engine is independent.
//   - Otherwise reuse is allowed when both share the same engine, when the engine of
//     `subgraph` is an attach engine, or when the peer itself reuses a subgraph of the same
//     engine as `subgraph`.
bool DynamicStreamAllocator::CouldReuse(const SubgraphPtr &subgraph, const SubgraphPtr &peer_subgraph) const {
  const auto &own_engine = subgraph->engine_conf.id;

  const bool is_aicpu = (kAicpuEngineIds.count(own_engine) > 0U);
  if (is_aicpu && StreamUtils::IsEngineIndependent(*peer_subgraph)) {
    return false;
  }

  if (own_engine == peer_subgraph->engine_conf.id) {
    return true;
  }
  if (StreamUtils::IsEngineAttach(*subgraph)) {
    return true;
  }

  const auto &peer_origin = peer_subgraph->reused_subgraph;
  return (peer_origin != nullptr) && (peer_origin->engine_conf.id == own_engine);
}
// Last assignment step: every subgraph that still has no stream and whose engine is not
// marked as "skip" receives its engine stream. Fails if a subgraph has no compute graph.
Status DynamicStreamAllocator::AssignRemainSubgraphNeedAssignStream(const std::vector<SubgraphPtr> &subgraphs) {
  for (const auto &pending : subgraphs) {
    GE_CHECK_NOTNULL(pending->subgraph_info.GetSubGraph());
    const bool needs_stream = (!StreamUtils::HasAssignedStream(*pending)) && (!StreamUtils::IsEngineSkip(*pending));
    if (needs_stream) {
      AssignStreamForSubgraph(pending);
    }
  }
  return SUCCESS;
}
// Propagates subgraph stream ids to nodes:
//   1. logs the stream id of every subgraph;
//   2. resets the stream id of every direct node of `graph` to kInvalidStream;
//   3. for each subgraph that has a stream, writes that id to all direct nodes of the
//      subgraph's compute graph.
// Fails if a node has no OpDesc or an assigned subgraph has no compute graph.
Status DynamicStreamAllocator::SetSubgraphStreamToNodes(const ComputeGraphPtr &graph,
                                                        const std::vector<SubgraphPtr> &subgraphs) const {
  for (const auto &part : subgraphs) {
    GELOGI("[Assign][StreamId] %lld for Subgraph %s (engine: %s).", part->stream_id, part->name.c_str(),
           part->engine_conf.id.c_str());
  }

  // Start from a clean state so nodes outside assigned subgraphs end up with kInvalidStream.
  for (const auto &graph_node : graph->GetDirectNode()) {
    GE_CHECK_NOTNULL(graph_node->GetOpDesc());
    graph_node->GetOpDesc()->SetStreamId(kInvalidStream);
  }

  for (const auto &part : subgraphs) {
    if (StreamUtils::HasAssignedStream(*part)) {
      const int64_t part_stream = part->stream_id;
      const std::string &part_engine = part->engine_conf.id;
      const auto &part_graph = part->subgraph_info.GetSubGraph();
      GE_CHECK_NOTNULL(part_graph);
      for (const NodePtr &part_node : part_graph->GetDirectNode()) {
        GE_CHECK_NOTNULL(part_node->GetOpDesc());
        part_node->GetOpDesc()->SetStreamId(part_stream);
        GELOGD("[Assign][StreamId]id:%lld for Node %s of type %s in subgraph %s (engine: %s).", part_stream,
               part_node->GetName().c_str(), part_node->GetType().c_str(), part->name.c_str(),
               part_engine.c_str());
      }
    }
  }
  return SUCCESS;
}
// Overrides the stream of every direct node of `graph` that carries a non-empty
// ATTR_NAME_STREAM_LABEL attribute. Each distinct label owns one stream, recorded in
// stream_label_ids_; the first node with a new label allocates it from next_stream_.
Status DynamicStreamAllocator::ReassignStreamByStreamLabel(const ComputeGraphPtr &graph) {
  for (const auto &labelled_node : graph->GetDirectNode()) {
    const auto &desc = labelled_node->GetOpDesc();
    GE_CHECK_NOTNULL(desc);

    std::string label;
    const bool has_label = AttrUtils::GetStr(desc, ATTR_NAME_STREAM_LABEL, label) && (!label.empty());
    if (!has_label) {
      continue;
    }

    const auto known_label = stream_label_ids_.find(label);
    if (known_label != stream_label_ids_.end()) {
      desc->SetStreamId(known_label->second);
    } else {
      // New label: bind it to the next free stream id.
      desc->SetStreamId(next_stream_);
      stream_label_ids_.emplace(label, next_stream_);
      ++next_stream_;
    }
    GELOGI("Node: %s, stream_label: %s, reassign stream id: %lld.", labelled_node->GetName().c_str(), label.c_str(),
           desc->GetStreamId());
  }
  return SUCCESS;
}
// Adds, for every direct node of `graph` that has a valid stream id, one to
// stream_node_num[stream id]. Nodes whose stream id is kInvalidStream are skipped.
//   stream_node_num : per-stream counters, indexed by stream id; updated in place.
// Fails if a node has no OpDesc, or if its stream id is not below next_stream_ or does not
// address an element of stream_node_num.
Status DynamicStreamAllocator::GetNodeNumOfPerStream(const ComputeGraphPtr &graph,
                                                     std::vector<int64_t> &stream_node_num) const {
  for (const auto &node : graph->GetDirectNode()) {
    const auto &op_desc = node->GetOpDesc();
    GE_CHECK_NOTNULL(op_desc);
    const int64_t stream_id = op_desc->GetStreamId();
    if (stream_id == kInvalidStream) {
      continue;
    }
    GE_ASSERT_TRUE(stream_id < next_stream_, "Node: %s, stream_id: %lld, should less than %lld.",
                   node->GetName().c_str(), stream_id, next_stream_);
    // The id is used as an index below, so it must also be non-negative and inside the vector
    // supplied by the caller, whatever its size is.
    GE_ASSERT_TRUE((stream_id >= 0) && (static_cast<size_t>(stream_id) < stream_node_num.size()),
                   "Node: %s, stream_id: %lld, is out of range of counter size: %zu.", node->GetName().c_str(),
                   stream_id, stream_node_num.size());
    ++stream_node_num[static_cast<size_t>(stream_id)];
  }
  return SUCCESS;
}
// Drops from engine_streams_ every engine whose stream ended up with no node on it.
// Node counts are gathered per stream id over the root graph and over each of its subgraphs
// flagged as unknown shape. Fails if counting fails or an engine stream id lies outside
// [kMainStream, next_stream_).
Status DynamicStreamAllocator::RetainEngineOwingNodes(const ComputeGraphPtr &root_graph) {
  // One counter per allocated stream id, all starting at zero nodes.
  std::vector<int64_t> stream_node_num(static_cast<size_t>(next_stream_), 0);
  GE_ASSERT_SUCCESS(GetNodeNumOfPerStream(root_graph, stream_node_num));
  for (const auto &subgraph : root_graph->GetAllSubgraphs()) {
    if (subgraph->GetGraphUnknownFlag()) {
      GE_ASSERT_SUCCESS(GetNodeNumOfPerStream(subgraph, stream_node_num));
    }
  }
  std::map<std::string, int64_t> final_engine_streams;
  for (const auto &engine_stream : engine_streams_) {
    const int64_t stream_id = engine_stream.second;
    GE_ASSERT_TRUE(stream_id >= kMainStream, "Engine: %s, stream_id: %lld, should greater than 0.",
                   engine_stream.first.c_str(), stream_id);
    GE_ASSERT_TRUE(stream_id < next_stream_, "Engine: %s, stream_id: %lld, should less than %lld.",
                   engine_stream.first.c_str(), stream_id, next_stream_);
    // Keep only engines that actually own at least one node.
    if (stream_node_num[static_cast<size_t>(stream_id)] > 0) {
      final_engine_streams.emplace(engine_stream);
    }
  }
  engine_streams_ = std::move(final_engine_streams);
  return SUCCESS;
}
// Renumbers the streams so that the ids in use are continuous and start at 0:
//   1. engines that own no node are dropped (RetainEngineOwingNodes);
//   2. the remaining engine streams are numbered in ascending engine-priority value, ties
//      broken by engine name;
//   3. stream-label streams are numbered after them, in stream_label_ids_ iteration order;
//   4. the mapping is applied to the root graph and to its unknown-shape subgraphs.
// stream_num_ ends up as the number of streams, but is never left at 0.
Status DynamicStreamAllocator::RefreshContinuousStreams(const ComputeGraphPtr &root_graph) {
  GE_ASSERT_SUCCESS(RetainEngineOwingNodes(root_graph));
  const auto &engine_priority = StreamUtils::GetEnginePriority();
  // Strict weak ordering on (priority, name). The name is only a tie-breaker: falling back to
  // the name whenever "priority1 < priority2" is false would let compare(a, b) and
  // compare(b, a) both be true, which std::map does not tolerate.
  auto compare = [&engine_priority](const std::string &engine1, const std::string &engine2) {
    const auto priority1 = engine_priority.at(engine1);
    const auto priority2 = engine_priority.at(engine2);
    if (priority1 != priority2) {
      return priority1 < priority2;
    }
    return engine1 < engine2;
  };
  std::map<std::string, int64_t, decltype(compare)> engine_sort(compare);
  for (const auto &engine_stream : engine_streams_) {
    GE_ASSERT_TRUE(!engine_stream.first.empty(), "Engine name is empty, please check!");
    // Must be verified before emplace: the comparator uses at(), which requires a known engine.
    GE_ASSERT_TRUE(engine_priority.find(engine_stream.first) != engine_priority.end(),
                   "Engine name %s is invaild, please check!", engine_stream.first.c_str());
    engine_sort.emplace(engine_stream.first, engine_stream.second);
  }
  stream_num_ = 0;
  std::map<int64_t, int64_t> old_to_new_streams;
  for (const auto &engine_stream : engine_sort) {
    old_to_new_streams[engine_stream.second] = stream_num_;
    GELOGI("Refresh stream of engine: %s from %lld to %lld.", engine_stream.first.c_str(), engine_stream.second,
           stream_num_);
    ++stream_num_;
  }
  for (const auto &stream_label_id : stream_label_ids_) {
    old_to_new_streams[stream_label_id.second] = stream_num_;
    GELOGI("Refresh stream of label: %s from %lld to %lld.", stream_label_id.first.c_str(), stream_label_id.second,
           stream_num_);
    ++stream_num_;
  }
  if (stream_num_ == 0) {
    GELOGI("None of nodes need to assign stream, stream num is 0, it will cause problem, so change it to 1");
    stream_num_ = 1;
  }
  GE_ASSERT_SUCCESS(RefreshStreamsForGraph(root_graph, old_to_new_streams));
  for (const auto &subgraph : root_graph->GetAllSubgraphs()) {
    if (subgraph->GetGraphUnknownFlag()) {
      GE_ASSERT_SUCCESS(RefreshStreamsForGraph(subgraph, old_to_new_streams));
    }
  }
  return SUCCESS;
}
// Returns true for nodes that must stay on the main stream: data nodes, types listed in
// kNodesForcedInMainStream, variable nodes, const-placeholder nodes and constants.
bool DynamicStreamAllocator::IsForcedAssignMainStream(const NodePtr &node) const {
  const auto &type = node->GetType();
  const bool forced = OpTypeUtils::IsDataNode(type) || (kNodesForcedInMainStream.count(type) > 0U) ||
                      OpTypeUtils::IsVariableNode(type) || OpTypeUtils::IsConstPlaceHolderNode(type) ||
                      ConstantUtils::IsConstant(node);
  if (!forced) {
    return false;
  }
  GELOGD("Node: %s, type: %s, is forced to assign main stream.", node->GetNamePtr(), type.c_str());
  return true;
}
// Applies the old->new stream id mapping to every direct node of `graph` and records each
// node under its final stream id in stream_nodes_.
// A node is put on kMainStream when its stream id is kInvalidStream, when it has subgraph
// instance names, when the mapping is empty, or when IsForcedAssignMainStream() holds.
// Fails if a node has no OpDesc or its stream id is missing from the mapping.
Status DynamicStreamAllocator::RefreshStreamsForGraph(const ComputeGraphPtr &graph,
                                                      const std::map<int64_t, int64_t> &old_to_new_streams) {
  for (const auto &graph_node : graph->GetDirectNode()) {
    const auto &desc = graph_node->GetOpDesc();
    GE_CHECK_NOTNULL(desc);

    const bool goes_to_main_stream = (desc->GetStreamId() == kInvalidStream) ||
                                     (!desc->GetSubgraphInstanceNames().empty()) || old_to_new_streams.empty() ||
                                     IsForcedAssignMainStream(graph_node);
    if (!goes_to_main_stream) {
      const auto mapped = old_to_new_streams.find(desc->GetStreamId());
      GE_ASSERT_TRUE(mapped != old_to_new_streams.end(), "Cannot find stream: %lld of %s.", desc->GetStreamId(),
                     desc->GetName().c_str());
      desc->SetStreamId(mapped->second);
    } else {
      desc->SetStreamId(kMainStream);
    }

    // Index the node by its refreshed stream id.
    stream_nodes_[desc->GetStreamId()].emplace_back(graph_node);
    GELOGD("Refresh stream of graph: %s, stream_id: %lld, type: %s, name: %s.", graph->GetName().c_str(),
           desc->GetStreamId(), graph_node->GetType().c_str(), graph_node->GetName().c_str());
  }
  return SUCCESS;
}
// Builds the event synchronisation for the root graph and its unknown-shape subgraphs:
// creates events for edges, optimises them, renumbers the event ids through
// StreamUtils::RefreshContinuousEvents() (the resulting count is stored in event_num_) and
// finally writes the send/recv event id attributes onto the nodes.
Status DynamicStreamAllocator::InsertEvents(const ComputeGraphPtr &root_graph) {
  GE_ASSERT_SUCCESS(ConvertEdgesToEvents(root_graph));
  for (const auto &child_graph : root_graph->GetAllSubgraphs()) {
    if (!child_graph->GetGraphUnknownFlag()) {
      continue;
    }
    GE_ASSERT_SUCCESS(ConvertEdgesToEvents(child_graph));
  }

  GE_ASSERT_SUCCESS(OptimizeEvents());

  uint32_t refreshed_event_num = 0U;
  GE_ASSERT_SUCCESS(
      StreamUtils::RefreshContinuousEvents(refreshed_event_num, node_to_send_events_, node_to_recv_events_));
  event_num_ = static_cast<int64_t>(refreshed_event_num);

  GE_ASSERT_SUCCESS(SetEventAttrToNodes(root_graph));
  for (const auto &child_graph : root_graph->GetAllSubgraphs()) {
    if (!child_graph->GetGraphUnknownFlag()) {
      continue;
    }
    GE_ASSERT_SUCCESS(SetEventAttrToNodes(child_graph));
  }
  return SUCCESS;
}
// Visits every data edge and every control edge leaving a direct node of `graph` and calls
// InsertEventBetweenTwoNodes() for the (source, destination) pair of each edge.
Status DynamicStreamAllocator::ConvertEdgesToEvents(const ComputeGraphPtr &graph) {
  for (const auto &src_node : graph->GetDirectNode()) {
    // Data edges.
    for (const OutDataAnchorPtr &out_anchor : src_node->GetAllOutDataAnchors()) {
      for (const InDataAnchorPtr &dst_anchor : out_anchor->GetPeerInDataAnchors()) {
        NodePtr dst_node = dst_anchor->GetOwnerNode();
        GE_ASSERT_SUCCESS(InsertEventBetweenTwoNodes(src_node, dst_node), "Insert event from %s to %s failed.",
                          src_node->GetName().c_str(), dst_node->GetName().c_str());
      }
    }

    // Control edges.
    if (src_node->GetOutControlAnchor() == nullptr) {
      continue;
    }
    for (const auto dst_anchor : src_node->GetOutControlAnchor()->GetPeerAnchorsPtr()) {
      NodePtr dst_node = dst_anchor->GetOwnerNode();
      GE_ASSERT_SUCCESS(InsertEventBetweenTwoNodes(src_node, dst_node), "Insert event from %s to %s failed.",
                        src_node->GetName().c_str(), dst_node->GetName().c_str());
    }
  }
  return SUCCESS;
}
// Records a send event on cur_node and the matching recv event on next_node when the two
// nodes are on different streams; does nothing when they share a stream.
// The event id is the current value of event_num_, which is then incremented.
// Fails if either node has no OpDesc or an invalid stream id.
Status DynamicStreamAllocator::InsertEventBetweenTwoNodes(const NodePtr &cur_node, const NodePtr &next_node) {
  const auto &cur_desc = cur_node->GetOpDesc();
  GE_CHECK_NOTNULL(cur_desc);
  const auto &next_desc = next_node->GetOpDesc();
  GE_CHECK_NOTNULL(next_desc);
  const int64_t cur_stream_id = cur_desc->GetStreamId();
  GE_ASSERT_TRUE(cur_stream_id != kInvalidStream, "Node: %s, stream id is invalid", cur_node->GetName().c_str());
  const int64_t next_stream_id = next_desc->GetStreamId();
  GE_ASSERT_TRUE(next_stream_id != kInvalidStream, "Node: %s, stream id is invalid", next_node->GetName().c_str());
  if (cur_stream_id == next_stream_id) {
    return SUCCESS;
  }
  // Remember the id actually used so the log below reports it rather than the next free id.
  const int64_t event_id = event_num_;
  StreamUtils::AddSendEventId(cur_node, static_cast<uint32_t>(event_id), node_to_send_events_);
  StreamUtils::AddRecvEventId(next_node, static_cast<uint32_t>(event_id), node_to_recv_events_);
  ++event_num_;
  GELOGI("Insert event %lld between node %s(stream %lld) and %s(stream %lld)", event_id, cur_node->GetName().c_str(),
         cur_stream_id, next_node->GetName().c_str(), next_stream_id);
  return SUCCESS;
}
// Sorts the node list of every stream with TopoOrderCompare and then runs the send-side and
// recv-side event optimisations of StreamUtils on the recorded events.
Status DynamicStreamAllocator::OptimizeEvents() {
  for (auto &stream_entry : stream_nodes_) {
    auto &nodes_on_stream = stream_entry.second;
    std::sort(nodes_on_stream.begin(), nodes_on_stream.end(), TopoOrderCompare);
  }

  GE_ASSERT_SUCCESS(StreamUtils::OptimizeBySendEvents(stream_nodes_, node_to_send_events_, node_to_recv_events_));
  GE_ASSERT_SUCCESS(StreamUtils::OptimizeByRecvEvents(stream_nodes_, node_to_send_events_, node_to_recv_events_));
  return SUCCESS;
}
// Writes the recorded event ids onto the nodes of `graph`: a non-empty send list goes into
// ATTR_NAME_SEND_EVENT_IDS and a non-empty recv list into ATTR_NAME_RECV_EVENT_IDS.
// Nodes without events get no attribute. Fails if a node has no OpDesc or an attribute
// cannot be set.
Status DynamicStreamAllocator::SetEventAttrToNodes(const ComputeGraphPtr &graph) {
  for (const auto &graph_node : graph->GetDirectNode()) {
    const auto &desc = graph_node->GetOpDesc();
    GE_CHECK_NOTNULL(desc);

    // Events this node sends.
    const auto ids_to_send = StreamUtils::GetSyncIdList(graph_node, node_to_send_events_);
    if (!ids_to_send.empty()) {
      GE_ASSERT_TRUE(AttrUtils::SetListInt(desc, ATTR_NAME_SEND_EVENT_IDS, ids_to_send),
                     "Set attr _send_event_ides to %s failed, send_event_ids count: %zu.", desc->GetName().c_str(),
                     ids_to_send.size());
      GELOGD("Node: %s, send_event_ids size: %zu.", desc->GetName().c_str(), ids_to_send.size());
    }

    // Events this node waits for.
    const auto ids_to_recv = StreamUtils::GetSyncIdList(graph_node, node_to_recv_events_);
    if (!ids_to_recv.empty()) {
      GE_ASSERT_TRUE(AttrUtils::SetListInt(desc, ATTR_NAME_RECV_EVENT_IDS, ids_to_recv),
                     "Set attr _recv_event_ides to %s failed, recv_event_ids count: %zu.", desc->GetName().c_str(),
                     ids_to_recv.size());
      GELOGD("Node: %s, recv_event_ids size: %zu.", desc->GetName().c_str(), ids_to_recv.size());
    }
  }
  return SUCCESS;
}
}