* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "dflow_session_impl.h"
#include <cinttypes>
#include <map>
#include <memory>
#include <vector>
#include "dflow/inc/data_flow/model/flow_model_helper.h"
#include "dflow/executor/flow_model_manager.h"
#include "graph/ge_global_options.h"
#include "graph/utils/graph_utils_ex.h"
#include "graph/ge_local_context.h"
#include "graph/ge_context.h"
#include "graph/utils/tensor_adapter.h"
#include "dflow/base/exec_runtime/execution_runtime.h"
#include "external/ge/ge_api_v2.h"
#include "framework/common/helper/model_helper.h"
#include "external/ge/ge_ir_build.h"
#include "dflow/compiler/pne/process_node_engine_manager.h"
#include "common/compile_profiling/ge_call_wrapper.h"
#include "graph/debug/ge_attr_define.h"
namespace ge {
namespace {
std::vector<GeTensor> ToGeTensors(const std::vector<Tensor> &tensors) {
std::vector<GeTensor> ge_tensors;
ge_tensors.reserve(tensors.size());
for (auto &item : tensors) {
ge_tensors.emplace_back(TensorAdapter::AsGeTensor(item));
}
return ge_tensors;
}
std::vector<Tensor> ToTensors(const std::vector<GeTensor> &ge_tensors) {
std::vector<Tensor> tensors;
tensors.reserve(ge_tensors.size());
for (auto &item : ge_tensors) {
tensors.emplace_back(TensorAdapter::AsTensor(item));
}
return tensors;
}
// Copies a std::string option map into an AscendString option map.
//
// options:        source key/value pairs.
// ascend_options: destination map; entries with an existing key are overwritten.
// Returns FAILED as soon as an empty key is met (entries converted before that point
// stay in ascend_options), SUCCESS otherwise.
Status ConvertStringMap(const std::map<std::string, std::string> &options,
                        std::map<ge::AscendString, ge::AscendString> &ascend_options) {
  for (const auto &entry : options) {
    const std::string &name = entry.first;
    if (name.empty()) {
      GELOGE(ge::FAILED, "Construct session failed, option key is empty.");
      REPORT_INNER_ERR_MSG("E19999", "Construct session failed, option key is empty.");
      return FAILED;
    }
    const ge::AscendString ascend_key(name.c_str());
    const ge::AscendString ascend_value(entry.second.c_str());
    ascend_options[ascend_key] = ascend_value;
  }
  return SUCCESS;
}
// Process node engine implementation that compiles a compute graph through a GeSession
// and wraps the compiled model buffer into a PneModel.
class DefaultNpuProcessNodeEngineImpl : public ProcessNodeEngineImpl {
 public:
  // Takes shared ownership of the GeSession used for compilation.
  explicit DefaultNpuProcessNodeEngineImpl(std::shared_ptr<GeSession> ge_session)
      : ge_session_(std::move(ge_session)) {}
  ~DefaultNpuProcessNodeEngineImpl() override = default;

  // Compiles `compute_graph` in the wrapped GeSession and converts the result to a PneModel.
  //
  // graph_id:      id under which the graph is temporarily added to the GeSession.
  // compute_graph: graph to compile; must not be null.
  // options:       graph options, converted to AscendString before being passed on.
  // inputs:        input tensors handed to CompileGraph.
  // model:         receives the resulting PneModel on success.
  // Returns SUCCESS, or the first failing status of the steps below.
  Status BuildGraph(uint32_t graph_id, ComputeGraphPtr &compute_graph,
                    const std::map<std::string, std::string> &options, const std::vector<GeTensor> &inputs,
                    PneModelPtr &model) override {
    // Validate before the first dereference: the log statements below read compute_graph->GetName().
    GE_CHECK_NOTNULL(compute_graph);
    GE_CHECK_NOTNULL(ge_session_);
    GELOGD("Build graph begin, graph_id=%u, graph_name=%s.", graph_id, compute_graph->GetName().c_str());
    std::map<ge::AscendString, ge::AscendString> ascend_options;
    GE_CHK_STATUS_RET(ConvertStringMap(options, ascend_options), "Convert string to ascend string map.");
    auto graph = GraphUtilsEx::CreateGraphFromComputeGraph(compute_graph);
    GE_CHK_STATUS_RET(ge_session_->AddGraph(graph_id, graph, ascend_options),
                      "Failed to add graph, graph_id=%u, graph_name=%s.", graph_id, compute_graph->GetName().c_str());
    // The graph is only needed inside the GeSession for the duration of the build;
    // the guard removes it again on every exit path after a successful AddGraph.
    ScopeGuard guard([this, graph_id]() { return ge_session_->RemoveGraph(graph_id); });
    GE_CHK_STATUS_RET(ge_session_->CompileGraph(graph_id, ge::ToTensors(inputs)),
                      "Compile graph failed, graph_id=%u, graph_name=%s.", graph_id, compute_graph->GetName().c_str());
    ge::ModelBufferData model_buffer_data{};
    GE_CHK_STATUS_RET(ge_session_->GetCompiledModel(graph_id, model_buffer_data), "Failed to get built model");
    // model_data only borrows the buffer owned by model_buffer_data.
    ge::ModelData model_data{};
    model_data.model_data = model_buffer_data.data.get();
    model_data.model_len = model_buffer_data.length;
    model = FlowModelHelper::ToPneModel(model_data, compute_graph);
    GE_CHECK_NOTNULL(model);
    GELOGD("Build graph end, graph_id=%u, graph_name=%s.", graph_id, compute_graph->GetName().c_str());
    return SUCCESS;
  }

