* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "bootstrap_actor.h"
#include <sys/inotify.h>
#include "function_master/system_function_loader/constants.h"
#include "async/asyncafter.hpp"
#include "async/collect.hpp"
#include "async/defer.hpp"
#include "common/constants/actor_name.h"
#include "common/constants/constants.h"
#include "common/constants/signal.h"
#include "common/proto/pb/posix_pb.h"
#include "common/resource_view/resource_type.h"
#include "common/types/instance_state.h"
#include "common/hex/hex.h"
#include "common/utils/files.h"
#include "common/utils/meta_store_kv_operation.h"
#include "common/utils/struct_transfer.h"
#include "utils/string_utils.hpp"
namespace functionsystem::system_function_loader {
using namespace functionsystem::explorer;
BootstrapActor::BootstrapActor(const std::shared_ptr<MetaStoreClient> &metaClient,
std::shared_ptr<global_scheduler::GlobalSched> globalSched, uint32_t sysFuncRetryPeriod,
const std::string &instanceManagerAddress, bool enableFrontendPool)
: ActorBase(SYSTEM_FUNCTION_LOADER_BOOTSTRAP_ACTOR), retryTimeoutMs_(RETRY_TIMEOUT_MS)
{
member_ = std::make_shared<Member>();
member_->globalSched = std::move(globalSched);
member_->sysFuncRetryPeriod = sysFuncRetryPeriod;
member_->instanceManagerAddress = instanceManagerAddress;
metaStorageAccessor_ = std::make_shared<MetaStorageAccessor>(metaClient);
if (enableFrontendPool) {
member_->dynamicFunctionSet.insert(FRONTEND_FUNCTION_NAME);
}
}
void BootstrapActor::Init()
{
ASSERT_IF_NULL(member_);
auto masterBusiness = std::make_shared<MasterBusiness>(shared_from_this(), member_);
auto slaveBusiness = std::make_shared<SlaveBusiness>(shared_from_this(), member_);
(void)businesses_.emplace(MASTER_BUSINESS, masterBusiness);
(void)businesses_.emplace(SLAVE_BUSINESS, slaveBusiness);
(void)Explorer::GetInstance().AddLeaderChangedCallback(
"systemFunction", [aid(GetAID())](const LeaderInfo &leaderInfo) {
litebus::Async(aid, &BootstrapActor::UpdateLeaderInfo, leaderInfo);
});
curStatus_ = SLAVE_BUSINESS;
business_ = slaveBusiness;
Receive("ForwardCustomSignalResponse", &BootstrapActor::ForwardCustomSignalResponse);
}
void BootstrapActor::LoadBootstrapConfig(const std::string &customArgs)
{
(void)LoadSysFuncCustomArgs(customArgs);
if (auto status = LoadCurrentSysFuncMetas(); status.IsError()) {
return;
}
if (auto status = LoadSysFuncMetas(); status.IsError()) {
return;
}
if (auto status = LoadSysFuncPayloads(); status.IsError()) {
return;
}
if (auto status = LoadFunctionConfigs(); status.IsError()) {
return;
}
if (auto status = LoadCurrentFunctionConfigs(); status.IsError()) {
YRLOG_INFO("First time to load system functions");
member_->currFunctionConfigMap = member_->functionConfigMap;
}
SystemFunctionKeepAlive();
UpdateConfigHandler();
UpdateSysFunctionPayload();
}
void BootstrapActor::SysFunctionConfigCallBack(const std::string &path, const std::string &name, uint32_t mask)
{
YRLOG_DEBUG("path: {}, name: {}, mask: {}", path, name, mask);
if (mask & IN_DELETE) {
YRLOG_INFO("ReloadBootstrapConfig");
litebus::AsyncAfter(waitUpdateConfigMapMs_, GetAID(), &BootstrapActor::UpdateConfigHandler);
}
}
void BootstrapActor::SysFunctionPayloadCallBack(const std::string &path, const std::string &name, uint32_t mask)
{
YRLOG_DEBUG("path: {}, name: {}, mask: {}", path, name, mask);
if (mask & IN_DELETE) {
YRLOG_INFO("ReloadSysFuncPayloads");
litebus::AsyncAfter(waitUpdateConfigMapMs_, GetAID(), &BootstrapActor::UpdatePayloadHandler);
}
}
void BootstrapActor::SysFunctionMetaCallBack(const std::string &path, const std::string &name, uint32_t mask)
{
YRLOG_DEBUG("path: {}, name: {}, mask: {}", path, name, mask);
if (mask & IN_DELETE) {
YRLOG_INFO("ReloadSysFuncMetas");
litebus::AsyncAfter(waitUpdateConfigMapMs_, GetAID(), &BootstrapActor::UpdateMetaHandler);
}
}
void BootstrapActor::UpdateConfigHandler()
{
if (auto status = LoadFunctionConfigs(); status.IsError()) {
return;
}
UpdateSysFunctionConfig();
}
void BootstrapActor::UpdatePayloadHandler()
{
systemFuncPayloadMap_.clear();
if (auto status = LoadSysFuncPayloads(); status.IsError()) {
return;
}
UpdateSysFunctionPayload();
}
void BootstrapActor::UpdateMetaHandler()
{
LoadSysFuncMetas();
}
void BootstrapActor::UpdateSysFunctionConfig()
{
for (auto iter : member_->functionConfigMap) {
std::string funcName = iter.first;
auto newConfig = GetFunctionConfig(funcName);
if (newConfig.IsNone()) {
YRLOG_ERROR("temp function config of ({}) does not exist", funcName);
continue;
}
auto currConfig = GetCurrFunctionConfig(funcName);
if (CheckSysFunctionNeedUpgrade(funcName, newConfig.Get(), currConfig)) {
UpgradeSystemFunction(funcName, newConfig.Get(), currConfig);
continue;
}
}
}
void BootstrapActor::UpdateSysFunctionPayload()
{
for (auto iter = systemFuncPayloadMap_.begin(); iter != systemFuncPayloadMap_.end(); ++iter) {
if (member_->dynamicFunctionSet.find(iter->first) != member_->dynamicFunctionSet.end()
&& member_->dynamicFunctionInstance.find(iter->first) == member_->dynamicFunctionInstance.end()) {
continue;
}
SendUpdateArgsSignal(iter->first, iter->second, 0);
}
}
void BootstrapActor::UpdateSysFunctionPayloadByName(const std::string &functionName)
{
for (auto iter = systemFuncPayloadMap_.begin(); iter != systemFuncPayloadMap_.end(); ++iter) {
if (std::strcmp(iter->second.sysFuncName.c_str(), functionName.c_str()) == 0) {
SendUpdateArgsSignal(iter->first, iter->second, 0);
}
}
}
void BootstrapActor::DoDynamicScaleByFunctionName(const std::string &funcName, uint32_t instanceNum,
const FunctionPayload &payload)
{
YRLOG_INFO("dynamic scale function({}) instanceNum to {}", funcName, instanceNum);
SendUpdateArgsSignal(funcName, payload, 0);
}
void BootstrapActor::UpdateSysFunctionMeta(const std::string &jsonStr)
{
auto funcMeta = GetFuncMetaFromJson(jsonStr);
auto funcKey = FUNC_META_PATH_PREFIX + "/" + funcMeta.funcMetaData.tenantId + "/function/" +
funcMeta.funcMetaData.name + "/version/" + funcMeta.funcMetaData.version;
auto funcName = funcMeta.funcMetaData.name;
auto funcMetaQueueIter = systemFuncMetaQueue_.find(funcName);
if (funcMetaQueueIter == systemFuncMetaQueue_.end()) {
std::vector<std::pair<std::string, FunctionMeta>> queue;
queue.emplace_back(funcKey, funcMeta);
systemFuncMetaQueue_[funcName] = queue;
metaStorageAccessor_->Put(funcKey, jsonStr);
return;
}
for (auto iter = funcMetaQueueIter->second.begin(); iter != funcMetaQueueIter->second.end();) {
if (std::strcmp(iter->second.funcMetaData.version.c_str(), funcMeta.funcMetaData.version.c_str()) == 0) {
systemFuncMetaQueue_[funcName].erase(iter);
systemFuncMetaQueue_[funcName].emplace_back(funcKey, funcMeta);
return;
}
++iter;
}
systemFuncMetaQueue_[funcName].emplace_back(funcKey, funcMeta);
metaStorageAccessor_->Put(funcKey, jsonStr);
if (systemFuncMetaQueue_[funcName].size() > MAX_SYSFUNCTION_META_INFO) {
metaStorageAccessor_->Delete(systemFuncMetaQueue_[funcName].front().first);
systemFuncMetaQueue_[funcName].erase(systemFuncMetaQueue_[funcName].begin());
}
}
void BootstrapActor::UpgradeSystemFunction(const std::string &funcName, const FunctionConfig &newConfig,
const litebus::Option<FunctionConfig> &currConfigOpt)
{
StopSystemFunctionKeepAlive();
FunctionConfig currConfig = newConfig;
if (currConfigOpt.IsNone()) {
currConfig = newConfig;
}
YRLOG_INFO("Start to upgrade function ({})", funcName);
(void)SendUpgradeFunctionSignal(funcName, newConfig, currConfig, 0)
.OnComplete(litebus::Defer(GetAID(), &BootstrapActor::SendUpgradeFunctionSignalCallBack, funcName, newConfig,
std::placeholders::_1));
}
void BootstrapActor::SendUpgradeFunctionSignalCallBack(const std::string &funcName, const FunctionConfig &newConfig,
const litebus::Future<Status> &future)
{
if (future.IsError() || future.Get().IsError()) {
YRLOG_WARN("function ({}) upgrade failed, rollback to previous version", funcName);
} else {
YRLOG_INFO("function ({}) upgrade succeed, save new bootstrap config", funcName);
member_->currFunctionConfigMap[funcName] = newConfig;
metaStorageAccessor_->Put(SYSFUNCTION_CONFIG_KEY + "/" + funcName, newConfig.jsonStr);
}
litebus::Async(GetAID(), &BootstrapActor::SystemFunctionKeepAlive);
litebus::AsyncAfter(waitStartInstanceMs_, GetAID(), &BootstrapActor::UpdateSysFunctionPayloadByName, funcName);
}
litebus::Future<std::list<Status>> BootstrapActor::KillSystemFuncInstances()
{
ASSERT_IF_NULL(business_);
return business_->KillSystemFuncInstances();
}
litebus::Future<Status> BootstrapActor::KillInstance(const FuncInstanceParams &instanceParams, const int32_t signal,
const std::string functionName)
{
auto instanceKeyOpt =
GenInstanceKey(instanceParams.functionKey, instanceParams.instanceID, instanceParams.requestID);
if (instanceKeyOpt.IsNone()) {
YRLOG_ERROR("failed to find instance({}), invalid instance key", instanceParams.instanceID);
return Status(FAILED);
}
auto instanceKey = instanceKeyOpt.Get();
auto instance = metaStorageAccessor_->GetWithPrefix(instanceKey);
if (instance.IsNone()) {
YRLOG_WARN("instance({}) not existed", instanceKey);
return signal == SHUT_DOWN_SIGNAL_SYNC ? Status(SUCCESS) : Status(FAILED);
}
auto instancePtr = std::make_shared<resource_view::InstanceInfo>();
if (!TransToInstanceInfoFromJson(*instancePtr, instance.Get().second)) {
YRLOG_WARN("invalid instance({}) body from meta storage", instanceKey);
return signal == SHUT_DOWN_SIGNAL_SYNC ? Status(SUCCESS) : Status(FAILED);
}
if (instancePtr->functionproxyid().empty() || instancePtr->functionproxyid() == INSTANCE_MANAGER_OWNER) {
YRLOG_DEBUG("instance({}) has been taken over by instance-manager, send kill to instance manager", instanceKey);
messages::ForwardKillRequest req;
req.set_requestid(instancePtr->requestid());
*req.mutable_instance() = std::move(*instancePtr);
(void)Send(litebus::AID(INSTANCE_MANAGER_ACTOR_NAME, member_->instanceManagerAddress), "ForwardKill",
req.SerializeAsString());
return Status(FAILED);
}
return member_->globalSched->GetLocalAddress(instancePtr->functionproxyid())
.Then(litebus::Defer(GetAID(), &BootstrapActor::SendKillRequest, std::placeholders::_1, instanceParams,
instancePtr->functionproxyid(), signal, functionName));
}
litebus::Future<Status> BootstrapActor::SendKillRequest(
const litebus::Future<litebus::Option<std::string>> &proxyAddress, const FuncInstanceParams &instanceParams,
const std::string &proxyID, const int32_t signal, const std::string &functionName)
{
if (proxyAddress.IsError() || proxyAddress.Get().IsNone()) {
YRLOG_ERROR("failed to kill instance({}), failed to get local address from global scheduler",
instanceParams.instanceID);
return Status(FAILED);
}
auto requestID = litebus::uuid_generator::UUID::GetRandomUUID().ToString();
core_service::KillRequest killRequest{};
killRequest.set_instanceid(instanceParams.instanceID);
killRequest.set_signal(signal);
killRequest.set_requestid(requestID);
YRLOG_INFO("set killRequest signal: {}", signal);
if (signal != SHUT_DOWN_SIGNAL_SYNC) {
YRLOG_INFO("set payload for args upgrade");
if (functionName.empty() || systemFuncPayloadMap_.find(functionName) == systemFuncPayloadMap_.end()) {
YRLOG_ERROR("function ({}) does not exist", functionName);
return Status(FAILED);
}
auto payload = Base64Encode(std::string(systemFuncPayloadMap_[functionName].payload.dump()));
killRequest.set_payload(payload);
}
auto forwardKillRequest = std::make_shared<internal::ForwardKillRequest>();
forwardKillRequest->set_requestid(requestID);
forwardKillRequest->set_srcinstanceid("");
forwardKillRequest->set_instancerequestid(instanceParams.requestID);
*forwardKillRequest->mutable_req() = std::move(killRequest);
auto aid = litebus::AID(proxyID + LOCAL_SCHED_INSTANCE_CTRL_ACTOR_NAME_POSTFIX, proxyAddress.Get().Get());
YRLOG_INFO("{}|send instance({}) kill request to local({})", requestID, instanceParams.instanceID,
std::string(aid));
(void)Send(aid, "ForwardCustomSignalRequest", forwardKillRequest->SerializeAsString());
auto promise = std::make_shared<litebus::Promise<Status>>();
killPromiseMap_[requestID] = promise;
return promise->GetFuture();
}
void BootstrapActor::ScaleByFunctionName(const std::string &funcName, uint32_t instanceNum)
{
auto iter = systemFuncPayloadMap_.find(funcName);
if (iter == systemFuncPayloadMap_.end()) {
return;
}
auto &payload = iter->second;
payload.payload["instanceNum"] = instanceNum;
member_->dynamicFunctionInstance[funcName] = instanceNum;
ASSERT_IF_NULL(business_);
return business_->ScaleByFunctionName(funcName, instanceNum, payload);
}
Status BootstrapActor::LoadFunctionConfigs()
{
if (!litebus::os::ExistPath(BOOTSTRAP_CONFIG_PATH)) {
YRLOG_WARN("bootstrap config file not exist, maybe not be supported.");
return Status(StatusCode::FILE_NOT_FOUND, "bootstrap config file not exist");
}
std::string jsonStr = Read(BOOTSTRAP_CONFIG_PATH);
nlohmann::json confJson;
try {
confJson = nlohmann::json::parse(jsonStr);
} catch (nlohmann::detail::parse_error &e) {
YRLOG_ERROR("LoadFunctionConfigs parse json failed, error: {}", e.what());
return Status(StatusCode::JSON_PARSE_ERROR, "parse json failed, " + jsonStr + ", error: " + e.what());
}
for (const auto &item : confJson.items()) {
auto functionName = item.key();
if (functionName.empty()) {
YRLOG_WARN("empty function name");
continue;
}
auto config = item.value();
BuildConfigMap(member_->functionConfigMap, functionName, config);
}
return Status(StatusCode::SUCCESS);
}
Status BootstrapActor::LoadCurrentFunctionConfigs()
{
auto bootstrapConfig = metaStorageAccessor_->GetAllWithPrefix(SYSFUNCTION_CONFIG_KEY);
if (bootstrapConfig.IsNone()) {
YRLOG_WARN("Failed to get bootstrap config from metaStorage");
return Status(StatusCode::FAILED);
}
auto currentConfig = bootstrapConfig.Get();
for (auto iter = currentConfig.begin(); iter != currentConfig.end(); ++iter) {
std::string key = iter->first;
std::string value = iter->second;
std::string functionName = key.substr(key.find_last_of('/') + 1, key.length());
nlohmann::json confJson;
try {
confJson = nlohmann::json::parse(value);
} catch (nlohmann::detail::parse_error &e) {
YRLOG_ERROR("parse json failed, error: {}", e.what());
return Status(StatusCode::JSON_PARSE_ERROR, "parse json failed, " + value + ", error: " + e.what());
}
BuildConfigMap(member_->currFunctionConfigMap, functionName, confJson);
}
return Status(StatusCode::SUCCESS);
}
Status BootstrapActor::LoadSysFuncPayloads()
{
if (!litebus::os::ExistPath(PAYLOADFILE_WATCH_PATH)) {
YRLOG_WARN("{} is not exist", PAYLOADFILE_WATCH_PATH);
return Status(StatusCode::FAILED);
}
auto filesOption = litebus::os::Ls(PAYLOADFILE_WATCH_PATH);
if (filesOption.IsNone() || filesOption.Get().empty()) {
YRLOG_WARN("no function payload file in {}", PAYLOADFILE_WATCH_PATH);
return Status(StatusCode::FAILED);
}
auto files = filesOption.Get();
for (const auto &file : files) {
auto filePath = litebus::os::Join(PAYLOADFILE_WATCH_PATH, file, '/');
if (!IsFile(filePath)) {
continue;
}
YRLOG_INFO("read function payload file {}", filePath);
auto content = litebus::os::Read(filePath);
if (content.IsNone() || content.Get().empty()) {
YRLOG_WARN("no function payload information in {}", filePath);
continue;
}
std::string jsonStr = content.Get();
nlohmann::json confJson;
try {
confJson = nlohmann::json::parse(jsonStr);
} catch (nlohmann::detail::parse_error &e) {
YRLOG_ERROR("LoadSysFuncPayloads parse json failed, error: {}", e.what());
return Status(StatusCode::JSON_PARSE_ERROR, "parse json failed, " + jsonStr + ", error: " + e.what());
}
for (const auto &item : confJson.items()) {
AddSysFuncPayload(item.key(), item.value());
}
}
YRLOG_INFO("load system function payload from path({}) successfully", PAYLOADFILE_WATCH_PATH);
return Status(StatusCode::SUCCESS);
}
void BootstrapActor::AddSysFuncPayload(const std::string &configKey, const nlohmann::json &config)
{
if (configKey.empty()) {
YRLOG_WARN("empty config key");
return;
}
FunctionPayload funcPayload{ .sysFuncName = "", .signal = 0, .payload = {} };
if (config.find("systemFunctionName") != config.end()) {
funcPayload.sysFuncName = config["systemFunctionName"];
}
if (config.find("signal") != config.end()) {
funcPayload.signal = config["signal"];
}
if (config.find("payload") != config.end()) {
funcPayload.payload = config["payload"];
}
if (auto iter = member_->dynamicFunctionSet.find(configKey); iter != member_->dynamicFunctionSet.end()) {
uint32_t instanceNum = 0;
auto insIter = member_->dynamicFunctionInstance.find(configKey);
if (insIter != member_->dynamicFunctionInstance.end()) {
instanceNum = insIter->second;
}
if (funcPayload.payload.find("instanceNum") != funcPayload.payload.end()) {
funcPayload.payload["instanceNum"] = instanceNum;
}
}
systemFuncPayloadMap_[configKey] = funcPayload;
}
Status BootstrapActor::LoadSysFuncMetas()
{
if (!litebus::os::ExistPath(METAFILE_WATCH_PATH)) {
YRLOG_WARN("{} is not exist", METAFILE_WATCH_PATH);
return Status(StatusCode::FAILED);
}
auto filesOption = litebus::os::Ls(METAFILE_WATCH_PATH);
if (filesOption.IsNone() || filesOption.Get().empty()) {
YRLOG_WARN("no function meta file in {}", METAFILE_WATCH_PATH);
return Status(StatusCode::FAILED);
}
auto files = filesOption.Get();
for (const auto &file : files) {
auto filePath = litebus::os::Join(METAFILE_WATCH_PATH, file, '/');
if (!IsFile(filePath)) {
continue;
}
YRLOG_INFO("Read function meta file {}", filePath);
auto content = litebus::os::Read(filePath);
if (content.IsNone() || content.Get().empty()) {
YRLOG_WARN("no function meta information in {}", filePath);
continue;
}
try {
UpdateSysFunctionMeta(content.Get());
} catch (std::exception &e) {
YRLOG_WARN("function metadata is invalid in {}, error: {}", filePath, e.what());
continue;
}
}
YRLOG_INFO("load system function meta from path({}) successfully", METAFILE_WATCH_PATH);
return Status(StatusCode::SUCCESS);
}
Status BootstrapActor::LoadCurrentSysFuncMetas()
{
auto prefix = FUNC_META_PATH_PREFIX + "/0/function/";
auto sysFuncMetas = metaStorageAccessor_->GetAllWithPrefix(prefix);
if (sysFuncMetas.IsNone()) {
YRLOG_WARN("No system function meta infos from metaStorage");
return Status(StatusCode::SUCCESS);
}
auto currentMetas = sysFuncMetas.Get();
for (auto iter = currentMetas.begin(); iter != currentMetas.end(); ++iter) {
std::string funcKey = iter->first;
std::string content = iter->second;
try {
auto funcMeta = GetFuncMetaFromJson(content);
auto funcName = funcMeta.funcMetaData.name;
if (systemFuncMetaQueue_.find(funcName) == systemFuncMetaQueue_.end()) {
std::vector<std::pair<std::string, FunctionMeta>> queue;
queue.emplace_back(funcKey, funcMeta);
systemFuncMetaQueue_[funcName] = queue;
} else {
systemFuncMetaQueue_[funcName].emplace_back(funcKey, funcMeta);
}
if (systemFuncMetaQueue_[funcName].size() > MAX_SYSFUNCTION_META_INFO) {
YRLOG_WARN("the number of recovered metadata ({}) reaches maximum limit ({})", funcName,
MAX_SYSFUNCTION_META_INFO);
}
} catch (std::exception &e) {
YRLOG_WARN("function metadata is invalid with funcKey {}, error: {}", funcKey, e.what());
continue;
}
}
return Status(StatusCode::SUCCESS);
}
void BootstrapActor::BuildConfigMap(std::unordered_map<std::string, FunctionConfig> &mapName,
const std::string &functionName, const nlohmann::json &config)
{
YRLOG_INFO("Build function {} config", functionName);
FunctionConfig functionConfig{ .tenantID = "0",
.version = "$latest",
.memory = 0,
.cpu = 0,
.instanceNum = 0,
.args = {},
.extension = {},
.createOptions = {},
.jsonStr = config.dump() };
functionConfig.extension["schedule_policy"] = "shared";
if (config.find("tenantID") != config.end()) {
functionConfig.tenantID = config["tenantID"];
}
if (config.find("version") != config.end()) {
functionConfig.version = config["version"];
}
if (config.find("memory") != config.end()) {
functionConfig.memory = static_cast<float>(config["memory"]);
}
if (config.find("cpu") != config.end()) {
functionConfig.cpu = static_cast<float>(config["cpu"]);
}
if (config.find("createOptions") != config.end()) {
for (const auto &createOption : config["createOptions"].items()) {
functionConfig.createOptions[createOption.key()] = createOption.value();
}
}
if (config.find("instanceNum") != config.end()) {
functionConfig.instanceNum = static_cast<float>(config["instanceNum"]);
}
if (auto schedulingOps = config.find("schedulingOps"); schedulingOps != config.end()) {
if (auto extension = schedulingOps->find("extension"); extension != schedulingOps->end()) {
for (const auto &item : extension->items()) {
functionConfig.extension[item.key()] = item.value();
}
}
}
if (config.find("args") != config.end()) {
functionConfig.args = config["args"];
BuildFunctionArgs(functionName, functionConfig.args);
}
mapName[functionName] = functionConfig;
}
void BootstrapActor::BuildFunctionArgs(const std::string &functionName, nlohmann::json &customJson)
{
auto customArgs = sysFuncCustomArgsMap_.find(functionName);
if (customArgs == sysFuncCustomArgsMap_.end()) {
YRLOG_INFO("function {} don't hava custom args in startup parameters", functionName);
return;
}
if (customJson.empty() || !customJson.is_object()) {
YRLOG_INFO("function {} don't hava custom args in config file", functionName);
return;
}
for (const auto &customArg : customArgs->second.items()) {
customJson[customArg.key()] = customArg.value();
}
}
Status BootstrapActor::LoadSysFuncCustomArgs(const std::string &argStr)
{
if (argStr.empty()) {
YRLOG_WARN("sysFuncCustomArgs is empty");
sysFuncCustomArgsMap_.clear();
return Status(FAILED, "sysFuncCustomArgs is empty");
}
nlohmann::json argJson;
try {
argJson = nlohmann::json::parse(argStr);
} catch (nlohmann::detail::parse_error &e) {
YRLOG_ERROR("parse arg json failed,error: {}", e.what());
return Status(StatusCode::JSON_PARSE_ERROR, "parse arg json failed, " + argStr + ", error: " + e.what());
}
for (const auto &item : argJson.items()) {
sysFuncCustomArgsMap_[item.key()] = item.value();
}
return Status(StatusCode::SUCCESS);
}
litebus::Future<bool> BootstrapActor::CheckInstanceExist(const FuncInstanceParams &funcInstanceParams)
{
if (member_->instanceMgr == nullptr) {
YRLOG_ERROR("failed to check instance exist, null instanceMgr");
return false;
}
return member_->instanceMgr->GetInstanceInfoByInstanceID(funcInstanceParams.instanceID)
.Then(litebus::Defer(GetAID(), &BootstrapActor::OnGetInstanceInfo, std::placeholders::_1, funcInstanceParams));
}
litebus::Future<bool> BootstrapActor::OnGetInstanceInfo(const instance_manager::InstanceKeyInfoPair &pair,
const FuncInstanceParams &funcInstanceParams)
{
auto instanceKey =
GenInstanceKey(funcInstanceParams.functionKey, funcInstanceParams.instanceID, funcInstanceParams.requestID);
if (instanceKey.IsNone()) {
YRLOG_WARN("invalid instance key");
return true;
}
auto key = pair.first;
auto instancePtr = pair.second;
if (key.empty() || instancePtr == nullptr) {
YRLOG_INFO("instance {} 's status is not alive.", instanceKey.Get());
member_->instanceWaitingStateTimesMap[instanceKey.Get()] = 0;
return false;
}
auto routeKey = GenInstanceRouteKey(funcInstanceParams.instanceID);
if (IsNonRecoverableStatus(instancePtr->instancestatus().code())) {
YRLOG_INFO("instance({}) 's status is failed, try to kill", key);
member_->instanceWaitingStateTimesMap[key] = 0;
(void)KillInstance(funcInstanceParams, SHUT_DOWN_SIGNAL_SYNC);
return true;
}
if (IsWaitingStatus(instancePtr->instancestatus().code())) {
if (member_->instanceWaitingStateTimesMap.find(key) == member_->instanceWaitingStateTimesMap.end()) {
member_->instanceWaitingStateTimesMap[key] = 0;
}
++member_->instanceWaitingStateTimesMap[key];
} else {
member_->instanceWaitingStateTimesMap[key] = 0;
}
if (member_->instanceWaitingStateTimesMap[key] == WAIT_INSTANCE_MAX_TIMES) {
YRLOG_INFO("instance({}) is in waiting status and reach max times, try to force delete", key);
member_->instanceWaitingStateTimesMap[key] = 0;
(void)metaStorageAccessor_->Delete(key, true);
metaStorageAccessor_->Delete(routeKey, true);
return true;
}
return true;
}
void BootstrapActor::OnCheckInstanceExist(const litebus::Future<bool> &isExisted, const FuncInstanceParams ¶ms,
const std::pair<std::string, FunctionConfig> &funcConfig)
{
if (isExisted.IsOK() && isExisted.Get()) {
return;
}
if (member_->isExiting) {
YRLOG_INFO("system function loader is exiting, don't create new instance");
return;
}
YRLOG_INFO("Send function {} schedule request to global scheduler", funcConfig.first);
(void)member_->globalSched->Schedule(BuildScheduleRequest(funcConfig.second, params))
.OnComplete([aid(GetAID()), functionName(funcConfig.first),
waitStartInstanceMs(waitStartInstanceMs_)](const litebus::Future<Status> &future) {
if (future.IsError()) {
YRLOG_ERROR("function {} schedule failed, reason: {}", functionName, future.GetErrorCode());
return;
}
auto status = future.Get();
if (status.IsError()) {
YRLOG_ERROR("function {} schedule failed, reason: {}", functionName, status.ToString());
return;
}
YRLOG_INFO("function {} schedule success", functionName);
litebus::AsyncAfter(waitStartInstanceMs, aid, &BootstrapActor::UpdateSysFunctionPayloadByName,
functionName);
});
}
std::shared_ptr<messages::ScheduleRequest> BootstrapActor::BuildScheduleRequest(
const FunctionConfig &functionConfig, const FuncInstanceParams &funcInstanceParams)
{
auto createRequest = std::make_shared<CreateRequest>();
createRequest->set_traceid(funcInstanceParams.traceID);
createRequest->set_requestid(funcInstanceParams.requestID);
createRequest->set_designatedinstanceid(funcInstanceParams.instanceID);
createRequest->set_function(funcInstanceParams.functionKey);
if (!functionConfig.args.empty()) {
auto args = createRequest->mutable_args();
::common::Arg arg{};
arg.set_type(::common::Arg_ArgType_VALUE);
arg.set_value(functionConfig.args.dump());
args->Add(std::move(arg));
}
for (const auto &createOpt : functionConfig.createOptions) {
(*createRequest->mutable_createoptions())[createOpt.first] = createOpt.second;
}
(*createRequest->mutable_createoptions())[RESOURCE_OWNER_KEY] = SYSTEM_OWNER_VALUE;
for (const auto &extension : functionConfig.extension) {
(*createRequest->mutable_schedulingops()->mutable_extension())[extension.first] = extension.second;
}
(*createRequest->mutable_schedulingops()->mutable_resources())[CPU_RESOURCE_NAME] = functionConfig.cpu;
(*createRequest->mutable_schedulingops()->mutable_resources())[MEMORY_RESOURCE_NAME] = functionConfig.memory;
auto scheduleRequest = TransFromCreateReqToScheduleReq(std::move(*createRequest), "");
(*scheduleRequest->mutable_instance()->mutable_scheduleoption()->mutable_resourceselector())[RESOURCE_OWNER_KEY] =
scheduleRequest->instance().instanceid();
scheduleRequest->mutable_instance()->mutable_instancestatus()->set_code(
static_cast<int32_t>(InstanceState::SCHEDULING));
scheduleRequest->mutable_instance()->mutable_instancestatus()->set_msg("scheduling");
scheduleRequest->mutable_instance()->set_issystemfunc(true);
scheduleRequest->mutable_instance()->set_tenantid(functionConfig.tenantID.empty() ? "0" : functionConfig.tenantID);
YRLOG_INFO("{}|schedule instance({})", scheduleRequest->requestid(), scheduleRequest->instance().instanceid());
return scheduleRequest;
}
void BootstrapActor::SystemFunctionKeepAlive()
{
ASSERT_IF_NULL(business_);
business_->SystemFunctionKeepAlive();
keepAliveTimer_ =
litebus::AsyncAfter(member_->sysFuncRetryPeriod, GetAID(), &BootstrapActor::SystemFunctionKeepAlive);
}
void BootstrapActor::StopSystemFunctionKeepAlive()
{
litebus::TimerTools::Cancel(keepAliveTimer_);
}
FuncInstanceParams BootstrapActor::BuildFuncInstanceParams(const std::string &functionName,
const FunctionConfig &config, uint32_t index)
{
std::string key = functionName + "-" + std::to_string(index);
return FuncInstanceParams{ .traceID = key,
.instanceID = key,
.requestID = key,
.functionKey = config.tenantID + "/" + functionName + "/" + config.version };
}
litebus::Option<FunctionConfig> BootstrapActor::GetFunctionConfig(const std::string &funcName)
{
if (member_->functionConfigMap.find(funcName) == member_->functionConfigMap.end()) {
return litebus::None();
}
return member_->functionConfigMap[funcName];
}
litebus::Option<FunctionConfig> BootstrapActor::GetCurrFunctionConfig(const std::string &funcName)
{
if (member_->currFunctionConfigMap.find(funcName) == member_->currFunctionConfigMap.end()) {
return litebus::None();
}
return member_->currFunctionConfigMap[funcName];
}
size_t BootstrapActor::GetFunctionConfigSize()
{
return member_->functionConfigMap.size();
}
litebus::Option<nlohmann::json> BootstrapActor::GetSysFuncCustomArgs(const std::string &funcName)
{
if (sysFuncCustomArgsMap_.find(funcName) == sysFuncCustomArgsMap_.end()) {
return {};
}
return sysFuncCustomArgsMap_[funcName];
}
litebus::Option<FunctionPayload> BootstrapActor::GetFunctionPayload(const std::string &funcName)
{
if (systemFuncPayloadMap_.find(funcName) == systemFuncPayloadMap_.end()) {
return {};
}
return systemFuncPayloadMap_[funcName];
}
litebus::Option<FunctionMetaQueue> BootstrapActor::GetFunctionMetaQueue()
{
return systemFuncMetaQueue_;
}
void BootstrapActor::UpdateLeaderInfo(const explorer::LeaderInfo &leaderInfo)
{
litebus::AID masterAID(SYSTEM_FUNCTION_LOADER_BOOTSTRAP_ACTOR, leaderInfo.address);
auto newStatus = leader::GetStatus(GetAID(), masterAID, curStatus_);
if (businesses_.find(newStatus) == businesses_.end()) {
YRLOG_WARN("new status({}) business don't exist", newStatus);
return;
}
business_ = businesses_[newStatus];
business_->OnChange();
curStatus_ = newStatus;
}
void BootstrapActor::ForwardCustomSignalResponse(const litebus::AID &from, std::string &&, std::string &&msg)
{
internal::ForwardKillResponse forwardKillResponse;
if (msg.empty() || !forwardKillResponse.ParseFromString(msg)) {
YRLOG_WARN("(custom signal)invalid response body from({}).", from.HashString());
return;
}
auto iter(killPromiseMap_.find(forwardKillResponse.requestid()));
if (iter == killPromiseMap_.end()) {
YRLOG_WARN("{}|(custom signal)failed to get response, no request matches result",
forwardKillResponse.requestid());
return;
}
YRLOG_DEBUG("{}|(custom signal) get response", forwardKillResponse.requestid());
iter->second->SetValue(Status::OK());
(void)killPromiseMap_.erase(forwardKillResponse.requestid());
}
void BootstrapActor::BindInstanceManager(const std::shared_ptr<instance_manager::InstanceManager> &instanceMgr)
{
member_->instanceMgr = instanceMgr;
}
void BootstrapActor::MasterBusiness::SystemFunctionKeepAlive()
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
for (const auto &functionConfig : member_->currFunctionConfigMap) {
for (uint32_t i = 0; i < functionConfig.second.instanceNum; ++i) {
auto params = BuildFuncInstanceParams(functionConfig.first, functionConfig.second, i);
litebus::Async(actor->GetAID(), &BootstrapActor::CheckInstanceExist, params)
.OnComplete(litebus::Defer(actor->GetAID(), &BootstrapActor::OnCheckInstanceExist,
std::placeholders::_1, params, functionConfig));
}
}
}
void BootstrapActor::MasterBusiness::ScaleByFunctionName(const std::string &funcName, uint32_t instanceNum,
const FunctionPayload &payload)
{
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
actor->DoDynamicScaleByFunctionName(funcName, instanceNum, payload);
}
bool BootstrapActor::CheckSysFunctionNeedUpgrade(const std::string &funcName, const FunctionConfig &newConfig,
const litebus::Option<FunctionConfig> &currConfigOpt)
{
if (currConfigOpt.IsNone()) {
YRLOG_INFO("current config is empty, need to upgrade function ({})", funcName);
return true;
}
auto currConfig = currConfigOpt.Get();
if (newConfig.version != currConfig.version) {
YRLOG_INFO("version ({}) is different from current version ({}), need to upgrade function ({})",
newConfig.version, currConfig.version, funcName);
return true;
}
if (newConfig.createOptions.size() != currConfig.createOptions.size()) {
YRLOG_INFO("size of createOptions differs, need to upgrade function ({}), size before ({}), size after ({})",
funcName, currConfig.createOptions.size(), newConfig.createOptions.size());
return true;
}
for (auto &[optKey, value] : newConfig.createOptions) {
auto currIt = currConfig.createOptions.find(optKey);
if (currIt == currConfig.createOptions.end()) {
YRLOG_INFO("{} option is added, need to upgrade function ({})", optKey, funcName);
return true;
}
if (std::strcmp(value.c_str(), currIt->second.c_str()) != 0) {
YRLOG_INFO(
"{} has different config, new({}), curr({}), need to upgrade function ({})",
optKey, value, currIt->second, funcName);
return true;
}
}
if (abs(newConfig.memory - currConfig.memory) > EPSINON) {
YRLOG_INFO("memory ({}) is different from current memory ({}), need to upgrade function ({})", newConfig.memory,
currConfig.memory, funcName);
return true;
}
if (abs(newConfig.cpu - currConfig.cpu) > EPSINON) {
YRLOG_INFO("cpu ({}) is different from current cpu ({}), need to upgrade function ({})", newConfig.cpu,
currConfig.cpu, funcName);
return true;
}
if (std::strcmp(newConfig.args.dump().c_str(), currConfig.args.dump().c_str()) != 0) {
YRLOG_INFO("args is different from current args, need to upgrade function ({})", funcName);
return true;
}
return false;
}
void BootstrapActor::SendUpgradeFunctionScheduleRequest(const std::shared_ptr<litebus::Promise<Status>> &promise,
const FunctionConfig &newConfig,
const FuncInstanceParams &newParams,
uint32_t retryTime)
{
if (retryTime > MAX_RETRY_TIMES) {
YRLOG_WARN("Timeout to UpgradeFunctionSchedule, retry");
promise->SetValue(Status(StatusCode::FAILED));
return;
}
member_->globalSched->Schedule(BuildScheduleRequest(newConfig, newParams))
.After(retryTimeoutMs_,
[](const litebus::Future<Status> &future) {
future.SetFailed(-1);
return future;
})
.OnComplete([promise, newConfig, newParams, retryTime, aid(GetAID())](const litebus::Future<Status> &future) {
if (future.IsError()) {
YRLOG_WARN("Timeout to UpgradeFunctionSchedule, retry({})", retryTime);
litebus::Async(aid, &BootstrapActor::SendUpgradeFunctionScheduleRequest, promise, newConfig, newParams,
retryTime + 1);
return;
}
promise->SetValue(future.Get());
});
}
void BootstrapActor::RetrySendUpgradeFunctionSignal(const std::string &functionName, const FunctionConfig &newConfig,
const FunctionConfig &currConfig, int retryTimes,
const std::shared_ptr<litebus::Promise<Status>> &promise)
{
promise->Associate(litebus::Async(GetAID(), &BootstrapActor::SendUpgradeFunctionSignal, functionName,
newConfig, currConfig, retryTimes));
}
litebus::Future<Status> BootstrapActor::SendUpgradeFunctionSignal(const std::string &functionName,
const FunctionConfig &newConfig,
const FunctionConfig &currConfig, int retryTimes)
{
if (retryTimes > MAX_RETRY_TIMES) {
YRLOG_WARN("Failed to upgrade function ({}), reaches maximum retry times", functionName);
return Status(StatusCode::FAILED);
}
auto newParams = BuildFuncInstanceParams(functionName, newConfig, 0);
auto currParams = BuildFuncInstanceParams(functionName, currConfig, 0);
auto promise = std::make_shared<litebus::Promise<Status>>();
KillInstance(currParams, SHUT_DOWN_SIGNAL_SYNC)
.After(UPGRADE_TIMEOUT_MS, [currParams](const litebus::Future<Status> &future) -> litebus::Future<Status> {
YRLOG_WARN("Kill instance ({}) timeout {} ms", currParams.functionKey, UPGRADE_TIMEOUT_MS);
auto promise = std::make_shared<litebus::Promise<Status>>();
promise->SetFailed(static_cast<int32_t>(StatusCode::FAILED));
return promise->GetFuture();
})
.Then([aid(GetAID()), currParams, newConfig, newParams, member(member_),
waitKillInstanceMs(waitKillInstanceMs_)](const Status &status) {
auto promise = std::make_shared<litebus::Promise<Status>>();
if (status.IsError() && status.StatusCode() != ERR_INSTANCE_NOT_FOUND
&& status.StatusCode() != ERR_INSTANCE_EXITED) {
YRLOG_WARN("Failed to kill instance({}), Code({})", currParams.functionKey,
fmt::underlying(status.StatusCode()));
promise->SetFailed(static_cast<int32_t>(StatusCode::FAILED));
return promise->GetFuture();
}
if (member->isExiting) {
YRLOG_WARN("Failed to create new instance({}), system function loader is exiting",
newParams.functionKey);
promise->SetFailed(static_cast<int32_t>(StatusCode::FAILED));
return promise->GetFuture();
}
litebus::AsyncAfter(waitKillInstanceMs, aid, &BootstrapActor::SendUpgradeFunctionScheduleRequest, promise,
newConfig, newParams, 0);
return promise->GetFuture();
})
.OnComplete([aid(GetAID()), promise, functionName, newConfig, currConfig,
retryTimes, retryTimeoutMs(retryTimeoutMs_)](const litebus::Future<Status> &future) {
if (future.IsError() || future.Get().IsError()) {
YRLOG_WARN("Failed to upgrade function({}) for {} times, retry to upgrade", functionName, retryTimes);
litebus::AsyncAfter(retryTimeoutMs, aid, &BootstrapActor::RetrySendUpgradeFunctionSignal,
functionName, newConfig, currConfig, retryTimes + 1, promise);
return;
}
YRLOG_INFO("Succeed to upgrade function ({})", functionName);
promise->SetValue(Status(StatusCode::SUCCESS));
});
return promise->GetFuture();
}
void BootstrapActor::RetrySendUpdateArgsSignal(const std::string &functionName, const FunctionPayload &functionPayload,
int retryTimes, const std::shared_ptr<litebus::Promise<Status>> &promise)
{
promise->Associate(
litebus::Async(GetAID(), &BootstrapActor::SendUpdateArgsSignal, functionName, functionPayload, retryTimes));
}
litebus::Future<Status> BootstrapActor::SendUpdateArgsSignal(const std::string &functionName,
const FunctionPayload &functionPayload, int retryTimes)
{
if (retryTimes > MAX_RETRY_TIMES) {
YRLOG_WARN("Failed to update function({}) args, reaches maximum retry times", functionName);
return Status(StatusCode::FAILED);
}
if (member_->currFunctionConfigMap.find(functionPayload.sysFuncName) == member_->currFunctionConfigMap.end()) {
YRLOG_WARN("Failed to update function({}) args, can't find system-function ({})", functionName,
functionPayload.sysFuncName);
return Status(StatusCode::FAILED);
}
FunctionConfig functionConfig = member_->currFunctionConfigMap[functionPayload.sysFuncName];
auto params = BuildFuncInstanceParams(functionPayload.sysFuncName, functionConfig, 0);
auto promise = std::make_shared<litebus::Promise<Status>>();
YRLOG_INFO("Start to send function ({}) args to instance ({}) with signal ({})", functionName, params.functionKey,
functionPayload.signal);
KillInstance(params, functionPayload.signal, functionName)
.After(SENDARGS_TIMEOUT_MS, [aid(GetAID()), functionName,
params](const litebus::Future<Status> &future) -> litebus::Future<Status> {
YRLOG_WARN("Send function ({}) args to instance({}) timeout {} ms", functionName, params.functionKey,
SENDARGS_TIMEOUT_MS);
auto promise = std::make_shared<litebus::Promise<Status>>();
promise->SetFailed(static_cast<int32_t>(StatusCode::FAILED));
return promise->GetFuture();
})
.OnComplete([aid(GetAID()), promise, functionName, functionPayload, params,
retryTimes, retryTimeoutMs(retryTimeoutMs_)](const litebus::Future<Status> &future) {
if (future.IsError() || future.Get().IsError()) {
YRLOG_WARN("Failed to send function({}) args to instance({}) for {} times, retry to send", functionName,
params.functionKey, retryTimes);
litebus::AsyncAfter(retryTimeoutMs, aid, &BootstrapActor::RetrySendUpdateArgsSignal, functionName,
functionPayload, retryTimes + 1, promise);
return;
}
YRLOG_INFO("Succeed to send function({}) args to instance ({})", functionName, params.functionKey);
promise->SetValue(Status(StatusCode::SUCCESS));
});
return promise->GetFuture();
}
litebus::Future<std::list<Status>> BootstrapActor::MasterBusiness::KillSystemFuncInstances()
{
member_->isExiting = true;
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
std::list<litebus::Future<Status>> futures;
for (const auto &functionConfig : member_->functionConfigMap) {
for (uint32_t i = 0; i < functionConfig.second.instanceNum; ++i) {
auto params = BuildFuncInstanceParams(functionConfig.first, functionConfig.second, i);
futures.push_back(actor->KillInstance(params, SHUT_DOWN_SIGNAL_SYNC));
}
}
return litebus::Collect<Status>(futures);
}
void BootstrapActor::SlaveBusiness::SystemFunctionKeepAlive()
{
}
void BootstrapActor::SlaveBusiness::ScaleByFunctionName(const std::string &funcName, uint32_t instanceNum,
const FunctionPayload &payload)
{
}
litebus::Future<std::list<Status>> BootstrapActor::SlaveBusiness::KillSystemFuncInstances()
{
litebus::Promise<std::list<Status>> promise;
std::list<Status> list;
promise.SetValue(list);
return promise.GetFuture();
}
}