* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "virtualenv_plugin_mgr_actor.h"
#include "async/async.hpp"
#include "async/asyncafter.hpp"
#include "common/constants/actor_name.h"
#include "common/constants/constants.h"
#include "common/logs/logging.h"
#include "common/proto/pb/message_pb.h"
#include "common/utils/time_utils.h"
#include "function_agent/plugin/process_util.h"
#include "function_agent/plugin/remote_plugin_client.h"
namespace functionsystem::function_agent {
std::unordered_map<std::string, std::string> KEY_WORDS_TO_PLUGIN_ID_MAP = { { "venv", "venvPlugin" } };
inline std::string GetValueOfDeployOptionsMap(const google::protobuf::Map<std::string, std::string> &map,
const std::string &key)
{
if (auto it = map.find(key); it != map.end()) {
return it->second;
}
return "";
}
inline EnvInfo ConstructEnvInfoFromRequestMap(const google::protobuf::Map<std::string, std::string> &map)
{
auto envName = GetValueOfDeployOptionsMap(map, VIRTUALENV_NAME);
auto envType = GetValueOfDeployOptionsMap(map, VIRTUALENV_KIND);
std::string pluginID = "";
if (auto pluginIDIter = KEY_WORDS_TO_PLUGIN_ID_MAP.find(envType);
pluginIDIter != KEY_WORDS_TO_PLUGIN_ID_MAP.end()) {
pluginID = pluginIDIter->second;
}
return EnvInfo{ {}, pluginID, envName, envType };
}
void VirtualEnvPluginMgrActor::Init()
{
ActorBase::Init();
}
VirtualEnvPluginMgrActor::VirtualEnvPluginMgrActor(const std::string &name) : ActorBase(name)
{
}
VirtualEnvPluginMgrActor::~VirtualEnvPluginMgrActor()
{
}
litebus::Future<Status> VirtualEnvPluginMgrActor::LoadPlugin(const PluginGroup &pluginGroup)
{
if (pluginGroup.empty()) {
return Status::OK();
}
for (const auto &pluginInfo : pluginGroup) {
auto result = LoadPlugin(pluginInfo);
if (result.IsError() && pluginInfo.critical) {
return result;
}
}
return Initialize(pluginGroup);
}
litebus::Future<Status> VirtualEnvPluginMgrActor::Initialize(const PluginGroup &pluginGroup)
{
for (const auto &pluginInfo : pluginGroup) {
auto result = InitPlugin(pluginInfo);
if (result.IsError() && pluginInfo.critical) {
return result;
}
}
litebus::AsyncAfter(envRefRecycleInterval_, GetAID(), &VirtualEnvPluginMgrActor::RecycleUnusedEnvs);
litebus::AsyncAfter(healthCheckInterval_, GetAID(), &VirtualEnvPluginMgrActor::HealthCheck);
return Status::OK();
}
void VirtualEnvPluginMgrActor::Finalize()
{
(void)litebus::TimerTools::Cancel(recycleUnusedEnvsTimer_);
(void)litebus::TimerTools::Cancel(healthCheckTimer_);
for (const auto &pluginConfigPair : pluginConfigs_) {
auto pluginID = pluginConfigPair.first;
UnloadPlugin(pluginID);
}
pluginConfigs_.clear();
envReferInfos_.clear();
ActorBase::Finalize();
}
litebus::Future<Status> VirtualEnvPluginMgrActor::CallVirtualEnvMgrMethod(
std::shared_ptr<agent_plugin::PluginRequest> request)
{
auto method = request->method();
auto runtimeID = GetValueOfDeployOptionsMap(request->metadata(), CONSTANT_RUNTIME_ID);
if (method == METHOD_INCREASE_REF) {
functionsystem::messages::DeployInstanceRequest deployInstanceRequest;
if (!request->payload().UnpackTo(&deployInstanceRequest)) {
return Status(StatusCode::FAILED, "failed to unpack PluginRequest to deployInstanceRequest");
}
EnvInfo envInfo = ConstructEnvInfoFromRequestMap(deployInstanceRequest.createoptions());
AddEnvReferInfos(envInfo.envName, envInfo.envType, envInfo.pluginID, runtimeID);
YRLOG_INFO("{}|{}|increase virtual env {}_{} ref with runtime {} succeed", request->traceid(),
request->requestid(), envInfo.envName, envInfo.envType, runtimeID);
} else if (method == METHOD_DECREASE_REF) {
RmEnvReferInfos(runtimeID);
YRLOG_INFO("{}|{}|decrease virtual env ref with runtime {} succeed", request->traceid(), request->requestid(),
runtimeID);
} else {
return Status(StatusCode::FAILED, "method (" + method + ") is not supported.");
}
return Status::OK();
}
std::shared_ptr<PluginClient> VirtualEnvPluginMgrActor::GetPlugin(const std::string &pluginID)
{
auto pluginClientIter = pluginClients_.find(pluginID);
if (pluginClientIter == pluginClients_.end()) {
return nullptr;
}
return pluginClientIter->second;
}
litebus::Future<Status> VirtualEnvPluginMgrActor::CallPluginImpl(std::shared_ptr<agent_plugin::PluginRequest> request,
std::shared_ptr<agent_plugin::PluginResponse> response,
std::shared_ptr<PluginClient> pluginClient)
{
return pluginClient->Call(*request, *response);
}
litebus::Future<Status> VirtualEnvPluginMgrActor::CallSpecificPlugin(
const std::string &pluginID, std::shared_ptr<agent_plugin::PluginRequest> request,
std::shared_ptr<agent_plugin::PluginResponse> response)
{
if (!IsHealthy(pluginID)) {
YRLOG_ERROR("{}|{}|plugin {} is not healthy", request->traceid(), request->requestid(), pluginID);
return Status(StatusCode::FAILED, "plugin is not healthy");
}
auto method = request->method();
if (method == METHOD_PREPARE) {
auto pluginClient = GetPlugin(pluginID);
if (pluginClient == nullptr) {
YRLOG_INFO("{}|{}|Call plugin {} failed, pluginClient is null", request->traceid(), request->requestid(),
pluginID);
return Status(StatusCode::POINTER_IS_NULL);
}
messages::DeployInstanceRequest deployInstanceRequest;
if (!request->payload().UnpackTo(&deployInstanceRequest)) {
return Status(StatusCode::FAILED, "failed to unpack PluginRequest to deployInstanceRequest");
}
agent_plugin::PrepareRequest prepareRequest;
prepareRequest.set_language(deployInstanceRequest.language());
for (const std::string &str : VIRTUALENV_KEYS) {
if (auto iter(deployInstanceRequest.createoptions().find(str));
iter != deployInstanceRequest.createoptions().end()) {
prepareRequest.mutable_createoptions()->insert({ str, iter->second });
}
}
auto delegateIter = deployInstanceRequest.createoptions().find(ENV_DELEGATE_DOWNLOAD);
if (delegateIter != deployInstanceRequest.createoptions().end()) {
prepareRequest.mutable_createoptions()->insert({ ENV_DELEGATE_DOWNLOAD, delegateIter->second });
}
EnvInfo envInfo = ConstructEnvInfoFromRequestMap(deployInstanceRequest.createoptions());
const std::string envNameWithType = envInfo.envName + "_" + envInfo.envType;
if (auto iter = envReferInfos_.find(envNameWithType); iter != envReferInfos_.end()) {
prepareRequest.mutable_createoptions()->insert({ CONSTANT_VIRTUALENV_EXISTS, "true" });
}
request->mutable_payload()->PackFrom(prepareRequest);
return litebus::Async(GetAID(), &VirtualEnvPluginMgrActor::CallPluginImpl, request, response, pluginClient);
}
return Status(StatusCode::FAILED, "method (" + method + ") is not supported.");
}
litebus::Future<Status> VirtualEnvPluginMgrActor::CallPlugin(const std::string &pluginID,
std::shared_ptr<agent_plugin::PluginRequest> request,
std::shared_ptr<agent_plugin::PluginResponse> response)
{
if (pluginID == CONSTANT_VIRTUALENV_PLUGIN_MGR) {
return CallVirtualEnvMgrMethod(request);
}
return CallSpecificPlugin(pluginID, request, response);
}
Status VirtualEnvPluginMgrActor::LoadPlugin(const PluginInfo &pluginInfo)
{
if (!pluginInfo.enabled) {
YRLOG_INFO("no need to load plugin {}, plugin is enabled {}", pluginInfo.id, pluginInfo.enabled);
return Status::OK();
}
SetVirtualEnvIdleTime(pluginInfo);
const auto binaryPathIt = pluginInfo.extra_params.find(BINARY_PATH);
if (binaryPathIt != pluginInfo.extra_params.end()) {
auto execPtr = CreatePluginProcess(pluginInfo);
if (execPtr == nullptr || execPtr->GetPid() == -1) {
YRLOG_WARN("failed to create exec for plugin {}, execPtr is nullptr {}", pluginInfo.id, execPtr == nullptr);
return Status(StatusCode::FAILED, "start plugin failed");
}
pluginID2ExecPtr_[pluginInfo.id] = std::move(execPtr);
}
pluginConfigs_[pluginInfo.id] = std::move(pluginInfo);
YRLOG_INFO("Load plugin {} succeed", pluginInfo.id);
return Status::OK();
}
Status VirtualEnvPluginMgrActor::InitPlugin(const PluginInfo &pluginInfo)
{
if (!pluginInfo.enabled) {
YRLOG_INFO("no need to init plugin {}, plugin is not enabled", pluginInfo.id);
return Status::OK();
}
if (pluginInfo.type == TYPE_GRPC) {
auto remotePluginClient = std::make_shared<RemotePluginClient>(pluginInfo.address, pluginInfo.callTimeout);
pluginClients_[pluginInfo.id] = std::move(remotePluginClient);
} else {
YRLOG_ERROR("plugin type is not supported, id: {}, type: {}", pluginInfo.id, pluginInfo.type);
return Status(StatusCode::FAILED, "plugin type is not supported");
}
healthyCheckResults_[pluginInfo.id] = true;
return Status::OK();
}
void VirtualEnvPluginMgrActor::HealthCheck()
{
for (const auto &pluginConfigPair : pluginConfigs_) {
auto pluginID = pluginConfigPair.first;
auto pluginClient = GetPlugin(pluginID);
if (pluginClient == nullptr) {
YRLOG_DEBUG("Health check of plugin {} failed, pluginClient is null", pluginID);
healthyCheckResults_[pluginID] = false;
continue;
}
const std::string traceID = litebus::uuid_generator::UUID::GetRandomUUID().ToString();
agent_plugin::PluginRequest pluginRequest;
pluginRequest.set_traceid(traceID);
pluginRequest.set_method(METHOD_HEALTHCHECK);
agent_plugin::PluginResponse pluginResponse;
Status status = pluginClient->Call(pluginRequest, pluginResponse);
YRLOG_DEBUG("{}|Health check of plugin {}, result is {}", traceID, pluginID, status.ToString());
healthyCheckResults_[pluginID] = !(status.IsError());
}
std::set<std::string> pluginClientNeeToReload;
for (const auto &healthyCheckResult : healthyCheckResults_) {
if (!healthyCheckResult.second) {
YRLOG_INFO("plugin {} is not healthy, need to reload", healthyCheckResult.first);
pluginClientNeeToReload.insert(healthyCheckResult.first);
}
}
for (const auto &pluginID : pluginClientNeeToReload) {
UnloadPlugin(pluginID);
const auto it = pluginConfigs_.find(pluginID);
if (it != pluginConfigs_.end()) {
LoadPlugin(it->second);
InitPlugin(it->second);
} else {
YRLOG_ERROR("plugin {} is not in cache, we can not reload it", pluginID);
}
}
healthCheckTimer_ = litebus::AsyncAfter(healthCheckInterval_, GetAID(), &VirtualEnvPluginMgrActor::HealthCheck);
}
void VirtualEnvPluginMgrActor::UnloadPlugin(const std::string &pluginID)
{
healthyCheckResults_.erase(pluginID);
auto pluginClient = GetPlugin(pluginID);
if (pluginClient != nullptr) {
pluginClient->Close();
} else {
YRLOG_DEBUG("maybe no need to unload plugin {}, pluginClient is null", pluginID);
}
auto execPtrIter = pluginID2ExecPtr_.find(pluginID);
if (execPtrIter != pluginID2ExecPtr_.end()) {
KillPluginProcess(execPtrIter->second->GetPid(), pluginID);
} else {
YRLOG_DEBUG("no need to stop plugin process, pluginID {}", pluginID);
}
pluginClients_.erase(pluginID);
pluginID2ExecPtr_.erase(pluginID);
}
bool VirtualEnvPluginMgrActor::IsHealthy(const std::string &pluginID)
{
if (pluginID == CONSTANT_VIRTUALENV_PLUGIN_MGR) {
return true;
}
const auto healthCheckResultsIt = healthyCheckResults_.find(pluginID);
if (healthCheckResultsIt != healthyCheckResults_.end()) {
return healthCheckResultsIt->second;
}
return false;
}
litebus::Future<bool> VirtualEnvPluginMgrActor::RecoverCache(const std::string &message)
{
YRLOG_INFO("Start to recover cache");
messages::RegisterRuntimeManagerRequest req;
if (!req.ParseFromString(message)) {
YRLOG_ERROR("failed to parse RuntimeManager register message");
return false;
}
auto requestInstanceInfos = req.runtimeinstanceinfos();
for (const auto &it : requestInstanceInfos) {
auto runtimeID = it.first;
auto info = it.second;
auto envName = GetValueOfDeployOptionsMap(info.deploymentconfig().deployoptions(), VIRTUALENV_NAME);
if (envName == "") {
YRLOG_INFO("invalid envName, maybe runtime {} not use virtual env, envName {}", runtimeID, envName);
continue;
}
EnvInfo envInfo = ConstructEnvInfoFromRequestMap(info.deploymentconfig().deployoptions());
if (envInfo.envType == "" || envInfo.pluginID == "") {
YRLOG_INFO("invalid env message, maybe runtime {} not use virtual env, envName {}, envType {}, pluginID {}",
runtimeID, envInfo.envName, envInfo.envType, envInfo.pluginID);
continue;
}
YRLOG_INFO("Recover env refer info with runtimeID {}, envName {}, envType {}, pluginID {}", runtimeID,
envInfo.envName, envInfo.envType, envInfo.pluginID);
AddEnvReferInfos(envInfo.envName, envInfo.envType, envInfo.pluginID, runtimeID);
}
YRLOG_INFO("Finish recovering cache");
return true;
}
litebus::Future<std::string> VirtualEnvPluginMgrActor::GetPluginID(std::shared_ptr<agent_plugin::PluginRequest> request)
{
functionsystem::messages::DeployInstanceRequest deployInstanceRequest;
if (!request->payload().UnpackTo(&deployInstanceRequest)) {
YRLOG_WARN("{}|{}|failed to unpack PluginRequest to deployInstanceRequest", request->traceid(),
request->requestid());
return "";
}
if (auto it = deployInstanceRequest.createoptions().find(VIRTUALENV_KIND);
it != deployInstanceRequest.createoptions().end()) {
auto keyWords = it->second;
YRLOG_DEBUG("{}|{}|GetPluginID of {}, pluginID is {}", request->traceid(), request->requestid(), it->first,
keyWords);
if (auto pluginIDIter = KEY_WORDS_TO_PLUGIN_ID_MAP.find(keyWords);
pluginIDIter != KEY_WORDS_TO_PLUGIN_ID_MAP.end()) {
return pluginIDIter->second;
}
}
return "";
}
void VirtualEnvPluginMgrActor::AddEnvReferInfos(const std::string &envName, const std::string &envType,
const std::string &pluginID, const std::string &runtimeID)
{
YRLOG_INFO("env {}_{} add one runtime {} with plugin {}", envName, envType, runtimeID, pluginID);
const std::string envNameWithType = envName + "_" + envType;
if (auto iter = envReferInfos_.find(envNameWithType); iter == envReferInfos_.end()) {
(void)envReferInfos_.emplace(envNameWithType, EnvInfo{ { runtimeID }, pluginID, envName, envType });
} else {
(void)iter->second.runtimeIds.emplace(runtimeID);
}
}
void VirtualEnvPluginMgrActor::RmEnvReferInfos(const std::string &runtimeId)
{
for (auto &pair : envReferInfos_) {
if (!pair.second.runtimeIds.count(runtimeId)) {
continue;
}
pair.second.runtimeIds.erase(runtimeId);
YRLOG_INFO("runtimeId:{} is removed on envName:{}", runtimeId, pair.first);
if (pair.second.runtimeIds.empty()) {
pair.second.lastRemoveTimestamp = GetCurrentTimestampMs();
}
break;
}
}
void VirtualEnvPluginMgrActor::GetEnvNameNeed2Clear(std::map<std::string, EnvInfo> &pluginID2EnvInfo)
{
for (auto envReferInfoIter = envReferInfos_.begin(); envReferInfoIter != envReferInfos_.end();) {
auto envNameWithType = envReferInfoIter->first;
bool envIsIdle = envReferInfoIter->second.runtimeIds.empty();
const auto now = GetCurrentTimestampMs();
bool envIdleExceedLimit =
now - envReferInfoIter->second.lastRemoveTimestamp >= static_cast<uint64_t>(virtualEnvIdleTime_);
if (!envIsIdle || !envIdleExceedLimit) {
YRLOG_DEBUG("Env {} will not be deleted, envIsIdle {}, envIdleExceedLimit {}", envNameWithType, envIsIdle,
envIdleExceedLimit);
(void)++envReferInfoIter;
continue;
}
auto pluginID = envReferInfoIter->second.pluginID;
if (!IsHealthy(pluginID)) {
YRLOG_WARN("plugin {} not healthy, failed to clear env {}", pluginID, envNameWithType);
(void)++envReferInfoIter;
continue;
}
pluginID2EnvInfo.insert(
{ pluginID, EnvInfo{ {}, pluginID, envReferInfoIter->second.envName, envReferInfoIter->second.envType } });
(void)++envReferInfoIter;
}
}
void VirtualEnvPluginMgrActor::DoClearVirtualEnv()
{
std::map<std::string, EnvInfo> pluginID2EnvInfo;
GetEnvNameNeed2Clear(pluginID2EnvInfo);
for (auto pluginIDEnvInfoPair : pluginID2EnvInfo) {
auto pluginID = pluginIDEnvInfoPair.first;
auto envInfo = pluginIDEnvInfoPair.second;
agent_plugin::ClearRequest clearRequest;
clearRequest.mutable_clearoptions()->insert({ VIRTUALENV_NAME, envInfo.envName });
const std::string traceID = litebus::uuid_generator::UUID::GetRandomUUID().ToString();
auto pluginRequest = std::make_shared<agent_plugin::PluginRequest>();
pluginRequest->set_traceid(traceID);
pluginRequest->set_method(METHOD_CLEAR);
pluginRequest->mutable_payload()->PackFrom(clearRequest);
auto pluginResponse = std::make_shared<agent_plugin::PluginResponse>();
auto pluginClient = GetPlugin(pluginID);
if (pluginClient == nullptr) {
YRLOG_WARN("Clear virtual env {}_{} with plugin {} failed, pluginClient is null", envInfo.envName,
envInfo.envType, pluginID);
} else {
litebus::Async(GetAID(), &VirtualEnvPluginMgrActor::CallPluginImpl, pluginRequest, pluginResponse,
pluginClient)
.OnComplete([=](const litebus::Future<Status> &statusFut) {
const std::string envNameWithType = envInfo.envName + "_" + envInfo.envType;
if (!statusFut.IsError()) {
if (statusFut.Get().IsError()) {
YRLOG_WARN("{}|failed to recycle env {}, error message {}", traceID, envNameWithType,
statusFut.Get().GetMessage());
} else {
YRLOG_INFO("{}||env {}_{} is recycled", traceID, envInfo.envName, envInfo.envType);
envReferInfos_.erase(envNameWithType);
}
} else {
YRLOG_WARN("{}|failed to recycle env {}, get future error", traceID, envNameWithType);
}
});
}
}
}
void VirtualEnvPluginMgrActor::RecycleUnusedEnvs()
{
if (virtualEnvIdleTime_ <= -1) {
YRLOG_INFO("No need to recycle envs");
return;
}
DoClearVirtualEnv();
recycleUnusedEnvsTimer_ =
litebus::AsyncAfter(envRefRecycleInterval_, GetAID(), &VirtualEnvPluginMgrActor::RecycleUnusedEnvs);
}
void VirtualEnvPluginMgrActor::SetVirtualEnvIdleTime(const PluginInfo &pluginInfo)
{
const auto virtualEnvIdleTimeIt = pluginInfo.extra_params.find(VIRTUAL_ENV_IDLE_TIME);
if (virtualEnvIdleTimeIt != pluginInfo.extra_params.end()) {
try {
this->virtualEnvIdleTime_ = std::stoi(virtualEnvIdleTimeIt->second);
} catch (...) {
this->virtualEnvIdleTime_ = -1;
}
}
}
}