 private:
  std::shared_ptr<GeSession> ge_session_;
};
// Initializes the heterogeneous execution runtime (only if no instance exists yet) and the
// process node engine manager. Calls are serialized by a function-local mutex.
//
// options: options forwarded to both initialization routines.
// Returns SUCCESS, or the failing status of either initialization step.
Status EnsureDflowInitialized(const std::map<std::string, std::string> &options) {
  static std::mutex init_mutex;
  std::lock_guard<std::mutex> init_guard(init_mutex);
  const bool runtime_missing = (ExecutionRuntime::GetInstance() == nullptr);
  if (runtime_missing) {
    GE_TIMESTAMP_START(InitializeExecutionRuntime);
    GE_CHK_STATUS_RET_NOLOG(ExecutionRuntime::InitHeterogeneousRuntime(options));
    GE_TIMESTAMP_EVENT_END(InitializeExecutionRuntime, "InitializeExecutionRuntime");
  }
  // The engine manager is initialized on every call, regardless of the runtime state.
  const Status pne_status = ProcessNodeEngineManager::GetInstance().Initialize(options);
  if (pne_status == SUCCESS) {
    return SUCCESS;
  }
  GELOGE(pne_status, "[Init][EngineManager]GE process node engine manager initial failed.");
  REPORT_INNER_ERR_MSG("E19999", "Process node engine manager initial failed.");
  return pne_status;
}
}
// Entry point for dflow initialization with AscendString options.
// Converts the options to std::string form and delegates to EnsureDflowInitialized.
//
// Returns FAILED if any option key is empty, the status of EnsureDflowInitialized if that
// fails, SUCCESS otherwise.
Status DFlowInitializeInner(const std::map<AscendString, AscendString> &options) {
  std::map<std::string, std::string> converted;
  for (const auto &entry : options) {
    const AscendString &name = entry.first;
    if (name.GetLength() == 0) {
      GELOGE(FAILED, "[Check][Param] DFlowInitializeInner failed, option key is empty.");
      REPORT_INNER_ERR_MSG("E19999", "Check parameter's options invalid, option key is empty.");
      return FAILED;
    }
    const std::string key_text = name.GetString();
    const std::string value_text = entry.second.GetString();
    converted[key_text] = value_text;
  }
  GE_CHK_STATUS_RET(EnsureDflowInitialized(converted), "Failed to init execution runtime");
  return SUCCESS;
}
// Tears down dflow: finalizes the process node engine manager first (its status is
// deliberately ignored), then the execution runtime.
void DFlowFinalizeInner() {
  static_cast<void>(ProcessNodeEngineManager::GetInstance().Finalize());
  ExecutionRuntime::FinalizeExecutionRuntime();
}
// Stores the session id and a copy of the session options.
// No resources are created here; see Initialize().
DFlowSessionImpl::DFlowSessionImpl(uint64_t session_id, const std::map<std::string, std::string> &options)
    : session_id_(session_id),
      options_(options) {
}
// Releases session resources through Finalize(); its status is intentionally discarded.
DFlowSessionImpl::~DFlowSessionImpl() {
  static_cast<void>(Finalize());
}
// Creates the underlying GeSession and initializes the dflow runtime and graph manager.
// Returns SUCCESS immediately if the session is already initialized.
//
// options: used to build the GeSession options. Note that the dflow runtime and the graph
//          manager are initialized from the options stored at construction (options_).
// Returns SUCCESS, or the status of the first failing step.
Status DFlowSessionImpl::Initialize(const std::map<std::string, std::string> &options) {
  if (is_initialized_) {
    GELOGI("[DFlowSessionImpl:%" PRIu64 "] session already initialize.", session_id_);
    return SUCCESS;
  }
  std::map<ge::AscendString, ge::AscendString> session_options;
  GE_CHK_STATUS_RET(ConvertStringMap(options, session_options), "Convert string to ascend string map.");

  const ge::AscendString graph_key("ge.graph_key");
  const ge::AscendString cache_dir("ge.graph_compiler_cache_dir");
  const ge::AscendString external_weight("ge.externalWeightDir");
  // When graph key, cache dir and external weight dir are all present, the external weight
  // dir is redirected to the cache dir and the two cache options are dropped from the
  // options handed to the GeSession.
  const bool has_cache_options =
      (session_options.count(graph_key) > 0U) && (session_options.count(cache_dir) > 0U);
  if (has_cache_options && (session_options.count(external_weight) > 0U)) {
    session_options[external_weight] = session_options[cache_dir];
    session_options.erase(graph_key);
    session_options.erase(cache_dir);
  }

  ge_session_ = MakeShared<GeSession>(session_options);
  GE_CHECK_NOTNULL(ge_session_);
  GE_CHK_STATUS_RET(EnsureDflowInitialized(options_), "Failed to init dflow.");
  auto engine_impl = MakeShared<DefaultNpuProcessNodeEngineImpl>(ge_session_);
  GE_CHECK_NOTNULL(engine_impl);
  GE_CHK_STATUS_RET(dflow_graph_manager_.Initialize(options_, engine_impl), "Failed to init dflow graph manager");
  is_initialized_ = true;
  return SUCCESS;
}
// Unloads every model recorded in loaded_models_, finalizes the graph manager and marks
// the session as uninitialized. Always returns SUCCESS; calling it on a session that is
// not initialized only logs a warning.
Status DFlowSessionImpl::Finalize() {
  std::lock_guard<std::mutex> resource_guard(resource_mutex_);
  if (!is_initialized_) {
    GELOGW("[DFlowSessionImpl:%" PRIu64 "] session does not initialize.", session_id_);
    return SUCCESS;
  }
  {
    // loaded_models_ is guarded by model_mutex_; unload failures are ignored.
    std::lock_guard<std::mutex> models_guard(model_mutex_);
    for (const uint32_t loaded_id : loaded_models_) {
      (void)FlowModelManager::GetInstance().Unload(loaded_id);
    }
    loaded_models_.clear();
  }
  dflow_graph_manager_.Finalize();
  is_initialized_ = false;
  return SUCCESS;
}
// Registers a data-flow graph with this session.
//
// Stamps the GeSession id and a "<session id>_<graph id>" attribute on the root compute
// graph and all of its subgraphs, refreshes the thread-local option context, then hands
// the graph to the dflow graph manager.
//
// graph_id: id the graph is registered under.
// graph:    flow graph to add; its GE graph must carry a compute graph.
// options:  graph-level options.
// Returns SUCCESS, PARAM_INVALID-style failure from GE_CHECK_NOTNULL when the session has
// no GeSession yet or the compute graph is null, or the graph manager's failing status.
Status DFlowSessionImpl::AddGraph(uint32_t graph_id, const dflow::FlowGraph &graph,
                                  const std::map<std::string, std::string> &options) {
  std::lock_guard<std::mutex> lock(resource_mutex_);
  // ge_session_ is only created by Initialize(); guard against use before initialization
  // instead of dereferencing a null pointer below.
  GE_CHECK_NOTNULL(ge_session_);
  for (const auto &item : options) {
    GELOGI("DFlow option: %s, value: %s, dflowInnerSession:%" PRIu64 ", graph id: %u.", item.first.c_str(), item.second.c_str(),
           session_id_, graph_id);
  }
  const auto &ge_graph = graph.ToGeGraph();
  auto compute_graph = GraphUtilsEx::GetComputeGraph(ge_graph);
  GE_CHECK_NOTNULL(compute_graph);
  const uint64_t ge_session_id = ge_session_->GetSessionId();
  compute_graph->SetSessionID(ge_session_id);
  const std::string session_graph_id = std::to_string(ge_session_id) + "_" + std::to_string(graph_id);
  (void)AttrUtils::SetStr(*compute_graph, ATTR_NAME_SESSION_GRAPH_ID, session_graph_id);
  for (auto &sub_graph : compute_graph->GetAllSubgraphs()) {
    sub_graph->SetSessionID(ge_session_id);
    (void)AttrUtils::SetStr(*sub_graph, ATTR_NAME_SESSION_GRAPH_ID, session_graph_id);
  }
  UpdateThreadContext(options);
  GE_CHK_STATUS_RET(dflow_graph_manager_.AddGraph(graph_id, ge_graph, options),
                    "[Add][Graph] failed, DFlowSessionImpl:%" PRIu64 " graph id: %u.", session_id_, graph_id);
  GELOGI("[DFlowSessionImpl:%" PRIu64 "] Add graph success, graph_id=%u.", session_id_, graph_id);
  return SUCCESS;
}
// Registers a plain GE graph with this session.
//
// Stamps the GeSession id and a "<session id>_<graph id>" attribute on the root compute
// graph and all of its subgraphs, refreshes the thread-local option context, then hands
// the graph to the dflow graph manager.
//
// graph_id: id the graph is registered under.
// graph:    graph to add; it must carry a compute graph.
// options:  graph-level options.
// Returns SUCCESS, the GE_CHECK_NOTNULL failure status when the session has no GeSession
// yet or the compute graph is null, or the graph manager's failing status.
Status DFlowSessionImpl::AddGraph(uint32_t graph_id, const Graph &graph,
                                  const std::map<std::string, std::string> &options) {
  std::lock_guard<std::mutex> lock(resource_mutex_);
  // ge_session_ is only created by Initialize(); guard against use before initialization
  // instead of dereferencing a null pointer below.
  GE_CHECK_NOTNULL(ge_session_);
  auto compute_graph = GraphUtilsEx::GetComputeGraph(graph);
  GE_CHECK_NOTNULL(compute_graph);
  const uint64_t ge_session_id = ge_session_->GetSessionId();
  compute_graph->SetSessionID(ge_session_id);
  const std::string session_graph_id = std::to_string(ge_session_id) + "_" + std::to_string(graph_id);
  (void)AttrUtils::SetStr(*compute_graph, ATTR_NAME_SESSION_GRAPH_ID, session_graph_id);
  for (auto &sub_graph : compute_graph->GetAllSubgraphs()) {
    sub_graph->SetSessionID(ge_session_id);
    (void)AttrUtils::SetStr(*sub_graph, ATTR_NAME_SESSION_GRAPH_ID, session_graph_id);
  }
  UpdateThreadContext(options);
  GE_CHK_STATUS_RET(dflow_graph_manager_.AddGraph(graph_id, graph, options),
                    "FlowGraphManager AddGraph failed, DFlowInnerSession:%" PRIu64 " graph id: %u.", session_id_, graph_id);
  GELOGI("[DFlowInnerSession:%" PRIu64 "] Add graph success, graph_id=%u.", session_id_, graph_id);
  return SUCCESS;
}
// Removes a graph from the session. If the graph already has a loaded model, the model is
// unloaded (best effort) and dropped from loaded_models_ before the graph itself is
// removed from the graph manager.
//
// graph_id: id of the graph to remove.
// Returns SUCCESS, or the failing status of the graph manager's RemoveGraph.
Status DFlowSessionImpl::RemoveGraph(uint32_t graph_id) {
  std::lock_guard<std::mutex> lock(resource_mutex_);
  uint32_t model_id = INVALID_MODEL_ID;
  // A failed lookup leaves model_id invalid, which simply skips the unload step.
  (void)dflow_graph_manager_.GetGraphModelId(graph_id, model_id);
  if (model_id != INVALID_MODEL_ID) {
    (void)FlowModelManager::GetInstance().Unload(model_id);
    // loaded_models_ is guarded by model_mutex_ everywhere else (Finalize,
    // CompileAndLoadGraph); take it here too so the erase cannot race with a concurrent
    // load. Lock order resource_mutex_ -> model_mutex_ matches Finalize().
    std::lock_guard<std::mutex> model_lock(model_mutex_);
    (void)loaded_models_.erase(model_id);
  }
  Status ret = dflow_graph_manager_.RemoveGraph(graph_id);
  if (ret != SUCCESS) {
    GELOGE(ret, "[Remove][Graph] failed, DFlowSessionImpl:%" PRIu64 " graph id: %u.", session_id_, graph_id);
    REPORT_INNER_ERR_MSG("E19999", "FlowGraphManager RemoveGraph failed, DFlowSessionImpl:%" PRIu64 " graph id: %u.", session_id_, graph_id);
    return ret;
  }
  GELOGI("[DFlowSessionImpl:%" PRIu64 "] Remove graph success, graph_id=%u.", session_id_, graph_id);
  return SUCCESS;
}
// Returns the flow model of `graph_id`, compiling the graph first if no model exists yet
// and loading it if the run flag is set and the model has no valid model id.
// The whole sequence is serialized by build_run_mutex_.
//
// graph_id: graph to compile and load.
// inputs:   input tensors forwarded to the graph manager's CompileGraph.
// Returns the flow model, or nullptr when compilation or loading fails.
FlowModelPtr DFlowSessionImpl::CompileAndLoadGraph(uint32_t graph_id, const std::vector<GeTensor> &inputs) {
  std::lock_guard<std::mutex> build_run_guard(build_run_mutex_);
  auto flow_model = dflow_graph_manager_.GetFlowModel(graph_id);
  if (flow_model == nullptr) {
    // No model yet: compile, then fetch the model the compilation produced.
    const auto compile_ret = dflow_graph_manager_.CompileGraph(graph_id, inputs);
    if (compile_ret != SUCCESS) {
      GELOGE(compile_ret, "Dflow graph manager compile graph failed, graph_id=%u.", graph_id);
      return nullptr;
    }
    flow_model = dflow_graph_manager_.GetFlowModel(graph_id);
    if (flow_model == nullptr) {
      GELOGE(FAILED, "Dflow graph manager compile graph success, but flow model is null, graph_id=%u.", graph_id);
      return nullptr;
    }
  }

  const bool run_enabled = dflow_graph_manager_.GetOptionsRunGraphFlag();
  if (!run_enabled) {
    GELOGI("RunFlag is false, no need load graph.");
    return flow_model;
  }
  // A valid model id means the model has already been loaded.
  if (flow_model->GetModelId() != INVALID_MODEL_ID) {
    return flow_model;
  }

  GELOGI("Start to load dflow model.");
  uint32_t loaded_id = INVALID_MODEL_ID;
  const Status load_status = FlowModelManager::GetInstance().LoadFlowModel(loaded_id, flow_model);
  if (load_status != SUCCESS) {
    GELOGE(load_status, "Load flow model failed, graph_id=%u.", graph_id);
    return nullptr;
  }
  {
    // Remember the model so that Finalize() can unload it.
    std::lock_guard<std::mutex> models_guard(model_mutex_);
    loaded_models_.emplace(loaded_id);
  }
  GELOGI("Load flow model success, graph_id=%u, model_id=%u.", graph_id, loaded_id);
  return flow_model;
}
// Compiles the graph registered under `graph_id` without loading it.
//
// graph_id:  graph to compile.
// ge_inputs: input tensors forwarded to the graph manager.
// Returns SUCCESS, or the failing status of the graph manager's CompileGraph.
Status DFlowSessionImpl::CompileGraph(uint32_t graph_id, const std::vector<GeTensor> &ge_inputs) {
  UpdateThreadContext(graph_id);
  // session_id_ must be passed as an argument: %PRIu64 consumes it and %u consumes graph_id.
  GE_CHK_STATUS_RET(dflow_graph_manager_.CompileGraph(graph_id, ge_inputs),
                    "[DFlowSessionImpl:%" PRIu64 "] compile graph failed, graph_id=%u", session_id_, graph_id);
  GELOGI("[DFlowSessionImpl:%" PRIu64 "] Compile graph success, graph_id=%u.", session_id_, graph_id);
  return SUCCESS;
}
// Tensor overload of BuildGraph: converts the inputs to GeTensor form and delegates to the
// GeTensor overload.
Status DFlowSessionImpl::BuildGraph(uint32_t graph_id, const std::vector<Tensor> &inputs) {
  UpdateThreadContext(graph_id);
  const std::vector<ge::GeTensor> converted_inputs = ToGeTensors(inputs);
  return BuildGraph(graph_id, converted_inputs);
}
// Compiles the graph and, if the run flag is set, loads it (see CompileAndLoadGraph).
//
// graph_id:  graph to build.
// ge_inputs: input tensors used for compilation.
// Returns SUCCESS when a flow model is available afterwards, FAILED otherwise.
Status DFlowSessionImpl::BuildGraph(uint32_t graph_id, const std::vector<ge::GeTensor> &ge_inputs) {
  UpdateThreadContext(graph_id);
  const auto built_model = CompileAndLoadGraph(graph_id, ge_inputs);
  if (built_model != nullptr) {
    GELOGI("[DFlowSessionImpl:%" PRIu64 "] Build graph success, graph_id=%u.", session_id_, graph_id);
    return SUCCESS;
  }
  GELOGE(FAILED, "[Compile][Load] failed, DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
  REPORT_INNER_ERR_MSG("E19999", "FlowGraphManager BuildGraph failed, DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
  return FAILED;
}
// Compiles/loads the graph if necessary and executes it synchronously.
//
// graph_id: graph to run.
// inputs:   input tensors; also used for compilation when the graph is not built yet.
// outputs:  receives the execution results on success; left untouched when execution is
//           skipped because the run flag is false.
// Returns SUCCESS, FAILED when no flow model could be obtained, or the failing status of
// ExecuteFlowModel.
Status DFlowSessionImpl::RunGraph(uint32_t graph_id, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs) {
  UpdateThreadContext(graph_id);
  std::vector<ge::GeTensor> ge_inputs = ToGeTensors(inputs);
  const auto flow_model = CompileAndLoadGraph(graph_id, ge_inputs);
  if (flow_model == nullptr) {
    GELOGE(FAILED, "[Compile][Load] failed, DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
    REPORT_INNER_ERR_MSG("E19999", "FlowGraphManager BuildGraph failed, DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
    return FAILED;
  }
  if (!dflow_graph_manager_.GetOptionsRunGraphFlag()) {
    GEEVENT("Skip execute model result of run flag obtained is false.");
    return SUCCESS;
  }
  std::vector<GeTensor> ge_outputs;
  GE_CHK_STATUS_RET(
      FlowModelManager::GetInstance().ExecuteFlowModel(flow_model->GetModelId(), ge_inputs, ge_outputs),
      "execute flow model failed, graph_id=%u, model_id=%u", graph_id, flow_model->GetModelId());
  outputs = ToTensors(ge_outputs);
  // Format specifiers must match the arguments one-to-one: %PRIu64 for session_id_, %u for graph_id.
  GELOGI("[DFlowSessionImpl:%" PRIu64 "] run graph success, graph_id=%u.", session_id_, graph_id);
  return SUCCESS;
}
// Feeds tensor inputs into a data-flow graph, compiling and loading it on first use.
//
// graph_id: target graph.
// indexes:  input indexes forwarded to the executor's FeedData.
// inputs:   tensors to feed; also used for compilation when the graph is not built yet.
// info:     data-flow info forwarded to the executor.
// timeout:  timeout forwarded to the executor.
// Returns the executor's FeedData status (SUCCESS, ACL_ERROR_GE_REDEPLOYING and
// ACL_ERROR_GE_SUBHEALTHY are not logged as errors), FAILED when no flow model is
// available, or SUCCESS without feeding when the run flag is false.
Status DFlowSessionImpl::FeedDataFlowGraph(uint32_t graph_id, const std::vector<uint32_t> &indexes,
                                           const std::vector<Tensor> &inputs, const DataFlowInfo &info, int32_t timeout) {
  UpdateThreadContext(graph_id);
  std::vector<GeTensor> ge_inputs = ToGeTensors(inputs);
  const auto flow_model = CompileAndLoadGraph(graph_id, ge_inputs);
  GE_CHK_BOOL_RET_STATUS(flow_model != nullptr, FAILED,
                         "[Build][Graph] failed, DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
  if (!dflow_graph_manager_.GetOptionsRunGraphFlag()) {
    GEEVENT("Skip loading model result of run flag obtained is false.");
    return SUCCESS;
  }
  auto model_id = flow_model->GetModelId();
  auto heterogeneous_executor = FlowModelManager::GetInstance().GetHeterogeneousModelExecutor(model_id);
  GE_CHECK_NOTNULL(heterogeneous_executor, ", model_id:%u.", model_id);
  const auto feed_status = heterogeneous_executor->FeedData(indexes, ge_inputs, info, timeout);
  const bool tolerated = (feed_status == SUCCESS) || (feed_status == ACL_ERROR_GE_REDEPLOYING) ||
                         (feed_status == ACL_ERROR_GE_SUBHEALTHY);
  if (tolerated) {
    GELOGI("[DFlowSessionImpl:%" PRIu64 "] feed data flow graph success, graph_id=%u, model_id=%u.", session_id_, graph_id,
           model_id);
  } else {
    GELOGE(FAILED, "[Feed][Data]failed, DFlowSession:%" PRIu64 ", graph_id=%u, model_id=%u.", session_id_, graph_id, model_id);
  }
  return feed_status;
}
// Returns the flow model the graph manager holds for `graph_id`; the result is whatever
// the graph manager's GetFlowModel yields for that id.
FlowModelPtr DFlowSessionImpl::GetFlowModel(uint32_t graph_id) const {
  FlowModelPtr found_model = dflow_graph_manager_.GetFlowModel(graph_id);
  return found_model;
}
// Feeds flow messages into a data-flow graph, compiling and loading it on first use.
// Compilation is triggered with an empty tensor list because flow messages carry no
// GeTensor inputs.
//
// graph_id: target graph.
// indexes:  input indexes forwarded to the executor's FeedFlowMsg.
// inputs:   flow messages to feed.
// timeout:  timeout forwarded to the executor.
// Returns the executor's FeedFlowMsg status (SUCCESS, ACL_ERROR_GE_REDEPLOYING and
// ACL_ERROR_GE_SUBHEALTHY are not treated as errors), FAILED when no flow model is
// available, or SUCCESS without feeding when the run flag is false.
Status DFlowSessionImpl::FeedDataFlowGraph(uint32_t graph_id, const std::vector<uint32_t> &indexes,
                                           const std::vector<FlowMsgPtr> &inputs, int32_t timeout) {
  UpdateThreadContext(graph_id);
  const std::vector<GeTensor> empty_inputs = {};
  const auto flow_model = CompileAndLoadGraph(graph_id, empty_inputs);
  GE_CHK_BOOL_RET_STATUS(flow_model != nullptr, FAILED,
                         "[Build][Graph] failed, DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
  if (!dflow_graph_manager_.GetOptionsRunGraphFlag()) {
    GEEVENT("Skip loading model result of run flag obtained is false.");
    return SUCCESS;
  }
  auto model_id = flow_model->GetModelId();
  auto heterogeneous_executor = FlowModelManager::GetInstance().GetHeterogeneousModelExecutor(model_id);
  GE_CHECK_NOTNULL(heterogeneous_executor, ", model_id:%u.", model_id);
  const auto feed_status = heterogeneous_executor->FeedFlowMsg(indexes, inputs, timeout);
  const bool tolerated = (feed_status == SUCCESS) || (feed_status == ACL_ERROR_GE_REDEPLOYING) ||
                         (feed_status == ACL_ERROR_GE_SUBHEALTHY);
  GE_CHK_BOOL_RET_STATUS(tolerated, feed_status,
                         "[Feed][FlowMsg]failed, DFlowSession:%" PRIu64 ", graph_id=%u, model_id=%u.",
                         session_id_, graph_id, model_id);
  GELOGI("[DFlowSession:%" PRIu64 "] feed data flow graph success, graph_id=%u, model_id=%u.", session_id_, graph_id, model_id);
  return feed_status;
}
// Fetches tensor outputs from an already loaded data-flow graph.
//
// graph_id: source graph; it must already have a valid model id.
// indexes:  output indexes forwarded to the executor's FetchData.
// outputs:  receives the fetched tensors unless the executor reports a hard error.
// info:     data-flow info filled in by the executor.
// timeout:  timeout forwarded to the executor.
// Returns FAILED when the graph has no valid model id, otherwise the executor's FetchData
// status (SUCCESS, ACL_ERROR_GE_REDEPLOYING and ACL_ERROR_GE_SUBHEALTHY are not logged as
// errors).
Status DFlowSessionImpl::FetchDataFlowGraph(uint32_t graph_id, const std::vector<uint32_t> &indexes,
                                            std::vector<Tensor> &outputs, DataFlowInfo &info, int32_t timeout) {
  UpdateThreadContext(graph_id);
  uint32_t model_id = INVALID_MODEL_ID;
  const Status lookup_status = dflow_graph_manager_.GetGraphModelId(graph_id, model_id);
  const bool model_ready = (lookup_status == SUCCESS) && (model_id != INVALID_MODEL_ID);
  if (!model_ready) {
    GELOGE(lookup_status, "[Get][model] failed, DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
    REPORT_INNER_ERR_MSG("E19999", "Get model failed, DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
    return FAILED;
  }
  std::vector<GeTensor> ge_outputs;
  auto heterogeneous_executor = FlowModelManager::GetInstance().GetHeterogeneousModelExecutor(model_id);
  GE_CHECK_NOTNULL(heterogeneous_executor, ", model_id:%u.", model_id);
  const Status fetch_status = heterogeneous_executor->FetchData(indexes, ge_outputs, info, timeout);
  const bool tolerated = (fetch_status == SUCCESS) || (fetch_status == ACL_ERROR_GE_REDEPLOYING) ||
                         (fetch_status == ACL_ERROR_GE_SUBHEALTHY);
  if (tolerated) {
    outputs = ToTensors(ge_outputs);
    GELOGI("[DFlowSession:%" PRIu64 "] Fetch data flow graph success, graph_id=%u, model_id=%u.", session_id_, graph_id,
           model_id);
  } else {
    GELOGE(FAILED, "[Fetch][Data]failed, DFlowSession:%" PRIu64 ", graph_id=%u, model_id=%u.", session_id_, graph_id, model_id);
  }
  return fetch_status;
}
// Fetches flow-message outputs from an already loaded data-flow graph.
//
// graph_id: source graph; it must already have a valid model id.
// indexes:  output indexes forwarded to the executor's FetchFlowMsg.
// outputs:  filled in by the executor.
// timeout:  timeout forwarded to the executor.
// Returns FAILED when the graph has no valid model id, otherwise the executor's
// FetchFlowMsg status (SUCCESS, ACL_ERROR_GE_REDEPLOYING and ACL_ERROR_GE_SUBHEALTHY are
// not treated as errors).
Status DFlowSessionImpl::FetchDataFlowGraph(uint32_t graph_id, const std::vector<uint32_t> &indexes,
                                            std::vector<FlowMsgPtr> &outputs, int32_t timeout) {
  UpdateThreadContext(graph_id);
  uint32_t model_id = INVALID_MODEL_ID;
  const Status lookup_status = dflow_graph_manager_.GetGraphModelId(graph_id, model_id);
  const bool model_ready = (lookup_status == SUCCESS) && (model_id != INVALID_MODEL_ID);
  if (!model_ready) {
    GELOGE(lookup_status, "[Get][model] failed, DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
    REPORT_INNER_ERR_MSG("E19999", "Get model failed, DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
    return FAILED;
  }
  auto heterogeneous_executor = FlowModelManager::GetInstance().GetHeterogeneousModelExecutor(model_id);
  GE_CHECK_NOTNULL(heterogeneous_executor, ", model_id:%u.", model_id);
  const Status fetch_status = heterogeneous_executor->FetchFlowMsg(indexes, outputs, timeout);
  const bool tolerated = (fetch_status == SUCCESS) || (fetch_status == ACL_ERROR_GE_REDEPLOYING) ||
                         (fetch_status == ACL_ERROR_GE_SUBHEALTHY);
  GE_CHK_BOOL_RET_STATUS(tolerated, fetch_status,
                         "[Fetch][FlowMsg]failed, DFlowSession:%" PRIu64 ", graph_id=%u, model_id=%u.",
                         session_id_, graph_id, model_id);
  GELOGI("[DFlowSession:%" PRIu64 "] Fetch data flow graph success, graph_id=%u, model_id=%u.", session_id_, graph_id,
         model_id);
  return fetch_status;
}
// Feeds raw data buffers into one input of an already built data-flow graph.
// Unlike FeedDataFlowGraph this never compiles the graph: a missing flow model is an error.
//
// graph_id:      target graph; it must have been built beforehand.
// raw_data_list: raw buffers forwarded to the executor's FeedRawData.
// index:         input index forwarded to the executor.
// info:          data-flow info forwarded to the executor.
// timeout:       timeout forwarded to the executor.
// Returns FAILED when the graph has no flow model, SUCCESS without feeding when the run
// flag is false, otherwise the executor's FeedRawData status (SUCCESS,
// ACL_ERROR_GE_REDEPLOYING and ACL_ERROR_GE_SUBHEALTHY are not logged as errors).
Status DFlowSessionImpl::FeedRawData(uint32_t graph_id, const std::vector<RawData> &raw_data_list, const uint32_t index,
                                     const DataFlowInfo &info, int32_t timeout) {
  UpdateThreadContext(graph_id);
  FlowModelPtr flow_model = dflow_graph_manager_.GetFlowModel(graph_id);
  if (flow_model == nullptr) {
    GELOGE(FAILED, "[Get][FlowModel] failed. Please make sure graph has been build before feed raw data, "
           "DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
    REPORT_INNER_ERR_MSG("E19999", "[Get][FlowModel] failed. Please make sure graph has been build before feed raw data, "
                         "DFlowSessionImpl:%" PRIu64 " graph_id=%u.", session_id_, graph_id);
    return FAILED;
  }
  if (!dflow_graph_manager_.GetOptionsRunGraphFlag()) {
    GELOGI("Skip feed raw data as run flag is false.");
    return SUCCESS;
  }
  // flow_model is known to be non-null here (checked above), so no second null check is needed.
  const auto model_id = flow_model->GetModelId();
  auto heterogeneous_executor = FlowModelManager::GetInstance().GetHeterogeneousModelExecutor(model_id);
  GE_CHECK_NOTNULL(heterogeneous_executor, ", model_id:%u.", model_id);
  const auto ret = heterogeneous_executor->FeedRawData(raw_data_list, index, info, timeout);
  if (ret != SUCCESS && ret != ACL_ERROR_GE_REDEPLOYING && ret != ACL_ERROR_GE_SUBHEALTHY) {
    GELOGE(FAILED, "[Feed][Data]failed, DFlowSessionImpl:%" PRIu64 ", graph_id=%u, model_id=%u.", session_id_, graph_id,
           model_id);
  } else {
    GELOGI("[DFlowSessionImpl:%" PRIu64 "] feed raw data flow graph success, graph_id=%u, model_id=%u.", session_id_, graph_id,
           model_id);
  }
  return ret;
}
// Refreshes the calling thread's context: first the global/session-level state via
// UpdateGlobalSessionContext(), then the graph-level options given by `options`.
void DFlowSessionImpl::UpdateThreadContext(const std::map<std::string, std::string> &options) const {
  UpdateGlobalSessionContext();

  GetThreadLocalContext().SetGraphOption(options);
}
void DFlowSessionImpl::UpdateGlobalSessionContext() const {
{
auto &global_options_mutex = GetGlobalOptionsMutex();
const std::lock_guard<std::mutex> lock(global_options_mutex);
GetThreadLocalContext().SetGlobalOption(GetMutableGlobalOptions());
}
GetThreadLocalContext().SetSessionOption(options_);
GetContext().SetSessionId(ge_session_->GetSessionId());
domi::GetContext().train_flag = false;
SetRtSocVersion();
}
// Refreshes the calling thread's context using the options stored for `graph_id`.
// When the graph manager has no options for the graph, an empty option map is used and a
// warning is logged.
void DFlowSessionImpl::UpdateThreadContext(uint32_t graph_id) {
  const auto graph_options = dflow_graph_manager_.GetGraphOptions(graph_id);
  if (graph_options != nullptr) {
    UpdateThreadContext(*graph_options);
    return;
  }
  GELOGW("graph level options is null.");
  UpdateThreadContext(std::map<std::string, std::string>{});
}
void DFlowSessionImpl::SetRtSocVersion() const {
auto &global_options_mutex = GetGlobalOptionsMutex();
const std::lock_guard<std::mutex> lock(global_options_mutex);
const auto &global_options = GetMutableGlobalOptions();
auto it = global_options.find(ge::SOC_VERSION);
if (it != global_options.end()) {
rtError_t rt_ret = rtSetSocVersion(it->second.c_str());
if (rt_ret != RT_ERROR_NONE) {
GELOGW("Set soc version %s failed. ret:0x%X", it->second.c_str(), rt_ret);
}
GELOGI("Set soc version %s success.", it->second.c_str());
}
}
}