/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef FUNCTION_AGENT_AGENT_SERVICE_ACTOR_H
#define FUNCTION_AGENT_AGENT_SERVICE_ACTOR_H
#include <actor/actor.hpp>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include "async/future.hpp"
#include "common/constants/constants.h"
#include "common/heartbeat/heartbeat_client.h"
#include "common/network/network_isolation.h"
#include "common/register/register_helper.h"
#include "common/static_function_util.h"
#include "common/types/instance_state.h"
#include "common/utils/struct_transfer.h"
#include "function_agent/code_deployer/deployer.h"
#include "function_agent/code_deployer/s3_deployer.h"
#include "function_agent/common/constants.h"
#include "function_agent/common/types.h"
#include "function_agent/network/network_tool.h"
#include "function_agent/plugin/multi_plugin_client.h"
namespace functionsystem::function_agent {
// Header-level constants are declared `inline` so that every translation unit including this header
// shares one definition instead of getting its own internal-linkage copy.

// Default value of the code-package clearing interval. NOTE(review): presumably milliseconds — confirm.
inline constexpr uint32_t DEFAULT_INTERVAL = 5000;
// Default value of the code download retry interval. NOTE(review): presumably milliseconds — confirm.
inline constexpr uint32_t DOWNLOAD_CODE_RETRY_INTERVAL = 3000;
// Default value of the static function schedule retry interval. NOTE(review): presumably milliseconds — confirm.
inline constexpr uint32_t STATIC_FUNCTION_SCHEDULE_RETRY_INTERVAL = 3000;
// Default ipset name used by AgentServiceActor::Config and by the actor itself.
inline const std::string TENANT_PODIP_IPSET_NAME = "tenant-podip-whitelist";
// One entry of the deploy queue handled by AgentServiceActor (see the std::queue<DeployerParameters>
// parameters of DownloadCodeAndStartRuntime / GetDownloadCodeResult).
struct DeployerParameters {
// Deployer to use for this entry.
std::shared_ptr<Deployer> deployer;
// Destination string of this entry. NOTE(review): presumably the local path the code is deployed to — confirm.
std::string destination;
// Deploy request associated with this entry.
std::shared_ptr<messages::DeployRequest> request;
};
// Shared handle to a messages::DeployInstanceRequest.
using DeployInstanceRequest = std::shared_ptr<messages::DeployInstanceRequest>;
// Pairs a deploy request with an AID.
struct DeployInstanceRequestWrapper {
// NOTE(review): presumably the AID of the sender the response has to go back to — confirm.
litebus::AID from;
DeployInstanceRequest request;
};
// Shared handle to a messages::KillInstanceRequest.
using KillInstanceRequest = std::shared_ptr<messages::KillInstanceRequest>;
// Pairs a kill request with an AID.
struct KillInstanceRequestWrapper {
// NOTE(review): presumably the AID of the sender the response has to go back to — confirm.
litebus::AID from;
KillInstanceRequest request;
};
class AgentServiceActor : public litebus::ActorBase {
public:
/**
 * Identity and registration flag of a runtime manager as stored by the actor.
 * Remains an aggregate, so brace initialization in field order (name, address, id, registered) keeps working.
 */
struct RuntimeManagerContext {
    // Name part of the runtime manager's AID.
    std::string name;
    // Url part of the runtime manager's AID.
    std::string address;
    // Runtime manager id.
    std::string id;
    // Defaulted so that a default-constructed context never carries an indeterminate flag.
    bool registered = false;
};
// Construction-time settings of AgentServiceActor; the constructor copies these fields into the actor.
struct Config {
// AID stored by the actor as localSchedFuncAgentMgrAID_.
litebus::AID localSchedFuncAgentMgrAID;
S3Config s3Config;
messages::CodePackageThresholds codePackageThresholds;
// Path stored as codePkgThresholdsCfgPath_. NOTE(review): presumably the file LoadCodePkgThresholdsCfg reads — confirm.
std::string codePkgThresholdsCfgPath;
// Ping timeout in milliseconds (per the field name); 0 unless set by the caller.
uint32_t pingTimeoutMs = 0;
// Name handed to the IpsetIpv4NetworkIsolation instance the constructor creates.
std::string ipsetName = TENANT_PODIP_IPSET_NAME;
std::string nodeID;
};
/**
 * Builds the actor from its name, agent id and static configuration.
 * @param name: actor name passed to ActorBase; also stored as the agent service name
 * @param agentID: id of this function agent
 * @param config: settings copied into the actor's members
 * @param alias: alias to store, empty by default
 * @param componentName: component name to store, empty by default
 */
AgentServiceActor(const std::string &name, const std::string &agentID, const Config &config,
                  const std::string &alias = "", const std::string &componentName = "")
    : ActorBase(name),
      agentID_(agentID),
      alias_(alias),
      codeReferInfos_(std::make_shared<std::unordered_map<std::string, CodeReferInfo>>()),
      localSchedFuncAgentMgrAID_(config.localSchedFuncAgentMgrAID),
      runtimesDeploymentCache_(std::make_shared<RuntimesDeploymentCache>()),
      registeredResourceUnit_(std::make_shared<resources::ResourceUnit>()),
      s3Config_(config.s3Config),
      codePackageThresholds_(config.codePackageThresholds),
      codePkgThresholdsCfgPath_(config.codePkgThresholdsCfgPath),
      agentServiceName_(name),
      isRegisterCompleted_(false),
      pingTimeoutMs_(config.pingTimeoutMs),
      // Keep the name reported by GetIpsetName() identical to the one handed to ipsetIsolation_ below;
      // previously only ipsetIsolation_ honoured config.ipsetName.
      ipsetName_(config.ipsetName),
      ipsetIsolation_(std::make_shared<IpsetIpv4NetworkIsolation>(config.ipsetName)),
      // Initialized here (in member declaration order) rather than assigned in the body.
      randomUuid_(litebus::uuid_generator::UUID::GetRandomUUID().ToString()),
      componentName_(componentName),
      nodeID_(config.nodeID)
{
}
// Defaulted destructor.
~AgentServiceActor() override = default;
// Message handler (sender AID, name, payload). NOTE(review): presumably receives the registration
// acknowledgement — confirm against the implementation file.
virtual void Registered(const litebus::AID &from, std::string &&name, std::string &&msg);
// NOTE(review): presumably the timeout callback of the ping-pong/heartbeat driver — confirm.
void TimeOutEvent();
/**
 * Takes a messages::Registered and yields a future of the same type.
 * NOTE(review): presumably starts the ping-pong (heartbeat) driver once registration succeeded — confirm.
 * @param registered: the registration message
 */
litebus::Future<messages::Registered> StartPingPong(const messages::Registered &registered);
* request to deploy an instance from local scheduler to function agent
* @param from: local scheduler's AID
* @param name: function name
* @param msg: request data, type is messages::DeployInstanceRequest
*/
virtual void DeployInstance(const litebus::AID &from, std::string &&name, std::string &&msg);
* request to kill an instance from local scheduler to function agent
* @param from: local scheduler's AID
* @param name: function name
* @param msg: request data, type is messages::KillInstanceRequest
*/
virtual void KillInstance(const litebus::AID &from, std::string &&name, std::string &&msg);
* response of starting an instance from runtime manager to function agent
* @param from: runtime manager's AID
* @param name: function name
* @param msg: response data, type is messages::StartInstanceResponse
*/
virtual void StartInstanceResponse(const litebus::AID &from, std::string &&name, std::string &&msg);
* response of stopping an instance from runtime manager to function agent
* @param from: runtime manager's AID
* @param name: function name
* @param msg: response data, type is messages::StartInstanceResponse
*/
virtual void StopInstanceResponse(const litebus::AID &from, std::string &&name, std::string &&msg);
* request to update resources from runtime manager to function agent
* @param from: runtime manager's AID
* @param name: function name
* @param msg: request data, type is messages::UpdateResourcesRequest
*/
virtual void UpdateResources(const litebus::AID &from, std::string &&name, std::string &&msg);
* request to update resources from runtime manager to function agent
* @param from: runtime manager's AID
* @param name: function name
* @param msg: request data, type is messages::UpdateResourcesRequest
*/
virtual void UpdateMetrics(const litebus::AID &from, std::string &&name, std::string &&msg);
* request to update an instance's status from runtime manager to function agent
* @param from: runtime manager's AID
* @param name: function name
* @param msg: request data, type is messages::UpdateInstanceStatusRequest
*/
virtual void UpdateInstanceStatus(const litebus::AID &from, std::string &&name, std::string &&msg);
* response of updating an instance's status from local scheduler to function agent
* @param from: local scheduler's AID
* @param name: function name
* @param msg: response data, type is messages::UpdateInstanceStatusResponse
*/
virtual void UpdateInstanceStatusResponse(const litebus::AID &from, std::string &&name, std::string &&msg);
* response of updating an agent's status from local scheduler to function agent
* @param from: local scheduler's AID
* @param name: function name
* @param msg: response data, type is messages::UpdateAgentStatusResponse
*/
virtual void UpdateAgentStatusResponse(const litebus::AID &from, std::string &&name, std::string &&msg);
* request to update a runtime manager's status from runtime manager to function agent
* @param from: local scheduler's AID
* @param name: function name
* @param msg: response data, type is messages::UpdateDiskUsageRequest
*/
virtual void UpdateRuntimeStatus(const litebus::AID &from, std::string &&name, std::string &&msg);
// Takes a runtime manager id. NOTE(review): presumably flags that runtime manager as no longer usable — confirm.
virtual void MarkRuntimeManagerUnavailable(const std::string &id);
// Request/response handler pairs below share the signature (sender AID, name, payload).
// Payload types are not visible in this header.
virtual void QueryInstanceStatusInfo(const litebus::AID &from, std::string &&name, std::string &&msg);
virtual void QueryInstanceStatusInfoResponse(const litebus::AID &from, std::string &&name, std::string &&msg);
virtual void CleanStatus(const litebus::AID &from, std::string &&name, std::string &&msg);
virtual void CleanStatusResponse(const litebus::AID &from, std::string &&name, std::string &&msg);
virtual void UpdateCred(const litebus::AID &, std::string &&, std::string &&msg);
virtual void UpdateCredResponse(const litebus::AID &, std::string &&, std::string &&msg);
// Unlike its neighbours this handler is not virtual.
void GracefulShutdownFinish(const litebus::AID &, std::string &&, std::string &&msg);
virtual void SetNetworkIsolationRequest(const litebus::AID &, std::string &&, std::string &&msg);
* request to snapshot a runtime instance from function agent manager to function agent
* @param from: function agent manager's AID
* @param name: function name
* @param msg: request data, type is messages::SnapshotRuntimeRequest
*/
virtual void SnapshotRuntime(const litebus::AID &from, std::string &&name, std::string &&msg);
* response to snapshot runtime request from runtime manager to function agent
* @param from: runtime manager's AID
* @param name: function name
* @param msg: response data, type is messages::SnapshotRuntimeResponse
*/
virtual void SnapshotRuntimeResponse(const litebus::AID &from, std::string &&name, std::string &&msg);
// Asynchronous operations on the actor; each reports its outcome through the returned future.
litebus::Future<bool> GracefulShutdown();
// Associates a deployer with a storage type.
litebus::Future<Status> SetDeployers(const std::string &storageType, const std::shared_ptr<Deployer> &deployer);
litebus::Future<Status> Readiness();
litebus::Future<Status> IsAgentReadiness();
// Supplies the RegisterHelper used by the actor.
void SetRegisterHelper(const std::shared_ptr<RegisterHelper> &helper);
virtual void QueryDebugInstanceInfos(const litebus::AID &, std::string &&, std::string &&msg);
virtual void QueryDebugInstanceInfosResponse(const litebus::AID &, std::string &&, std::string &&msg);
litebus::Future<Status> CreateStaticFunctionInstance();
// Takes plugin configuration as a string. NOTE(review): format of `configs` is not visible here — confirm.
litebus::Future<Status> LoadPlugins(const std::string &configs);
// NOTE(review): presumably a file-watch callback for the code package thresholds config — confirm.
void CodePkgThresholdsCfgCallback(const std::string &path, const std::string &name, uint32_t mask);
void LoadCodePkgThresholdsCfg();
/**
 * Replaces the ipset name stored on the actor.
 * @param ipsetName: new name; taken by value and moved into place
 */
[[maybe_unused]] void SetIpsetName(std::string ipsetName)
{
    // The parameter is already a private copy, so hand its buffer over instead of copying a second time.
    ipsetName_ = std::move(ipsetName);
}

/** Returns a copy of the ipset name stored on the actor. */
std::string GetIpsetName()
{
    return ipsetName_;
}
/**
 * Returns the id of the registered resource unit, or an empty string when no unit is set.
 */
[[maybe_unused]] std::string GetRegisteredResourceUnitID() const
{
    // SetRegisteredResourceUnit() stores whatever pointer it is given, including null,
    // so the pointer must be checked before it is dereferenced.
    if (registeredResourceUnit_ == nullptr) {
        return "";
    }
    return registeredResourceUnit_->id();
}

/**
 * Replaces the registered resource unit.
 * @param unit: new resource unit; stored as-is (may be null)
 */
[[maybe_unused]] void SetRegisteredResourceUnit(const std::shared_ptr<resources::ResourceUnit> &unit)
{
    registeredResourceUnit_ = unit;
}
// Returns the stored heartbeat client driver (null until something assigns pingPongDriver_).
[[maybe_unused]] std::shared_ptr<HeartbeatClientDriver> GetPingPongDriver() const
{
return pingPongDriver_;
}
// Returns the shared runtimes deployment cache.
[[maybe_unused]] std::shared_ptr<RuntimesDeploymentCache> GetRuntimesDeploymentCache() const
{
return runtimesDeploymentCache_;
}
// Replaces the runtimes deployment cache; the by-value argument is moved into the member.
[[maybe_unused]] void UpdateRuntimesDeploymentCache(
std::shared_ptr<RuntimesDeploymentCache> runtimesDeploymentCache)
{
runtimesDeploymentCache_ = std::move(runtimesDeploymentCache);
}
/**
 * Records the runtime manager identified by the given AID.
 * @param aid: runtime manager's AID; its Name() and Url() become the stored name and address
 * @param registered: registration flag to store, true by default
 * @param id: runtime manager id to store, empty by default
 */
[[maybe_unused]] void SetRuntimeManagerAID(const litebus::AID &aid, bool registered = true,
                                           const std::string &id = "")
{
    // RuntimeManagerContext fields are, in order: name, address, id, registered.
    registerRuntimeMgr_ = RuntimeManagerContext{ aid.Name(), aid.Url(), id, registered };
}

/** Returns a copy of the stored runtime manager context. */
[[maybe_unused]] RuntimeManagerContext GetRuntimeManagerContext() const
{
    return registerRuntimeMgr_;
}
// Replaces the stored AID of the local scheduler's function agent manager.
[[maybe_unused]] void SetLocalSchedFuncAgentMgrAID(const litebus::AID &aid)
{
localSchedFuncAgentMgrAID_ = aid;
}
// Replaces the shared code-refer map; the actor then shares ownership of the caller's map.
[[maybe_unused]] void SetCodeReferManager(
const std::shared_ptr<std::unordered_map<std::string, function_agent::CodeReferInfo>> &codeReferManager)
{
codeReferInfos_ = codeReferManager;
}
// Returns the shared code-refer map.
[[maybe_unused]] std::shared_ptr<std::unordered_map<std::string, CodeReferInfo>> GetCodeReferManager() const
{
return codeReferInfos_;
}
// Overwrites the register-completed flag (initialized to false by the constructor).
[[maybe_unused]] void SetRegisterComplete(bool status)
{
isRegisterCompleted_ = status;
}
// Returns the register-completed flag.
[[maybe_unused]] bool GetRegisterComplete() const
{
return isRegisterCompleted_;
}
[[maybe_unused]] void SetRegisterInfo(const RegisterInfo ®isterInfo)
{
registerInfo_ = registerInfo;
}
// The Protected* members below are public pass-throughs to private member functions of this class.
// NOTE(review): presumably exposed for unit tests — confirm.

// Forwards to SetNetwork and returns its result.
[[maybe_unused]] bool ProtectedSetNetwork(const std::vector<NetworkConfig> &configs)
{
return SetNetwork(configs);
}
// Forwards to StartProbers.
[[maybe_unused]] void ProtectedStartProbers(const std::vector<ProberConfig> &config)
{
StartProbers(config);
}
// Replaces the whole map of update-agent-status timers with a copy of infoMap.
[[maybe_unused]] void SetUpdateAgentStatusInfos(const std::unordered_map<std::string, litebus::Timer> &infoMap)
{
updateAgentStatusInfos_ = infoMap;
}
// Forwards to ReceiveRegister.
[[maybe_unused]] void ProtectedReceiveRegister(const std::string &message)
{
ReceiveRegister(message);
}
// Forwards to RegisterAgent and returns its future.
[[maybe_unused]] litebus::Future<messages::Registered> ProtectedRegisterAgent()
{
return RegisterAgent();
}
// Forwards to RetryRegisterAgent.
[[maybe_unused]] void ProtectedRetryRegisterAgent(const std::string &msg)
{
RetryRegisterAgent(msg);
}
// Forwards to AddCodeReferByRuntimeInstanceInfo.
[[maybe_unused]] void ProtectedAddCodeReferInfo(const messages::RuntimeInstanceInfo &info)
{
AddCodeReferByRuntimeInstanceInfo(info);
}
// Overrides the code-package clearing interval (defaults to DEFAULT_INTERVAL).
[[maybe_unused]] void SetClearCodePackageInterval(uint32_t interval)
{
clearCodePackageInterval_ = interval;
}
// Overrides the retry interval for sending clean-status (defaults to DEFAULT_RETRY_SEND_CLEAN_STATUS_INTERVAL).
[[maybe_unused]] void SetRetrySendCleanStatusInterval(uint32_t interval)
{
retrySendCleanStatusInterval_ = interval;
}
// Overrides the register retry interval (defaults to REGISTER_AGENT_TIMEOUT).
[[maybe_unused]] void SetRetryRegisterInterval(uint32_t interval)
{
retryRegisterInterval_ = interval;
}
// Sets the unit-test flag (false by default); how it is consumed is not visible in this header.
[[maybe_unused]] void SetUnitTestSituation(bool state)
{
isUnitTestSituation_ = state;
}
/** Returns the shared ipset-based network isolation object. */
[[maybe_unused]] std::shared_ptr<IpsetIpv4NetworkIsolation> GetIpsetIpv4NetworkIsolation()
{
    return ipsetIsolation_;
}

/**
 * Replaces the ipset-based network isolation object.
 * @param ipsetIsolation: new object; taken by value and moved into place
 */
[[maybe_unused]] void SetIpsetIpv4NetworkIsolation(std::shared_ptr<IpsetIpv4NetworkIsolation> ipsetIsolation)
{
    // Moving the by-value argument avoids an extra reference-count round trip,
    // matching what UpdateRuntimesDeploymentCache() does with its argument.
    ipsetIsolation_ = std::move(ipsetIsolation);
}
[[maybe_unused]] void SetFailedDownloadRequests(const std::string &requestID)
{
DeployResult result;
result.status = Status(StatusCode::ERR_USER_CODE_LOAD, "code package download failed");
failedDownloadRequests_[requestID] = result;
}
[[maybe_unused]] void SetFailedDeployingObjects(const std::string &destination)
{
litebus::Promise<DeployResult> promise;
DeployResult result;
result.status = Status(StatusCode::ERR_USER_CODE_LOAD, "code package download failed");
promise.SetValue(result);
deployingObjects_[destination] = promise;
}
// Replaces the stored S3 configuration with a copy of the argument.
[[maybe_unused]] void SetS3Config(const S3Config &s3Config)
{
s3Config_ = s3Config;
}
// Returns a copy of the stored S3 configuration.
[[maybe_unused]] S3Config GetS3Config()
{
return s3Config_;
}
// Overrides the ping timeout (milliseconds, per the name) taken from Config at construction.
[[maybe_unused]] void SetPingTimeoutMs(uint32_t pingTimeoutMs)
{
pingTimeoutMs_ = pingTimeoutMs;
}
protected:
// Overrides of the ActorBase Init/Finalize hooks.
void Init() override;
void Finalize() override;
// Works on a queue of deploy entries belonging to one deploy-instance request.
// NOTE(review): presumably downloads each entry's code and then starts the runtime, as the name says — confirm.
void DownloadCodeAndStartRuntime(const std::shared_ptr<std::queue<DeployerParameters>> &deployObjects,
const std::shared_ptr<messages::DeployInstanceRequest> &req);
private:
// Targets of the public ProtectedSetNetwork / ProtectedStartProbers pass-throughs.
bool SetNetwork(const std::vector<NetworkConfig> &configs);
void StartProbers(const std::vector<ProberConfig> &config);
// Response construction helpers: build or fill a response from a code/message and the originating request.
messages::DeployInstanceResponse InitDeployInstanceResponse(const int32_t code, const std::string &message,
const messages::DeployInstanceRequest &source);
void InitKillInstanceResponse(messages::KillInstanceResponse *target, const messages::KillInstanceRequest &source);
// Takes the deploy request together with the (future) result of environment preparation.
Status StartRuntime(const DeployInstanceRequest &request, const litebus::Future<Status> &prepareEnvRes);
// Registration helpers; also reachable through the public Protected* pass-throughs.
litebus::Future<messages::Registered> RegisterAgent();
void RetryRegisterAgent(const std::string &msg);
void ReceiveRegister(const std::string &message);
bool UpdateDeployedObjectByDestination(const std::shared_ptr<messages::DeployInstanceRequest> &req,
const std::string &destination, const DeployResult &result);
// Produces the queue of deploy entries for one deploy-instance request.
std::shared_ptr<std::queue<DeployerParameters>> BuildDeployerParameters(
const std::shared_ptr<messages::DeployInstanceRequest> &req);
// Code reference bookkeeping. NOTE(review): presumably maintains codeReferInfos_ — confirm.
void AddCodeReferByRuntimeInstanceInfo(const messages::RuntimeInstanceInfo &info);
void AddCodeRefer(const std::string &dstDir, const std::string &instanceID,
const std::shared_ptr<Deployer> &deployer);
void DeleteCodeReferByDeployInstanceRequest(const std::shared_ptr<messages::DeployInstanceRequest> &req);
void DeleteCodeReferByRuntimeInstanceInfo(const messages::RuntimeInstanceInfo &info);
void DeleteFunction(const std::string &functionDestination, const std::string &instanceID);
// Agent status reporting; `msg` is optional and empty by default.
litebus::Future<bool> UpdateAgentStatusToLocal(int32_t status, const std::string &msg = "");
void RetryUpdateAgentStatusToLocal(const std::string &requestID, const std::string &msg);
void RemoveCodePackageAsync();
void CommitSuicide();
void CleanRuntimeManagerStatus(uint32_t retryTimes);
// Code download helpers.
void GetDownloadCodeResult(const std::shared_ptr<std::queue<DeployerParameters>> &deployObjects,
const std::shared_ptr<messages::DeployInstanceRequest> &req,
const std::string &destination, const litebus::Future<DeployResult> &result);
bool IsDownloadFailed(const std::shared_ptr<messages::DeployInstanceRequest> &req);
// Takes the promise to fulfil and a retry counter. NOTE(review): retry semantics are not visible here — confirm.
void DownloadCode(const std::shared_ptr<messages::DeployRequest> &request,
const std::shared_ptr<Deployer> &deployer,
const std::shared_ptr<litebus::Promise<DeployResult>> &promise, const uint32_t retryTimes);
litebus::Future<DeployResult> AsyncDownloadCode(const std::shared_ptr<messages::DeployRequest> &request,
const std::shared_ptr<Deployer> &deployer);
litebus::Future<Status> PrepareEnv(const DeployInstanceRequest &request);
litebus::Future<bool> RecoverPluginCache(const std::string &message);
// deployRequest is taken by non-const reference, so the callee may modify or replace it.
void AttachTemporaryAccesskey(const std::string &storageType,
std::shared_ptr<messages::DeployRequest> &deployRequest,
const std::shared_ptr<messages::DeployInstanceRequest> &req);
// Static function scheduling helpers.
std::shared_ptr<messages::ScheduleRequest> CreateScheduleRequest(const StaticFunctionConfig &config,
const FunctionMeta &meta);
void RetryInstanceSchedule(const std::string &msg, const std::string &requestId, uint32_t retryTime);
// Message handlers declared virtual although private.
virtual void StaticFunctionScheduleResponse(const litebus::AID &from, std::string &&, std::string &&msg);
virtual void NotifyFunctionStatusChange(const litebus::AID &from, std::string &&, std::string &&msg);
// req is taken by non-const reference, so the callee may modify or replace it.
DeployResult PrepareSharedDir(std::shared_ptr<messages::DeployInstanceRequest> &req);
bool IsDelegateWorkingDirPath(const DeployerParameters &deployObject);
private:
// Deployers keyed by string. NOTE(review): presumably the storage type passed to SetDeployers — confirm.
std::unordered_map<std::string, std::shared_ptr<Deployer>> deployers_;
std::shared_ptr<MultiPluginClient> pluginClient_;
// Pending deploy / kill / snapshot bookkeeping. NOTE(review): keys are presumably request ids — confirm.
std::unordered_map<std::string, DeployInstanceRequestWrapper> deployingRequest_;
std::unordered_map<std::string, KillInstanceRequestWrapper> killingRequest_;
std::unordered_map<std::string, litebus::AID> snapshotRequests_;
// Set from the constructor arguments.
std::string agentID_;
std::string alias_;
// Keyed by destination (see SetFailedDeployingObjects).
std::unordered_map<std::string, litebus::Promise<DeployResult>> deployingObjects_;
// Keyed by request id (see SetFailedDownloadRequests).
std::unordered_map<std::string, DeployResult> failedDownloadRequests_;
std::unordered_map<std::string, std::shared_ptr<litebus::Promise<Status>>> prepareEnvRequest_;
// Allocated by the constructor; replaceable through SetCodeReferManager.
std::shared_ptr<std::unordered_map<std::string, CodeReferInfo>> codeReferInfos_{ nullptr };
litebus::AID localSchedFuncAgentMgrAID_;
// Allocated by the constructor; replaceable through UpdateRuntimesDeploymentCache.
std::shared_ptr<RuntimesDeploymentCache> runtimesDeploymentCache_{ nullptr };
// Allocated by the constructor; replaceable through SetRegisteredResourceUnit.
std::shared_ptr<resources::ResourceUnit> registeredResourceUnit_{ nullptr };
std::unordered_map<std::string, std::shared_ptr<litebus::Promise<bool>>> updateAgentStatusInfoPromises_;
std::unordered_map<std::string, litebus::Timer> updateAgentStatusInfos_;
// Copied from Config by the constructor.
S3Config s3Config_;
messages::CodePackageThresholds codePackageThresholds_;
std::string codePkgThresholdsCfgPath_;
// Same value as the actor name passed to the constructor.
std::string agentServiceName_;
std::shared_ptr<HeartbeatClientDriver> pingPongDriver_{ nullptr };
std::shared_ptr<RegisterHelper> registerHelper_{ nullptr };
RegisterInfo registerInfo_;
// Field order: name, address, id, registered.
RuntimeManagerContext registerRuntimeMgr_{ "", "", "", false };
// No in-class initializer: both are set in the constructor's initializer list.
bool isRegisterCompleted_;
uint32_t pingTimeoutMs_;
uint32_t retryRegisterInterval_{ REGISTER_AGENT_TIMEOUT };
uint32_t retryDownloadInterval_{ DOWNLOAD_CODE_RETRY_INTERVAL };
litebus::Timer clearCodePackageTimer_;
uint32_t clearCodePackageInterval_{ DEFAULT_INTERVAL };
uint32_t retrySendCleanStatusInterval_{ DEFAULT_RETRY_SEND_CLEAN_STATUS_INTERVAL };
// NOTE(review): -1 presumably means "no retry budget set" — confirm against the implementation.
int remainedClearCodePackageRetryTimes_{ -1 };
bool isCleaningStatus_{ false };
litebus::Promise<StatusCode> clearCodePackagePromise_;
litebus::Promise<StatusCode> sendCleanStatusPromise_;
bool monopolyUsed_{ false };
bool isUnitTestSituation_{ false };
bool exiting_ = false;
litebus::Promise<bool> runtimeManagerGracefulShutdown_;
int64_t gracefulShutdownTime_{ 0 };
// The constructor does not assign this member; only SetIpsetName changes it.
std::string ipsetName_{ TENANT_PODIP_IPSET_NAME };
// Built by the constructor from Config::ipsetName.
std::shared_ptr<IpsetIpv4NetworkIsolation> ipsetIsolation_{ nullptr };
bool enableRestartForReuse_{ false };
// Random UUID string generated once in the constructor.
std::string randomUuid_;
uint32_t retryScheduleInterval_{ STATIC_FUNCTION_SCHEDULE_RETRY_INTERVAL };
std::shared_ptr<litebus::Promise<messages::ScheduleResponse>> scheduleResponsePromise_;
std::string componentName_;
std::string nodeID_;
// Private member function declared among the data members.
bool HandleNetworkIsolation(const std::shared_ptr<messages::SetNetworkIsolationRequest> &req);
// NOTE(review): presumably instance id -> health code — confirm.
std::unordered_map<std::string, int32_t> instanceHealthyMap_;
bool isReady_ = false;
};
}
#endif