/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "group_manager_actor.h"
#include <google/protobuf/util/json_util.h>
#include "async/async.hpp"
#include "async/collect.hpp"
#include "async/defer.hpp"
#include "common/constants/signal.h"
#include "common/logs/logging.h"
#include "common/utils/collect_status.h"
namespace functionsystem::instance_manager {
// Retry interval handed to globalScheduler->GroupSchedule() when a suspended group is rescheduled.
// NOTE(review): the unit is not visible here (presumably milliseconds) — confirm against GroupSchedule.
constexpr int64_t DEFAULT_RETRY_INTERVAL{ 10000 };
/**
 * Serialize a group into the JSON text that is persisted in the meta store.
 * @param group group to serialize; a null pointer is rejected instead of being dereferenced
 * @param jsonStr receives the JSON text on success
 * @return true if serialization succeeded, false otherwise
 */
bool GenGroupValueJson(const std::shared_ptr<messages::GroupInfo> &group, std::string &jsonStr)
{
    if (group == nullptr) {
        return false;
    }
    return google::protobuf::util::MessageToJsonString(*group, &jsonStr).ok();
}
/**
 * Build a forwarded kill request that delivers `signal` to one instance.
 * The outer ForwardKillRequest and the embedded KillRequest share one freshly generated request ID,
 * which is the ID kill responses are correlated by.
 * @param instanceInfo target instance (its instance ID and request ID are copied into the request)
 * @param srcInstanceID instance recorded as the sender of the signal
 * @param signal signal number to deliver
 * @param msg payload attached to the kill request
 * @return the populated forward kill request
 */
std::shared_ptr<internal::ForwardKillRequest> MakeKillReq(
    const std::shared_ptr<resource_view::InstanceInfo> &instanceInfo, const std::string &srcInstanceID, int32_t signal,
    const std::string &msg)
{
    const auto reqID = litebus::uuid_generator::UUID::GetRandomUUID().ToString();

    auto forwardReq = std::make_shared<internal::ForwardKillRequest>();
    forwardReq->set_requestid(reqID);
    forwardReq->set_srcinstanceid(srcInstanceID);
    forwardReq->set_instancerequestid(instanceInfo->requestid());

    // Fill the embedded kill request in place rather than building and moving a temporary.
    auto *inner = forwardReq->mutable_req();
    inner->set_signal(signal);
    inner->set_instanceid(instanceInfo->instanceid());
    inner->set_payload(msg);
    inner->set_requestid(reqID);
    return forwardReq;
}
/**
 * Actor initialization: create the master and slave role handlers, start in the slave role,
 * start watching groups and register the message handlers this actor serves.
 */
void GroupManagerActor::Init()
{
    curStatus_ = SLAVE_BUSINESS;
    const auto self = shared_from_this();
    businesses_[MASTER_BUSINESS] = std::make_shared<MasterBusiness>(member_, self);
    businesses_[SLAVE_BUSINESS] = std::make_shared<SlaveBusiness>(member_, self);
    business_ = businesses_[curStatus_];

    WatchGroups();

    Receive("ForwardCustomSignalResponse", &GroupManagerActor::OnForwardCustomSignalResponse);
    Receive("KillGroup", &GroupManagerActor::KillGroup);
    Receive("OnClearGroup", &GroupManagerActor::OnClearGroup);
}
/**
 * Message handler for "ForwardCustomSignalResponse"; forwards to the currently active role.
 */
void GroupManagerActor::OnForwardCustomSignalResponse(const litebus::AID &from, std::string &&name, std::string &&msg)
{
    ASSERT_IF_NULL(business_);
    const auto &active = business_;
    active->OnForwardCustomSignalResponse(from, std::move(name), std::move(msg));
}
/**
 * Resolve the pending promise registered in killRspPromises for a kill/signal request,
 * using the code and message carried by the serialized ForwardKillResponse.
 * @param from sender of the response (logged only)
 * @param name message name (unused)
 * @param msg serialized internal::ForwardKillResponse
 */
void GroupManagerActor::MasterBusiness::OnForwardCustomSignalResponse(const litebus::AID &from, std::string &&name,
                                                                       std::string &&msg)
{
    YRLOG_DEBUG("receive OnForwardCustomSignalResponse from {}", std::string(from));
    auto killRsp = std::make_shared<internal::ForwardKillResponse>();
    if (!killRsp->ParseFromString(msg)) {
        YRLOG_ERROR("failed to parse kill response from {}, ignore it", std::string(from));
        return;
    }
    auto it = member_->killRspPromises.find(killRsp->requestid());
    if (it == member_->killRspPromises.end()) {
        YRLOG_WARN("receive a kill response of unknown requestID({})", killRsp->requestid());
        return;
    }
    // Detach the entry before resolving: SetValue may run continuations synchronously, and those may
    // touch killRspPromises, which would otherwise invalidate `it`.
    auto promise = it->second;
    member_->killRspPromises.erase(it);
    promise->SetValue(Status(StatusCode(killRsp->code()), killRsp->message()));
}
/**
 * Instance became abnormal; the active role decides how the affected groups react.
 */
litebus::Future<Status> GroupManagerActor::OnInstanceAbnormal(
    const std::string &instanceKey, const std::shared_ptr<resource_view::InstanceInfo> &instanceInfo)
{
    ASSERT_IF_NULL(business_);
    const auto &active = business_;
    return active->OnInstanceAbnormal(instanceKey, instanceInfo);
}
/**
 * Master handling of an abnormal instance: fail the groups it created and, if it belongs to a group,
 * fail that group as well (sparing the abnormal instance itself from the group-wide kill).
 * @param instanceKey meta store key of the instance
 * @param instanceInfo abnormal instance
 * @return OK if the instance has no group, otherwise the result of FatalGroup
 */
litebus::Future<Status> GroupManagerActor::MasterBusiness::OnInstanceAbnormal(
    const std::string &instanceKey, const std::shared_ptr<resource_view::InstanceInfo> &instanceInfo)
{
    (void)ProcessAbnormalInstanceChildrenGroup(instanceKey, instanceInfo);
    const auto &groupID = instanceInfo->groupid();
    if (groupID.empty()) {
        return Status::OK();
    }
    // Argument order matches the placeholders: instance first, then group.
    auto errMsg = fmt::format("instance ({}) under group ({}) was abnormal , instance exited with code({})",
                              instanceInfo->instanceid(), groupID, instanceInfo->instancestatus().exitcode());
    return FatalGroup(groupID, instanceInfo->instanceid(), errMsg);
}
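/**
 * FatalGroup marks a group as FAILED, persists it to the meta store, and then kills every instance in
 * the group except `ignoredInstanceID`.
 * It does nothing if the group has already failed, or if the group does not share a running lifecycle
 * while some of its instances are still cached.
 * @param groupID ID of the group to fail
 * @param ignoredInstanceID instance that is not killed again
 * @param errMsg reason recorded on the group and sent to the killed instances
 * @return OK once the update has been issued; an error if the group is unknown or cannot be serialized
 */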
litebus::Future<Status> GroupManagerActor::MasterBusiness::FatalGroup(const std::string &groupID,
                                                                      const std::string &ignoredInstanceID,
                                                                      const std::string &errMsg)
{
    auto lookup = member_->groupCaches->GetGroupInfo(groupID);
    if (!lookup.second) {
        return Status(StatusCode::ERR_INNER_SYSTEM_ERROR, "group not found");
    }
    const auto groupKey = lookup.first.first;
    auto groupInfo = lookup.first.second;

    if (groupInfo->status() == static_cast<int32_t>(GroupState::FAILED)) {
        YRLOG_WARN("group ({}) already failed", groupID);
        return Status::OK();
    }

    const auto cachedCount = member_->groupCaches->GetGroupInstances(groupID).size();
    const bool sharesLifecycle = groupInfo->groupopts().samerunninglifecycle();
    YRLOG_DEBUG("{}|{} receive instance delete, check group({}) instance life cycle: {}, cache instance len: {}",
                groupInfo->traceid(), groupInfo->requestid(), groupID, sharesLifecycle, cachedCount);
    // Groups without a shared lifecycle survive the loss of one member while others remain cached.
    if (cachedCount > 0 && !sharesLifecycle) {
        YRLOG_WARN("{}|{} group ({}) is not same running lifecycle", groupInfo->traceid(),
                   groupInfo->requestid(), groupID);
        return Status::OK();
    }

    groupInfo->set_status(static_cast<int32_t>(GroupState::FAILED));
    groupInfo->set_message(errMsg);
    std::string serialized;
    if (!GenGroupValueJson(groupInfo, serialized)) {
        return Status(StatusCode::JSON_PARSE_ERROR, "parse gen group value json str failed");
    }

    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    ASSERT_IF_NULL(member_->metaClient);
    // Kill the remaining members once the persist attempt has completed.
    auto putFuture = member_->metaClient->Put(groupKey, serialized, {});
    putFuture.OnComplete(litebus::Defer(actor->GetAID(), &GroupManagerActor::FatalAllInstanceOfGroup, groupID,
                                        ignoredInstanceID, errMsg));
    return Status::OK();
}
/**
 * Move a cached group to `state`, record `description` as its message and persist it to the meta store.
 * @param groupID group to update
 * @param state target group state; if the group is already in it, nothing is written
 * @param description message stored on the group
 * @return the meta store put status, or an error if the group is unknown or cannot be serialized
 */
litebus::Future<Status> GroupManagerActor::MasterBusiness::PersistentGroupInfo(const std::string &groupID,
                                                                               const GroupState &state,
                                                                               const std::string &description)
{
    auto lookup = member_->groupCaches->GetGroupInfo(groupID);
    if (!lookup.second) {
        return Status(StatusCode::ERR_INNER_SYSTEM_ERROR, "group not found");
    }
    const auto &groupKey = lookup.first.first;
    const auto &groupInfo = lookup.first.second;
    const auto target = static_cast<int32_t>(state);
    if (groupInfo->status() == target) {
        YRLOG_WARN("group ({}) already in {}", groupID, ToString(state));
        return Status::OK();
    }

    groupInfo->set_status(target);
    groupInfo->set_message(description);
    std::string serialized;
    if (!GenGroupValueJson(groupInfo, serialized)) {
        return Status(StatusCode::JSON_PARSE_ERROR, "failed to gen group value json str");
    }

    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    ASSERT_IF_NULL(member_->metaClient);
    auto putFuture = member_->metaClient->Put(groupKey, serialized, {});
    return putFuture.Then([](const std::shared_ptr<PutResponse> &rsp) { return rsp->status; });
}
/**
 * Mark every group created by an abnormal instance as FAILED and persist each one (best effort).
 * A group that cannot be serialized is skipped, so the remaining children are still processed.
 * @param instanceKey meta store key of the abnormal instance (unused)
 * @param instanceInfo abnormal instance whose child groups are failed
 * @return OK, or JSON_PARSE_ERROR if at least one child group could not be serialized
 */
litebus::Future<Status> GroupManagerActor::MasterBusiness::ProcessAbnormalInstanceChildrenGroup(
    const std::string &instanceKey, const std::shared_ptr<resource_view::InstanceInfo> &instanceInfo)
{
    Status result = Status::OK();
    // Bind by reference: the original per-iteration copy of (key, shared_ptr) bought nothing.
    for (const auto &[groupKey, groupInfo] : member_->groupCaches->GetChildGroups(instanceInfo->instanceid())) {
        groupInfo->set_status(static_cast<int32_t>(GroupState::FAILED));
        groupInfo->set_message(fmt::format("group parent({}) failed", instanceInfo->instanceid()));
        std::string groupValue;
        if (!GenGroupValueJson(groupInfo, groupValue)) {
            YRLOG_ERROR("failed to gen group value json str for group({}), skip persisting it", groupKey);
            result = Status(StatusCode::JSON_PARSE_ERROR, "failed to gen group value json str");
            continue;
        }
        ASSERT_IF_NULL(member_->metaClient);
        member_->metaClient->Put(groupKey, groupValue, {})
            .OnComplete([gk(groupKey)](const litebus::Future<std::shared_ptr<PutResponse>> &putRsp) {
                if (putRsp.IsError()) {
                    YRLOG_ERROR("failed to put group({}) info in metastore, status({})", gk, putRsp.GetErrorCode());
                    return;
                }
                if (putRsp.Get()->status.IsError()) {
                    YRLOG_ERROR("failed to put group({}) info in metastore, putRsp({})", gk,
                                putRsp.Get()->status.GetMessage());
                    return;
                }
            });
    }
    return result;
}
/**
 * When an instance is deleted, clear the info of every group it created.
 * @param instanceKey meta store key of the deleted instance (unused)
 * @param instanceInfo deleted instance
 * @return always OK; the individual clear operations are not awaited
 */
litebus::Future<Status> GroupManagerActor::MasterBusiness::ProcessDeleteInstanceChildrenGroup(
    const std::string &instanceKey, const std::shared_ptr<resource_view::InstanceInfo> &instanceInfo)
{
    const auto children = member_->groupCaches->GetChildGroups(instanceInfo->instanceid());
    YRLOG_INFO("deleted instance({}) creates {} groups, will be deleted as well", instanceInfo->instanceid(),
               children.size());
    for (const auto &child : children) {
        const auto &childGroup = child.second;
        YRLOG_INFO("group({}) parent({}) is deleted, clear group info", childGroup->groupid(),
                   instanceInfo->instanceid());
        auto actor = actor_.lock();
        ASSERT_IF_NULL(actor);
        (void)actor->ClearGroupInfo(childGroup->groupid(), Status::OK());
    }
    return Status::OK();
}
void GroupManagerActor::FatalAllInstanceOfGroup(const std::string &groupID, const std::string &ignoredInstanceID,
const std::string &errMsg)
{
auto instances = member_->groupCaches->GetGroupInstances(groupID);
for (const auto &iter : instances) {
auto cachedInstanceInfo = iter.second;
if (ignoredInstanceID == cachedInstanceInfo->instanceid()) {
continue;
}
auto killReq = MakeKillReq(cachedInstanceInfo, GROUP_MANAGER_OWNER, GROUP_EXIT_SIGNAL, errMsg);
ASSERT_IF_NULL(member_->globalScheduler);
member_->globalScheduler->GetLocalAddress(cachedInstanceInfo->functionproxyid())
.Then(litebus::Defer(GetAID(), &GroupManagerActor::InnerKillInstance, std::placeholders::_1,
cachedInstanceInfo, killReq))
.OnComplete([cachedInstanceInfo](const litebus::Future<Status> &s) {
if (!s.IsOK()) {
YRLOG_ERROR("failed to get kill instance {}, on proxy {}, in group {}, err is {}",
cachedInstanceInfo->instanceid(), cachedInstanceInfo->functionproxyid(),
cachedInstanceInfo->groupid(), s.GetErrorCode());
}
});
}
}
/**
 * Instance put event; the active role decides how group membership is tracked.
 */
litebus::Future<Status> GroupManagerActor::OnInstancePut(
    const std::string &instanceKey, const std::shared_ptr<resource_view::InstanceInfo> &instanceInfo)
{
    ASSERT_IF_NULL(business_);
    const auto &active = business_;
    return active->OnInstancePut(instanceKey, instanceInfo);
}
/**
 * Track an instance put under its group.
 * If the group is already FAILED, the instance is not cached. If it is in one of the listed live/exiting
 * states, it is sent GROUP_EXIT_SIGNAL so it exits with its group.
 * @param instanceKey meta store key of the instance
 * @param instanceInfo instance that was put
 * @return OK, or the result of the kill chain for an instance of a failed group
 */
litebus::Future<Status> GroupManagerActor::MasterBusiness::OnInstancePut(
    const std::string &instanceKey, const std::shared_ptr<resource_view::InstanceInfo> &instanceInfo)
{
    const auto &groupID = instanceInfo->groupid();
    if (groupID.empty()) {
        YRLOG_DEBUG("instance({}) doesn't belong to any group, ignored", instanceInfo->instanceid());
        return Status::OK();
    }

    auto lookup = member_->groupCaches->GetGroupInfo(groupID);
    const bool groupFailed =
        lookup.second && lookup.first.second->status() == static_cast<int32_t>(GroupState::FAILED);
    if (!groupFailed) {
        member_->groupCaches->AddGroupInstance(groupID, instanceKey, instanceInfo);
        return Status::OK();
    }

    const auto code = instanceInfo->instancestatus().code();
    const bool inKillableState = code == static_cast<int32_t>(InstanceState::SCHEDULING) ||
                                 code == static_cast<int32_t>(InstanceState::CREATING) ||
                                 code == static_cast<int32_t>(InstanceState::RUNNING) ||
                                 code == static_cast<int32_t>(InstanceState::EXITING) ||
                                 code == static_cast<int32_t>(InstanceState::EXITED) ||
                                 code == static_cast<int32_t>(InstanceState::EVICTING);
    if (!inKillableState) {
        return Status::OK();
    }

    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    const auto &failedGroup = lookup.first.second;
    auto killReq = MakeKillReq(instanceInfo, GROUP_MANAGER_OWNER, GROUP_EXIT_SIGNAL,
                               "instance exit with group together, reason: group(" + groupID +
                                   ") failed due to " + failedGroup->message());
    ASSERT_IF_NULL(member_->globalScheduler);
    return member_->globalScheduler->GetLocalAddress(instanceInfo->functionproxyid())
        .Then(litebus::Defer(actor->GetAID(), &GroupManagerActor::InnerKillInstance, std::placeholders::_1,
                             instanceInfo, killReq))
        .OnComplete([instanceInfo](litebus::Future<Status> s) {
            if (!s.IsOK()) {
                YRLOG_ERROR("failed to get kill instance {}, on proxy {}, in group {}, err is {}",
                            instanceInfo->instanceid(), instanceInfo->functionproxyid(), instanceInfo->groupid(),
                            s.GetErrorCode());
            }
            return s;
        });
}
/**
 * Handler for the local's reply to "ClearGroup": wakes the synchronizer waiting on that group ID.
 * Unparseable replies are logged and dropped.
 */
void GroupManagerActor::OnClearGroup(const litebus::AID &from, std::string &&name, std::string &&msg)
{
    ::messages::KillGroupResponse response;
    if (response.ParseFromString(msg)) {
        requestGroupClearMatch_.Synchronized(response.groupid(), Status::OK());
        return;
    }
    YRLOG_ERROR("failed to parse response for clear group. from({}) msg({}), ignore it", std::string(from), msg);
}
/**
 * Handler for "KillGroup" requests (kill / suspend / resume); the active role serves them.
 */
void GroupManagerActor::KillGroup(const litebus::AID &from, std::string &&name, std::string &&msg)
{
    ASSERT_IF_NULL(business_);
    YRLOG_DEBUG("receive kill group request from {}", from.HashString());
    const auto &active = business_;
    active->KillGroup(from, std::move(name), std::move(msg));
}
/**
 * Serve a group kill / suspend / resume request.
 * Only one such request per group is in flight at a time (tracked in killingGroups).
 * @param from requester
 * @param name message name (unused)
 * @param msg serialized messages::KillGroup
 */
void GroupManagerActor::MasterBusiness::KillGroup(const litebus::AID &from, std::string &&name, std::string &&msg)
{
    auto killGroupReq = std::make_shared<::messages::KillGroup>();
    RETURN_IF_TRUE(!killGroupReq->ParseFromString(msg), "invalid request for kill group");

    // Validate the signal before marking the group busy. The unknown-signal path never goes through
    // InnerKillInstanceOnComplete, so an entry inserted for it would presumably never be released and would
    // block every later request for this group.
    const auto signal = killGroupReq->signal();
    if (signal != SHUT_DOWN_SIGNAL_GROUP && signal != GROUP_SUSPEND_SIGNAL && signal != GROUP_RESUME_SIGNAL) {
        YRLOG_WARN("invalid group signal({}) for {}", signal, killGroupReq->groupid());
        return;
    }

    RETURN_IF_TRUE(auto inserted = member_->killingGroups.emplace(killGroupReq->groupid()).second;
                   !inserted, fmt::format("receive group({}) request which suspend/resume/kill is ongoing, ignored",
                                          killGroupReq->groupid()));
    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    switch (signal) {
        case (SHUT_DOWN_SIGNAL_GROUP): {
            InnerKillGroup(killGroupReq->groupid(), killGroupReq->srcinstanceid())
                .OnComplete(litebus::Defer(actor->GetAID(), &GroupManagerActor::InnerKillInstanceOnComplete, from,
                                           killGroupReq->groupid(), killGroupReq->grouprequestid(),
                                           std::placeholders::_1));
            return;
        }
        case (GROUP_SUSPEND_SIGNAL): {
            return SuspendGroup(from, killGroupReq);
        }
        case (GROUP_RESUME_SIGNAL): {
            return ResumeGroup(from, killGroupReq);
        }
        default: {
            // Unreachable: unknown signals are rejected above.
            return;
        }
    }
}
/**
 * Kill every cached instance of a group with SHUT_DOWN_SIGNAL, then clear the group info
 * (including the meta store copy) once all kills are confirmed.
 * A group that is still SCHEDULING first has its pending schedule cancelled.
 * @param groupID group to kill
 * @param srcInstanceID instance recorded as the sender of the kill requests
 * @return aggregated result; REQUEST_TIME_OUT if confirmations do not arrive within KILLGROUP_TIMEOUT
 */
litebus::Future<Status> GroupManagerActor::MasterBusiness::InnerKillGroup(const std::string &groupID,
                                                                          const std::string &srcInstanceID)
{
    YRLOG_INFO("start killing group {}", groupID);
    auto instances = member_->groupCaches->GetGroupInstances(groupID);
    auto futures = std::list<litebus::Future<Status>>();
    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    if (auto group = member_->groupCaches->GetGroupInfo(groupID);
        group.second && group.first.second->status() == static_cast<int32_t>(GroupState::SCHEDULING)) {
        auto reason = fmt::format("group({}) canceled", groupID);
        ASSERT_IF_NULL(member_->instanceManager);
        (void)member_->instanceManager->TryCancelSchedule(groupID, messages::CancelType::GROUP, reason);
    }
    for (const auto &inst : instances) {
        auto killReq = MakeKillReq(inst.second, srcInstanceID, SHUT_DOWN_SIGNAL, "group killed");
        auto promise = std::make_shared<litebus::Promise<Status>>();
        futures.emplace_back(promise->GetFuture());
        // Normally resolved by OnForwardCustomSignalResponse when the kill response arrives.
        member_->killRspPromises[killReq->requestid()] = promise;
        ASSERT_IF_NULL(member_->globalScheduler);
        member_->globalScheduler->GetLocalAddress(inst.second->functionproxyid())
            .Then(litebus::Defer(actor->GetAID(), &GroupManagerActor::InnerKillInstance, std::placeholders::_1,
                                 inst.second, killReq))
            .OnComplete([this, instInfo(inst.second), reqID(killReq->requestid())](const litebus::Future<Status> &s) {
                Status failure = Status::OK();
                if (!s.IsOK()) {
                    YRLOG_ERROR("failed to get kill instance {}, on proxy {}, in group {}", instInfo->instanceid(),
                                instInfo->functionproxyid(), instInfo->groupid());
                    failure = Status(StatusCode::ERR_INNER_COMMUNICATION, "failed to send kill signal");
                } else if (s.Get().IsError()) {
                    failure = s.Get();
                }
                if (failure.IsOk()) {
                    return;
                }
                // A kill that was not delivered presumably never gets a response. Fail its promise now instead of
                // letting the whole group kill wait for KILLGROUP_TIMEOUT, and drop the pending entry.
                if (auto iter = member_->killRspPromises.find(reqID); iter != member_->killRspPromises.end()) {
                    auto pending = iter->second;
                    member_->killRspPromises.erase(iter);
                    pending->SetValue(failure);
                }
            });
    }
    std::string errDescription = "kill group(" + groupID + ") instances";
    return CollectStatus(futures, errDescription, StatusCode::FAILED, StatusCode::SUCCESS)
        .After(KILLGROUP_TIMEOUT,
               [](const litebus::Future<Status> &future) {
                   auto promise = litebus::Promise<Status>();
                   promise.SetValue(Status(StatusCode::REQUEST_TIME_OUT, "kill group timeout"));
                   return promise.GetFuture();
               })
        .Then(litebus::Defer(actor->GetAID(), &GroupManagerActor::ClearGroupInfo, groupID,
                             std::placeholders::_1, true));
}
/**
 * Last step of a group suspend: on failure, finish the request with the error; on success, persist the
 * SUSPEND state and finish the request with the persist result (via InnerKillInstanceOnComplete).
 * @param future result of the checkpoint / trans-suspend broadcasts and the group-info clear
 * @param from requester
 * @param requestID group request ID
 * @param groupID suspended group
 */
void GroupManagerActor::OnGroupSuspend(const litebus::Future<Status> &future, const litebus::AID &from,
                                       const std::string &requestID, const std::string &groupID)
{
    // Report a failed future to the requester rather than asserting on it (mirrors OnFakeResumeComplete).
    if (!future.IsOK()) {
        YRLOG_ERROR("suspend future failed for group({})", groupID);
        return InnerKillInstanceOnComplete(
            from, groupID, requestID,
            Status(StatusCode::ERR_INNER_SYSTEM_ERROR,
                   fmt::format("failed to suspend group({}), reason: future failed", groupID)));
    }
    auto status = future.Get();
    if (status.IsError()) {
        return InnerKillInstanceOnComplete(
            from, groupID, requestID,
            Status(status.StatusCode(),
                   fmt::format("failed to suspend group({}), reason:{}", groupID, status.RawMessage())));
    }
    ASSERT_IF_NULL(business_);
    business_->PersistentGroupInfo(groupID, GroupState::SUSPEND, "group is already suspend")
        .OnComplete(litebus::Defer(GetAID(), &GroupManagerActor::InnerKillInstanceOnComplete, from, groupID, requestID,
                                   std::placeholders::_1));
}
/**
 * Broadcast `signal` to all instances of a group through the active role.
 */
litebus::Future<Status> GroupManagerActor::BroadCastSignalForGroup(const std::string &groupID,
                                                                   const std::string &srcInstanceID,
                                                                   const int32_t &signal)
{
    ASSERT_IF_NULL(business_);
    const auto &active = business_;
    return active->BroadCastSignalForGroup(groupID, srcInstanceID, signal);
}
/**
 * Send `signal` to every cached instance of a group and aggregate the per-instance responses.
 * @param groupID target group
 * @param srcInstanceID instance recorded as the sender
 * @param signal signal to broadcast
 * @return aggregated status of all per-instance responses
 */
litebus::Future<Status> GroupManagerActor::MasterBusiness::BroadCastSignalForGroup(const std::string &groupID,
                                                                                   const std::string &srcInstanceID,
                                                                                   const int32_t &signal)
{
    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    auto instances = member_->groupCaches->GetGroupInstances(groupID);
    auto futures = std::list<litebus::Future<Status>>();
    YRLOG_INFO("broadcast {} to {} instances of group({})", SignalToString(signal), instances.size(), groupID);
    for (const auto &inst : instances) {
        auto killReq = MakeKillReq(inst.second, srcInstanceID, signal, "group broadcast");
        auto promise = std::make_shared<litebus::Promise<Status>>();
        futures.emplace_back(promise->GetFuture());
        // Normally resolved by OnForwardCustomSignalResponse when the instance answers.
        member_->killRspPromises[killReq->requestid()] = promise;
        ASSERT_IF_NULL(member_->globalScheduler);
        member_->globalScheduler->GetLocalAddress(inst.second->functionproxyid())
            .Then(litebus::Defer(actor->GetAID(), &GroupManagerActor::InnerKillInstance, std::placeholders::_1,
                                 inst.second, killReq))
            .OnComplete([this, instInfo(inst.second), signal,
                         reqID(killReq->requestid())](const litebus::Future<Status> &s) {
                Status failure = Status::OK();
                if (!s.IsOK()) {
                    YRLOG_ERROR("failed to send ({}) instance {}, on proxy {}, in group {}", SignalToString(signal),
                                instInfo->instanceid(), instInfo->functionproxyid(), instInfo->groupid());
                    failure = Status(StatusCode::ERR_INNER_COMMUNICATION,
                                     fmt::format("failed to send ({}) signal", SignalToString(signal)));
                } else if (s.Get().IsError()) {
                    failure = s.Get();
                }
                if (failure.IsOk()) {
                    return;
                }
                // The collected future has no timeout. An undelivered signal must fail its promise here,
                // or the broadcast (and the suspend flow waiting on it) never completes.
                if (auto iter = member_->killRspPromises.find(reqID); iter != member_->killRspPromises.end()) {
                    auto pending = iter->second;
                    member_->killRspPromises.erase(iter);
                    pending->SetValue(failure);
                }
            });
    }
    std::string errDescription = fmt::format("broadcast ({}) group({}) instances", SignalToString(signal), groupID);
    return CollectStatus(futures, errDescription);
}
/**
 * Suspend a RUNNING group.
 * Broadcasts INSTANCE_CHECKPOINT_SIGNAL, then INSTANCE_TRANS_SUSPEND_SIGNAL, then clears the group info
 * (keeping the meta store copy), and finishes in OnGroupSuspend.
 * Unknown groups, groups in other states and already-suspended groups are answered immediately.
 * @param from requester
 * @param killGroupReq parsed request (group ID, group request ID, source instance)
 */
void GroupManagerActor::MasterBusiness::SuspendGroup(const litebus::AID &from,
                                                     const std::shared_ptr<::messages::KillGroup> &killGroupReq)
{
    YRLOG_INFO("recevied group({}) suspend request from {}", killGroupReq->groupid(), from.HashString());
    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    const std::string &groupID = killGroupReq->groupid();
    const std::string &requestID = killGroupReq->grouprequestid();
    const std::string &srcInstanceID = killGroupReq->srcinstanceid();

    auto lookup = member_->groupCaches->GetGroupInfo(groupID);
    if (!lookup.second) {
        auto reason = fmt::format("group({}) is not found, unable to be suspend", groupID);
        YRLOG_ERROR("{}, request from {}", reason, from.HashString());
        return actor->InnerKillInstanceOnComplete(from, groupID, requestID,
                                                  Status(StatusCode::ERR_PARAM_INVALID, reason));
    }
    const auto &cached = lookup.first.second;
    const auto state = cached->status();
    if (state == static_cast<int32_t>(GroupState::SUSPEND)) {
        return actor->InnerKillInstanceOnComplete(from, groupID, requestID, Status::OK());
    }
    if (state != static_cast<int32_t>(GroupState::RUNNING)) {
        auto reason = fmt::format("status of group(id:{} name:{}) is {} which not allow to be suspend", groupID,
                                  cached->groupopts().groupname(), ToString(static_cast<GroupState>(state)));
        YRLOG_ERROR("{}, request from {}", reason, from.HashString());
        return actor->InnerKillInstanceOnComplete(from, groupID, requestID,
                                                  Status(StatusCode::ERR_STATE_MACHINE_ERROR, reason));
    }

    auto checkpointed = BroadCastSignalForGroup(groupID, srcInstanceID, INSTANCE_CHECKPOINT_SIGNAL);
    auto transSuspended =
        checkpointed.Then([actor, groupID, srcInstanceID](const Status &status) -> litebus::Future<Status> {
            if (status.IsError()) {
                return status;
            }
            return litebus::Async(actor->GetAID(), &GroupManagerActor::BroadCastSignalForGroup, groupID,
                                  srcInstanceID, INSTANCE_TRANS_SUSPEND_SIGNAL);
        });
    transSuspended
        .Then(litebus::Defer(actor->GetAID(), &GroupManagerActor::ClearGroupInfo, groupID, std::placeholders::_1,
                             false))
        .OnComplete(litebus::Defer(actor->GetAID(), &GroupManagerActor::OnGroupSuspend, std::placeholders::_1, from,
                                   requestID, groupID));
}
/**
 * Resume a SUSPEND group. Depending on enableFakeSuspendResume, this either signals the existing instances
 * directly (FakeResumeGroup) or reschedules the group (ReScheduleGroup, then OnGroupResume).
 * Unknown groups, groups in other states and already-running groups are answered immediately.
 * @param from requester
 * @param killGroupReq parsed request (group ID, group request ID)
 */
void GroupManagerActor::MasterBusiness::ResumeGroup(const litebus::AID &from,
                                                    const std::shared_ptr<::messages::KillGroup> &killGroupReq)
{
    YRLOG_INFO("recevied group({}) resume request from {}", killGroupReq->groupid(), from.HashString());
    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    const std::string groupID = killGroupReq->groupid();
    const std::string requestID = killGroupReq->grouprequestid();

    auto lookup = member_->groupCaches->GetGroupInfo(groupID);
    if (!lookup.second) {
        auto reason = fmt::format("group({}) is not found, unable to be resume", groupID);
        YRLOG_ERROR("{}, request from {}", reason, from.HashString());
        return actor->InnerKillInstanceOnComplete(from, groupID, requestID,
                                                  Status(StatusCode::ERR_PARAM_INVALID, reason));
    }
    const auto &cached = lookup.first.second;
    const auto state = cached->status();
    if (state == static_cast<int32_t>(GroupState::RUNNING)) {
        return actor->InnerKillInstanceOnComplete(from, groupID, requestID, Status::OK());
    }
    if (state != static_cast<int32_t>(GroupState::SUSPEND)) {
        auto reason = fmt::format("status of group(id:{} name:{}) is {} which not allow to be resumed", groupID,
                                  cached->groupopts().groupname(), ToString(static_cast<GroupState>(state)));
        YRLOG_ERROR("{}, request from {}", reason, from.HashString());
        return actor->InnerKillInstanceOnComplete(from, groupID, requestID,
                                                  Status(StatusCode::ERR_STATE_MACHINE_ERROR, reason));
    }

    if (!member_->enableFakeSuspendResume) {
        ReScheduleGroup(groupID).OnComplete(litebus::Defer(actor->GetAID(), &GroupManagerActor::OnGroupResume,
                                                           std::placeholders::_1, from, groupID, requestID));
        return;
    }
    YRLOG_INFO("enable_fake_suspend_resume is true, using directed resume for group({})", groupID);
    FakeResumeGroup(from, groupID, requestID);
}
/**
 * Last step of a reschedule-based resume: on failure, finish the request with the error; on success,
 * persist the RUNNING state and finish the request with the persist result (via InnerKillInstanceOnComplete).
 * @param future result of ReScheduleGroup
 * @param from requester
 * @param groupID resumed group
 * @param requestID group request ID
 */
void GroupManagerActor::OnGroupResume(const litebus::Future<Status> &future, const litebus::AID &from,
                                      const std::string &groupID, const std::string &requestID)
{
    // A failed reschedule future (e.g. GroupSchedule failing) is reported instead of asserted on,
    // mirroring OnFakeResumeComplete.
    if (!future.IsOK()) {
        YRLOG_ERROR("resume future failed for group({})", groupID);
        return InnerKillInstanceOnComplete(
            from, groupID, requestID,
            Status(StatusCode::ERR_INNER_SYSTEM_ERROR,
                   fmt::format("failed to resume group({}), reason: future failed", groupID)));
    }
    auto status = future.Get();
    if (status.IsError()) {
        return InnerKillInstanceOnComplete(
            from, groupID, requestID,
            Status(status.StatusCode(),
                   fmt::format("failed to resume group({}), reason:{}", groupID, status.RawMessage())));
    }
    ASSERT_IF_NULL(business_);
    business_->PersistentGroupInfo(groupID, GroupState::RUNNING, "group is already resumed")
        .OnComplete(litebus::Defer(GetAID(), &GroupManagerActor::InnerKillInstanceOnComplete, from, groupID, requestID,
                                   std::placeholders::_1));
}
/**
 * Reschedule a suspended group through the global scheduler.
 * Before rescheduling, each group request whose request ID matches a cached instance gets that instance's
 * latest info, marked with ischeckpointed = true.
 * @param groupID group to reschedule
 * @return status built from the GroupResponse code/message, or ERR_PARAM_INVALID if the group is unknown
 */
litebus::Future<Status> GroupManagerActor::MasterBusiness::ReScheduleGroup(const std::string &groupID)
{
    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    auto instances = member_->groupCaches->GetGroupInstances(groupID);
    auto lookup = member_->groupCaches->GetGroupInfo(groupID);
    if (!lookup.second || lookup.first.second == nullptr) {
        auto reason = fmt::format("group({}) is not found to resume.", groupID);
        YRLOG_ERROR("{}", reason);
        return Status(StatusCode::ERR_PARAM_INVALID, reason);
    }
    auto groupInfo = lookup.first.second;
    for (const auto &entry : instances) {
        const auto &snapshot = entry.second;
        for (auto &request : *groupInfo->mutable_requests()) {
            if (request.requestid() == snapshot->requestid()) {
                auto *target = request.mutable_instance();
                *target = *snapshot;
                target->set_ischeckpointed(true);
            }
        }
    }
    ASSERT_IF_NULL(member_->globalScheduler);
    auto scheduled = member_->globalScheduler->GroupSchedule(groupInfo, DEFAULT_RETRY_INTERVAL);
    return scheduled.Then([](const messages::GroupResponse &rsp) {
        return Status(static_cast<StatusCode>(rsp.code()), rsp.message());
    });
}
/**
 * Resume a group in place by signalling its existing instances (DirectedResumeGroup),
 * then finish the request in OnFakeResumeComplete.
 * @param from requester
 * @param groupID group to resume
 * @param requestID group request ID
 */
void GroupManagerActor::MasterBusiness::FakeResumeGroup(const litebus::AID &from, const std::string &groupID,
                                                        const std::string &requestID)
{
    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    auto resumed = DirectedResumeGroup(groupID);
    resumed.OnComplete([this, from, groupID, requestID](const litebus::Future<Status> &result) {
        OnFakeResumeComplete(result, from, groupID, requestID);
    });
}
/**
 * Send INSTANCE_RESUME_SIGNAL to every cached instance of a group and aggregate the responses.
 * A signal that cannot be sent (or whose send returns an error) fails its promise immediately.
 * @param groupID group to resume
 * @return aggregated status, or ERR_PARAM_INVALID if the group has no cached instances
 */
litebus::Future<Status> GroupManagerActor::MasterBusiness::DirectedResumeGroup(const std::string &groupID)
{
    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    auto instances = member_->groupCaches->GetGroupInstances(groupID);
    if (instances.empty()) {
        return Status(StatusCode::ERR_PARAM_INVALID, fmt::format("group({}) has no instances to resume", groupID));
    }
    YRLOG_INFO("directed resume: sending INSTANCE_RESUME_SIGNAL to {} instances of group({})",
               instances.size(), groupID);
    std::list<litebus::Future<Status>> futures;
    for (const auto &entry : instances) {
        const auto &target = entry.second;
        auto killReq = MakeKillReq(target, GROUP_MANAGER_OWNER, INSTANCE_RESUME_SIGNAL,
                                   fmt::format("directed resume for group({})", groupID));
        auto promise = std::make_shared<litebus::Promise<Status>>();
        futures.emplace_back(promise->GetFuture());
        member_->killRspPromises[killReq->requestid()] = promise;
        ASSERT_IF_NULL(member_->globalScheduler);
        member_->globalScheduler->GetLocalAddress(target->functionproxyid())
            .Then(litebus::Defer(actor->GetAID(), &GroupManagerActor::InnerKillInstance, std::placeholders::_1,
                                 target, killReq))
            .OnComplete([this, instInfo(target), reqID(killReq->requestid())](const litebus::Future<Status> &s) {
                const bool sendFailed = !s.IsOK();
                if (sendFailed) {
                    YRLOG_ERROR("failed to send INSTANCE_RESUME_SIGNAL to instance {}, on proxy {}, in group {}",
                                instInfo->instanceid(), instInfo->functionproxyid(), instInfo->groupid());
                } else if (!s.Get().IsError()) {
                    return;  // delivered; the response resolves the promise
                }
                auto iter = member_->killRspPromises.find(reqID);
                if (iter == member_->killRspPromises.end()) {
                    return;
                }
                iter->second->SetValue(sendFailed ? Status(StatusCode::ERR_INNER_COMMUNICATION,
                                                           "failed to send resume signal")
                                                  : s.Get());
                member_->killRspPromises.erase(iter);
            });
    }
    std::string errDescription = fmt::format("directed resume group({}) instances", groupID);
    return CollectStatus(futures, errDescription);
}
/**
 * Finish a directed (fake) resume.
 * On failure, every cached instance of the group is sent back to suspend and the request is finished with
 * the error. On success, the RUNNING state is persisted and the request is finished with that result.
 * @param future result of DirectedResumeGroup
 * @param from requester
 * @param groupID resumed group
 * @param requestID group request ID
 */
void GroupManagerActor::MasterBusiness::OnFakeResumeComplete(const litebus::Future<Status> &future,
                                                             const litebus::AID &from, const std::string &groupID,
                                                             const std::string &requestID)
{
    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    if (!future.IsOK()) {
        YRLOG_ERROR("directed resume future failed for group({})", groupID);
        return actor->InnerKillInstanceOnComplete(
            from, groupID, requestID,
            Status(StatusCode::ERR_INNER_SYSTEM_ERROR,
                   fmt::format("failed to resume group({}), reason: future failed", groupID)));
    }
    auto status = future.Get();
    if (!status.IsError()) {
        PersistentGroupInfo(groupID, GroupState::RUNNING, "group resumed via directed resume")
            .OnComplete(litebus::Defer(actor->GetAID(), &GroupManagerActor::InnerKillInstanceOnComplete, from,
                                       groupID, requestID, std::placeholders::_1));
        return;
    }

    YRLOG_ERROR("directed resume failed for group({}), reason: {}", groupID, status.RawMessage());
    // Which instances actually resumed is not tracked, so every cached instance is rolled back.
    std::list<std::shared_ptr<resource_view::InstanceInfo>> toRollback;
    for (const auto &entry : member_->groupCaches->GetGroupInstances(groupID)) {
        toRollback.push_back(entry.second);
    }
    RollbackResumedInstances(groupID, toRollback);
    actor->InnerKillInstanceOnComplete(
        from, groupID, requestID,
        Status(status.StatusCode(), fmt::format("failed to resume group({}), reason:{}", groupID, status.RawMessage())));
}
/**
 * Best-effort rollback of a failed directed resume: send INSTANCE_TRANS_SUSPEND_SIGNAL to each given instance.
 * Nothing waits for the results. A promise is registered per request, and a send failure resolves it immediately.
 * @param groupID group being rolled back (used in logs and payloads)
 * @param resumedInstances instances to send back to suspend
 */
void GroupManagerActor::MasterBusiness::RollbackResumedInstances(
    const std::string &groupID, const std::list<std::shared_ptr<resource_view::InstanceInfo>> &resumedInstances)
{
    auto actor = actor_.lock();
    ASSERT_IF_NULL(actor);
    if (resumedInstances.empty()) {
        YRLOG_INFO("no resumed instances to rollback for group({})", groupID);
        return;
    }
    YRLOG_INFO("rolling back {} resumed instances for group({})", resumedInstances.size(), groupID);
    for (const auto &target : resumedInstances) {
        auto killReq = MakeKillReq(target, GROUP_MANAGER_OWNER, INSTANCE_TRANS_SUSPEND_SIGNAL,
                                   fmt::format("rollback resume for group({})", groupID));
        member_->killRspPromises[killReq->requestid()] = std::make_shared<litebus::Promise<Status>>();
        ASSERT_IF_NULL(member_->globalScheduler);
        member_->globalScheduler->GetLocalAddress(target->functionproxyid())
            .Then(litebus::Defer(actor->GetAID(), &GroupManagerActor::InnerKillInstance, std::placeholders::_1,
                                 target, killReq))
            .OnComplete([this, instInfo(target), reqID(killReq->requestid())](const litebus::Future<Status> &s) {
                const bool sendFailed = !s.IsOK();
                if (sendFailed) {
                    YRLOG_ERROR("failed to rollback instance {} on proxy {} in group {}",
                                instInfo->instanceid(), instInfo->functionproxyid(), instInfo->groupid());
                } else if (!s.Get().IsError()) {
                    return;
                }
                auto iter = member_->killRspPromises.find(reqID);
                if (iter == member_->killRspPromises.end()) {
                    return;
                }
                iter->second->SetValue(sendFailed ? Status(StatusCode::ERR_INNER_COMMUNICATION,
                                                           "failed to send rollback suspend signal")
                                                  : s.Get());
                member_->killRspPromises.erase(iter);
            });
    }
}
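/**
 * @brief Handle an abnormal local scheduler by delegating to the active role. The master hands over the
 *        groups owned by that local and kills the instances of its groups that were still scheduling.
 *
 * @param abnormalLocal ID of the abnormal local scheduler
 * @return litebus::Future<Status>
 */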
litebus::Future<Status> GroupManagerActor::OnLocalAbnormal(const std::string &abnormalLocal)
{
    ASSERT_IF_NULL(business_);
    const auto &active = business_;
    return active->OnLocalAbnormal(abnormalLocal);
}
/**
 * Master handling of an abnormal local scheduler. Each group owned by that local is handed to
 * GROUP_MANAGER_OWNER and persisted. Groups that were still SCHEDULING are additionally marked FAILED,
 * and their cached instances are sent GROUP_EXIT_SIGNAL.
 * A group that cannot be serialized is skipped, so the remaining groups are still processed.
 * @param abnormalLocal ID of the abnormal local scheduler
 * @return OK, or JSON_PARSE_ERROR if at least one group could not be serialized
 */
litebus::Future<Status> GroupManagerActor::MasterBusiness::OnLocalAbnormal(const std::string &abnormalLocal)
{
    YRLOG_INFO("master business get on local({}) abnormal", abnormalLocal);
    auto ownedGroups = member_->groupCaches->GetNodeGroups(abnormalLocal);
    YRLOG_INFO("abnormal local owns {} groups", ownedGroups.size());
    Status result = Status::OK();
    for (const auto &group : ownedGroups) {
        const auto &groupKey = group.first;
        const auto &groupInfo = group.second;
        YRLOG_INFO("abnormal local owns group {}({})", groupKey, groupInfo->status());
        const bool wasScheduling = groupInfo->status() == static_cast<int32_t>(GroupState::SCHEDULING);
        groupInfo->set_ownerproxy(GROUP_MANAGER_OWNER);
        if (wasScheduling) {
            groupInfo->set_status(static_cast<int32_t>(GroupState::FAILED));
        }
        std::string groupValue;
        if (!GenGroupValueJson(groupInfo, groupValue)) {
            YRLOG_ERROR("failed to gen group value json str for group({}), skip it", groupKey);
            result = Status(StatusCode::JSON_PARSE_ERROR, "failed to gen group value json str");
            continue;
        }
        ASSERT_IF_NULL(member_->metaClient);
        (void)member_->metaClient->Put(groupKey, groupValue, {}).Then([](const std::shared_ptr<PutResponse> &rsp) {
            if (rsp->status.IsError()) {
                // fmt-style placeholder: the logger does not understand printf's "%s".
                YRLOG_ERROR("failed to modify group owner in etcd, err({})", rsp->status.GetMessage());
            }
            return rsp->status;
        });
        if (!wasScheduling) {
            continue;
        }
        auto actor = actor_.lock();
        ASSERT_IF_NULL(actor);
        auto instances = member_->groupCaches->GetGroupInstances(groupInfo->groupid());
        YRLOG_INFO("send GROUP_EXIT_SIGNAL to {} instances", instances.size());
        for (const auto &inst : instances) {
            auto killReq =
                MakeKillReq(inst.second, GROUP_MANAGER_OWNER, GROUP_EXIT_SIGNAL,
                            "instance exit with group together, reason: local scheduler(" + abnormalLocal + ") failed");
            member_->killRspPromises[killReq->requestid()] = std::make_shared<litebus::Promise<Status>>();
            ASSERT_IF_NULL(member_->globalScheduler);
            member_->globalScheduler->GetLocalAddress(inst.second->functionproxyid())
                .Then(litebus::Defer(actor->GetAID(), &GroupManagerActor::InnerKillInstance, std::placeholders::_1,
                                     inst.second, killReq))
                .OnComplete([this, instInfo(inst.second),
                             reqID(killReq->requestid())](const litebus::Future<Status> &s) {
                    Status failure = Status::OK();
                    if (!s.IsOK()) {
                        YRLOG_ERROR("failed to get kill instance {}, on proxy {}, in group {}, err is {}",
                                    instInfo->instanceid(), instInfo->functionproxyid(), instInfo->groupid(),
                                    s.GetErrorCode());
                        failure = Status(StatusCode::ERR_INNER_COMMUNICATION, "failed to send group exit signal");
                    } else if (s.Get().IsError()) {
                        failure = s.Get();
                    }
                    if (failure.IsOk()) {
                        return;
                    }
                    // Drop the pending entry for an undelivered kill so killRspPromises does not grow unbounded.
                    if (auto iter = member_->killRspPromises.find(reqID); iter != member_->killRspPromises.end()) {
                        auto pending = iter->second;
                        member_->killRspPromises.erase(iter);
                        pending->SetValue(failure);
                    }
                });
        }
    }
    return result;
}
/**
 * Instance delete event; the active role decides how the affected groups react.
 */
litebus::Future<Status> GroupManagerActor::OnInstanceDelete(
    const std::string &instanceKey, const std::shared_ptr<resource_view::InstanceInfo> &instanceInfo)
{
    ASSERT_IF_NULL(business_);
    const auto &active = business_;
    return active->OnInstanceDelete(instanceKey, instanceInfo);
}
/**
 * Ask the owner local of a group to clear it (via SendClearGroupToLocal), optionally also deleting the
 * group from the meta store.
 * @param groupID group to clear
 * @param status upstream status; a non-OK status is returned as-is and nothing is cleared
 * @param isClearMetastore whether the meta store copy is deleted as well
 * @return future resolved when the clear finishes; ERR_GROUP_SCHEDULE_FAILED if the group is unknown
 */
litebus::Future<Status> GroupManagerActor::ClearGroupInfo(const std::string &groupID, const Status &status,
                                                          bool isClearMetastore)
{
    if (!status.IsOk()) {
        YRLOG_WARN("status is not ok when clear group info, {}", status.GetMessage());
        return status;
    }
    auto [groupKeyInfo, exists] = member_->groupCaches->GetGroupInfo(groupID);
    if (!exists) {
        return Status(StatusCode::ERR_GROUP_SCHEDULE_FAILED, "group not found in group manager");
    }
    auto groupKey = groupKeyInfo.first;
    auto ownerProxy = groupKeyInfo.second->ownerproxy();
    auto clearGroupReq = std::make_shared<messages::KillGroup>();
    clearGroupReq->set_groupid(groupID);
    clearGroupReq->set_grouprequestid(groupKeyInfo.second->requestid());
    auto promise = std::make_shared<litebus::Promise<Status>>();
    ASSERT_IF_NULL(member_->globalScheduler);
    member_->globalScheduler->GetLocalAddress(ownerProxy)
        .Then(litebus::Defer(GetAID(), &GroupManagerActor::SendClearGroupToLocal, std::placeholders::_1, groupKey,
                             clearGroupReq, promise, isClearMetastore))
        .OnComplete([promise, groupID](const litebus::Future<Status> &future) {
            if (future.IsOK()) {
                return;  // SendClearGroupToLocal owns resolving the promise
            }
            // The chain failed before SendClearGroupToLocal could resolve the promise. Resolve it here so
            // callers waiting on the returned future do not hang.
            YRLOG_ERROR("failed to clear group({}), owner address lookup chain failed", groupID);
            promise->SetValue(Status(StatusCode::ERR_INNER_COMMUNICATION,
                                     fmt::format("failed to clear group({}), reason: future failed", groupID)));
        });
    return promise->GetFuture();
}
/**
 * Sends a ClearGroup request to the owner proxy's group-control actor and completes `promise` once the reply
 * (matched by group ID through requestGroupClearMatch_) arrives or fails.
 * @param proxyAddress      resolved address of the owner proxy; None when it could not be found
 * @param groupKey          meta-store key of the group
 * @param clearReq          the KillGroup request to forward
 * @param promise           completed with the final outcome
 * @param isClearMetastore  when true the group key is deleted from the meta store after the reply
 * @return always OK; the real outcome is delivered through `promise`
 */
litebus::Future<Status> GroupManagerActor::SendClearGroupToLocal(
    const litebus::Option<std::string> &proxyAddress, const std::string &groupKey,
    const std::shared_ptr<messages::KillGroup> clearReq, const std::shared_ptr<litebus::Promise<Status>> &promise,
    bool isClearMetastore)
{
    const auto &groupID = clearReq->groupid();
    if (proxyAddress.IsNone()) {
        // NOTE(review): this path deletes the meta-store key even when isClearMetastore is false — confirm intended.
        YRLOG_WARN("{}|failed to clear group, local address not found", groupID);
        DeleteGroupInfoFromMetaStore(groupKey, promise);
        return Status::OK();
    }
    auto groupCtrlAID = litebus::AID(LOCAL_GROUP_CTRL_ACTOR_NAME, proxyAddress.Get());
    auto clearResponse = requestGroupClearMatch_.AddSynchronizer(groupID);
    (void)Send(groupCtrlAID, "ClearGroup", clearReq->SerializeAsString());
    auto onCleared = [promise, groupKey, isClearMetastore, self(GetAID())](const litebus::Future<Status> &reply) {
        if (reply.IsError()) {
            YRLOG_WARN("failed get clear group response, group:{}", groupKey);
        }
        if (!isClearMetastore) {
            promise->SetValue(Status::OK());
            return;
        }
        litebus::Async(self, &GroupManagerActor::DeleteGroupInfoFromMetaStore, groupKey, promise);
    };
    clearResponse.OnComplete(onCleared);
    return Status::OK();
}
/**
 * Deletes a group key from the meta store and completes `promise` with the outcome.
 * @param groupKey  meta-store key of the group
 * @param promise   set to OK on success, or BP_META_STORAGE_DELETE_ERROR when the delete did not succeed
 */
void GroupManagerActor::DeleteGroupInfoFromMetaStore(const std::string &groupKey,
                                                     const std::shared_ptr<litebus::Promise<Status>> promise)
{
    ASSERT_IF_NULL(member_->metaClient);
    member_->metaClient->Delete(groupKey, {})
        .OnComplete([promise, groupKey](const litebus::Future<std::shared_ptr<DeleteResponse>> &delRsp) {
            // A delete can fail either at transport level (errored future) or be answered with an error status;
            // both mean the key may still exist, so neither may be reported as success.
            if (delRsp.IsError() || delRsp.Get() == nullptr || delRsp.Get()->status.IsError()) {
                YRLOG_WARN("failed to delete group info from metastore, key {}", groupKey);
                promise->SetValue(Status(StatusCode::BP_META_STORAGE_DELETE_ERROR,
                                         "failed to delete group info to metastore, key " + groupKey));
                return;
            }
            promise->SetValue(Status::OK());
        });
}
/**
 * Replies to a kill-group requester with the kill outcome and forgets the group in killingGroups.
 * @param from       requester to send "OnKillGroup" to
 * @param groupID    the group that was killed
 * @param requestID  group request ID echoed back in the response
 * @param future     result of the kill
 * NOTE(review): when `future` is an error this returns early, sending no reply and leaving the group in
 * killingGroups — confirm that is intended.
 */
void GroupManagerActor::InnerKillInstanceOnComplete(const litebus::AID &from, const std::string &groupID,
                                                    const std::string &requestID,
                                                    const litebus::Future<Status> &future)
{
    RETURN_IF_TRUE(future.IsError(), "Invalid future");
    const auto result = future.Get();
    ::messages::KillGroupResponse response;
    response.set_groupid(groupID);
    response.set_grouprequestid(requestID);
    response.set_code(static_cast<int32_t>(result.StatusCode()));
    response.set_message(result.RawMessage());
    YRLOG_INFO("send OnKillGroup of ({}) to {}, msg {}", groupID, from.HashString(), response.message());
    Send(from, "OnKillGroup", response.SerializeAsString());
    member_->killingGroups.erase(groupID);
}
/**
 * Forwards a kill request to the instance-control actor on the instance's proxy.
 * @param proxyAddress  resolved proxy address; None when it could not be found
 * @param instance      instance to kill
 * @param killReq       request to forward
 * @return ERR_INNER_COMMUNICATION when the address is missing (any pending promise registered for this request
 *         in killRspPromises is completed with the same status and removed); otherwise OK once the request is sent
 */
litebus::Future<Status> GroupManagerActor::InnerKillInstance(
    const litebus::Option<std::string> &proxyAddress, const std::shared_ptr<resource_view::InstanceInfo> &instance,
    const std::shared_ptr<internal::ForwardKillRequest> killReq)
{
    if (proxyAddress.IsNone()) {
        const auto notFound = Status(StatusCode::ERR_INNER_COMMUNICATION, "local address not found");
        auto pending = member_->killRspPromises.find(killReq->requestid());
        if (pending != member_->killRspPromises.end()) {
            pending->second->SetValue(notFound);
            member_->killRspPromises.erase(pending);
        }
        return notFound;
    }
    const auto actorName = instance->functionproxyid() + LOCAL_SCHED_INSTANCE_CTRL_ACTOR_NAME_POSTFIX;
    auto instanceCtrlAID = litebus::AID(actorName, proxyAddress.Get());
    YRLOG_INFO("{}|send instance({}) kill request to local({})", killReq->requestid(), instance->instanceid(),
               std::string(instanceCtrlAID));
    (void)Send(instanceCtrlAID, "ForwardCustomSignalRequest", killReq->SerializeAsString());
    return Status::OK();
}
void GroupManagerActor::WatchGroups()
{
YRLOG_INFO("start watch groups info");
ASSERT_IF_NULL(member_->metaClient);
auto observer = [aid(GetAID())](const std::vector<WatchEvent> &events, bool) -> bool {
litebus::Async(aid, &GroupManagerActor::OnGroupWatchEvent, events);
return true;
};
auto syncer = [aid(GetAID())](const std::shared_ptr<GetResponse> &getResponse) -> litebus::Future<SyncResult> {
return litebus::Async(aid, &GroupManagerActor::GroupInfoSyncer, getResponse);
};
auto handler = [aid(GetAID())](const std::shared_ptr<GetResponse> &getResponse) -> litebus::Future<Status> {
return litebus::Async(aid, &GroupManagerActor::CheckSyncResponse, getResponse);
};
(void)member_->metaClient
->GetAndWatchWithHandler(GROUP_PATH_PREFIX, { .prefix = true, .prevKv = true }, observer, syncer, handler)
.Then([aid(GetAID())](const std::shared_ptr<Watcher> &watcher) -> litebus::Future<Status> {
litebus::Async(aid, &GroupManagerActor::OnGroupWatch, watcher);
return Status::OK();
});
(void)explorer::Explorer::GetInstance().AddLeaderChangedCallback(
"GroupManager", [aid(GetAID())](const explorer::LeaderInfo &leaderInfo) {
litebus::Async(aid, &GroupManagerActor::UpdateLeaderInfo, leaderInfo);
});
}
/**
 * Stores the watcher handle returned by GetAndWatchWithHandler.
 * @param watcher  the group-prefix watcher
 */
void GroupManagerActor::OnGroupWatch(const std::shared_ptr<Watcher> &watcher)
{
    YRLOG_INFO("start watch groups info");
    auto &stored = member_->watcher;
    stored = watcher;
}
/**
 * Applies a batch of group watch events to the cache.
 * PUT events parse the new value and call OnGroupPut; DELETE events parse the previous value and call
 * OnGroupDelete. Values that fail to parse, and any other event type, are logged and skipped.
 * @param events  watch events for keys under the group prefix
 */
void GroupManagerActor::OnGroupWatchEvent(const std::vector<WatchEvent> &events)
{
    YRLOG_INFO("get group watch events");
    for (const auto &watchEvent : events) {
        if (watchEvent.eventType == EVENT_TYPE_PUT) {
            auto key = TrimKeyPrefix(watchEvent.kv.key(), member_->metaClient->GetTablePrefix());
            auto latest = std::make_shared<messages::GroupInfo>();
            if (!TransToGroupInfoFromJson(*latest, watchEvent.kv.value())) {
                YRLOG_ERROR("failed to transform group({}) info from String.", key);
                continue;
            }
            OnGroupPut(key, latest);
        } else if (watchEvent.eventType == EVENT_TYPE_DELETE) {
            auto previous = std::make_shared<messages::GroupInfo>();
            auto key = TrimKeyPrefix(watchEvent.prevKv.key(), member_->metaClient->GetTablePrefix());
            if (!TransToGroupInfoFromJson(*previous, watchEvent.prevKv.value())) {
                YRLOG_ERROR("failed to transform group({}) info from String.", key);
                continue;
            }
            OnGroupDelete(key, previous);
        } else {
            YRLOG_ERROR("not supported");
        }
    }
}
/**
 * Hands a created/updated group to the current business_ implementation.
 * @param groupKey   meta-store key of the group (table prefix already trimmed)
 * @param groupInfo  parsed group info
 */
void GroupManagerActor::OnGroupPut(const std::string &groupKey, std::shared_ptr<messages::GroupInfo> groupInfo)
{
    ASSERT_IF_NULL(business_);
    auto &handler = business_;
    handler->OnGroupPut(groupKey, groupInfo);
}
void GroupManagerActor::MasterBusiness::OnGroupPut(const std::string &groupKey,
std::shared_ptr<messages::GroupInfo> groupInfo)
{
member_->groupCaches->AddGroup(groupKey, groupInfo);
auto actor = actor_.lock();
ASSERT_IF_NULL(actor);
ASSERT_IF_NULL(member_->instanceManager);
member_->instanceManager->GetInstanceInfoByInstanceID(groupInfo->parentid())
.Then(litebus::Defer(actor->GetAID(), &GroupManagerActor::OnGroupPutCheckParentStatus, groupKey, groupInfo,
std::placeholders::_1));
}
/**
 * Reacts to the parent instance of a freshly put group.
 * @param groupKey    meta-store key of the group
 * @param groupInfo   the group
 * @param parentInfo  (key, info) of the parent; info is null when the parent was not found
 * @return the missing-parent or fatal-parent handling result, or OK when the parent is neither missing nor FATAL
 */
litebus::Future<Status> GroupManagerActor::OnGroupPutCheckParentStatus(
    const std::string &groupKey, const std::shared_ptr<messages::GroupInfo> &groupInfo,
    const std::pair<std::string, std::shared_ptr<resource_view::InstanceInfo>> &parentInfo)
{
    const auto &parent = parentInfo.second;
    if (parent == nullptr) {
        return OnGroupPutParentMissing(groupKey, groupInfo);
    }
    const bool parentIsFatal = parent->instancestatus().code() == static_cast<int32_t>(InstanceState::FATAL);
    if (parentIsFatal) {
        return OnGroupPutParentFatal(groupKey, groupInfo);
    }
    return Status::OK();
}
/**
 * Handles a group whose parent instance could not be found by requesting business_->InnerKillGroup for it.
 * @param groupKey   meta-store key of the group (unused)
 * @param groupInfo  the group
 * @return result of InnerKillGroup
 */
litebus::Future<Status> GroupManagerActor::OnGroupPutParentMissing(
    const std::string &groupKey, const std::shared_ptr<messages::GroupInfo> &groupInfo)
{
    (void)groupKey;
    ASSERT_IF_NULL(business_);
    const auto &groupID = groupInfo->groupid();
    const auto &parentID = groupInfo->parentid();
    return business_->InnerKillGroup(groupID, parentID);
}
/**
 * Handles a group whose parent instance is in FATAL state by passing it to business_->FatalGroup with a
 * message naming the group and its parent.
 * @param groupKey   meta-store key of the group (unused)
 * @param groupInfo  the group
 * @return result of FatalGroup
 */
litebus::Future<Status> GroupManagerActor::OnGroupPutParentFatal(const std::string &groupKey,
                                                                 const std::shared_ptr<messages::GroupInfo> &groupInfo)
{
    (void)groupKey;
    const auto &groupID = groupInfo->groupid();
    const auto &parentID = groupInfo->parentid();
    auto reason = fmt::format("group({}) parent({}) is abnormal", groupID, parentID);
    ASSERT_IF_NULL(business_);
    return business_->FatalGroup(groupID, parentID, reason);
}
void GroupManagerActor::OnGroupDelete(const std::string &groupKey,
const std::shared_ptr<messages::GroupInfo> &groupInfo)
{
member_->groupCaches->RemoveGroup(groupInfo->groupid());
}
/**
 * Message handler for QueryGroupStatus. Not implemented: the request is ignored and an error is logged.
 */
void GroupManagerActor::QueryGroupStatus(const litebus::AID &from, std::string &&name, std::string &&msg)
{
    (void)from;
    (void)name;
    (void)msg;
    YRLOG_ERROR("calling not implemented method QueryGroupStatus");
}
/**
 * Consistency-check callback: fatals the group when one of its instances no longer exists in the meta store.
 * @param getResponse  result of reading the instance key from the meta store
 * @param instanceID   instance being checked
 * @param groupID      group the instance belongs to
 * @return OK when the read succeeded (whether or not the group was fataled); FAILED when the read itself failed,
 *         in which case the group is left untouched
 */
litebus::Future<Status> GroupManagerActor::OnGetInstanceFromMetaStore(
    const litebus::Future<std::shared_ptr<GetResponse>> &getResponse,
    const std::string &instanceID, const std::string &groupID)
{
    // A failed read tells nothing about whether the instance exists. Fataling the whole group on a transient
    // meta store error would tear down healthy instances, so only report it.
    if (getResponse.IsError() || getResponse.Get() == nullptr || getResponse.Get()->status.IsError()) {
        YRLOG_WARN("failed to get instance({}) of group({}) from meta store", instanceID, groupID);
        return Status(StatusCode::FAILED, "failed to get instance(" + instanceID + ") from meta store");
    }
    // The read succeeded but returned no key: the instance was removed on its own, outside the group.
    if (getResponse.Get()->kvs.empty()) {
        YRLOG_WARN("failed to get instance({}) from meta store", instanceID);
        auto errMsg = fmt::format("group({}) instance({}) is killed separately", groupID, instanceID);
        ASSERT_IF_NULL(business_);
        business_->FatalGroup(groupID, instanceID, errMsg);
    }
    return Status::OK();
}
/**
 * For every scheduled instance of a group that is absent from the local cache, reads its key from the meta store
 * and lets OnGetInstanceFromMetaStore decide (on the actor) whether the group must be fataled.
 * @param groupID            group being checked
 * @param cachedInstanceIDs  instance IDs of the group currently in the cache
 * @param requests           the group's schedule requests (either requests or rangeRequests)
 */
void GroupManagerActor::MasterBusiness::CheckAndFetchMissingInstances(const std::string &groupID,
    const std::set<std::string> &cachedInstanceIDs,
    const google::protobuf::RepeatedPtrField<messages::ScheduleRequest> &requests)
{
    for (const auto &req : requests) {
        const auto &instance = req.instance();
        const auto &instanceID = instance.instanceid();
        if (cachedInstanceIDs.count(instanceID) != 0) {
            continue;
        }
        YRLOG_DEBUG("Instance({}) not in local cache, checking existence.", instanceID);
        // The owning actor is dereferenced below; check it like MasterBusiness::OnGroupPut does.
        auto actor = actor_.lock();
        ASSERT_IF_NULL(actor);
        ASSERT_IF_NULL(member_->metaClient);
        auto instanceKey = GenInstanceKey(instance.function(), instanceID, instance.requestid());
        if (instanceKey.IsNone()) {
            YRLOG_ERROR("{}|{} failed to generate key", instance.requestid(), instanceID);
            continue;
        }
        member_->metaClient->Get(instanceKey.Get(), {})
            .Then(litebus::Defer(actor->GetAID(), &GroupManagerActor::OnGetInstanceFromMetaStore,
                                 std::placeholders::_1, instanceID, groupID));
    }
}
/**
 * Compares a group's scheduled instances with the instances cached for it and checks every missing one
 * against the meta store (see CheckAndFetchMissingInstances).
 * @param groupInfo  the group to check
 */
void GroupManagerActor::MasterBusiness::CheckGroupInstanceConsistency(std::shared_ptr<messages::GroupInfo> &groupInfo)
{
    std::set<std::string> cachedInstanceIDs;
    const auto &cachedGroupInstanceInfoMap = member_->groupCaches->GetGroupInstances(groupInfo->groupid());
    for (const auto &pair : cachedGroupInstanceInfoMap) {
        cachedInstanceIDs.insert(pair.second->instanceid());
    }
    /*
     * The group's instances are listed either in `requests` or, for range scheduling, in `rangeRequests`
     * (range instance IDs carry an "-r-<n>" suffix). Example GroupInfo JSON as stored in ETCD:
     *   "requests":      [{"instance": {"instanceID": "676f43f5-18fe-432f-9300-000000000080", ...}}, ...]
     *   "rangeRequests": [{"instance": {"instanceID": "676f43f5-18fe-432f-9300-000000000080-r-0", ...}}, ...]
     * When insRangeScheduler is true, rangeRequests is used instead of requests.
     */
    if (groupInfo->insrangescheduler()) {
        CheckAndFetchMissingInstances(groupInfo->groupid(), cachedInstanceIDs, groupInfo->rangerequests());
    } else {
        CheckAndFetchMissingInstances(groupInfo->groupid(), cachedInstanceIDs, groupInfo->requests());
    }
}
void GroupManagerActor::MasterBusiness::OnChange()
{
YRLOG_INFO("GroupManagerActor become master");
for (auto group : member_->groupCaches->GetGroups()) {
if (group.second.second->status() == static_cast<int32_t>(GroupState::RUNNING) &&
group.second.second->groupopts().samerunninglifecycle()) {
CheckGroupInstanceConsistency(group.second.second);
continue;
}
if (group.second.second->status() == static_cast<int32_t>(GroupState::FAILED)) {
YRLOG_INFO("find group({}) is failed", group.second.second->groupid());
for (auto instance : member_->groupCaches->GetGroupInstances(group.second.second->groupid())) {
if (instance.second->instancestatus().code() == static_cast<int32_t>(InstanceState::RUNNING) ||
instance.second->instancestatus().code() == static_cast<int32_t>(InstanceState::CREATING)) {
YRLOG_INFO("find instance({}) with status({}) in group({}), will set it to fatal",
instance.second->instanceid(), instance.second->instancestatus().code(),
group.second.second->groupid());
auto actor = actor_.lock();
auto killReq = MakeKillReq(instance.second, GROUP_MANAGER_OWNER, GROUP_EXIT_SIGNAL,
"instance exit with group together, reason: group(" +
group.second.second->groupid() + ") failed due to " +
group.second.second->message());
ASSERT_IF_NULL(member_->globalScheduler);
member_->globalScheduler->GetLocalAddress(instance.second->functionproxyid())
.Then(litebus::Defer(actor->GetAID(), &GroupManagerActor::InnerKillInstance,
std::placeholders::_1, instance.second, killReq))
.OnComplete([instInfo(instance.second)](litebus::Future<Status> s) {
if (!s.IsOK()) {
YRLOG_ERROR("failed to get kill instance {}, on proxy {}, in group {}",
instInfo->instanceid(), instInfo->functionproxyid(), instInfo->groupid());
}
});
}
}
}
}
}
/**
 * Inserts or replaces a group in the cache and in the owner-proxy and parent indexes.
 * @param groupKey  meta-store key of the group
 * @param group     group info; indexed by groupid(), ownerproxy() and parentid()
 */
void GroupManagerActor::GroupCaches::AddGroup(const std::string groupKey,
                                              const std::shared_ptr<messages::GroupInfo> &group)
{
    YRLOG_DEBUG("adding group(id={}, parent={}, node={}, status={}, message: {})", group->groupid(), group->parentid(),
                group->ownerproxy(), group->status(), group->message());
    // Removes `key` from index[indexKey] and drops the bucket once it becomes empty.
    auto eraseFromIndex = [](auto &index, const std::string &indexKey, const std::string &key) {
        if (auto it = index.find(indexKey); it != index.end()) {
            it->second.erase(key);
            if (it->second.empty()) {
                index.erase(it);
            }
        }
    };
    // A re-added group may now have another owner proxy or parent (or key). Unlink the previously cached entry from
    // the secondary indexes first; otherwise GetNodeGroups()/GetChildGroups() keep returning it for the old ones.
    if (auto cached = groups_.find(group->groupid()); cached != groups_.end()) {
        const auto &[oldKey, oldGroup] = cached->second;
        eraseFromIndex(nodeName2Groups_, oldGroup->ownerproxy(), oldKey);
        eraseFromIndex(parent2Groups_, oldGroup->parentid(), oldKey);
    }
    groups_.insert_or_assign(group->groupid(), std::make_pair(groupKey, group));
    nodeName2Groups_[group->ownerproxy()].insert_or_assign(groupKey, group);
    parent2Groups_[group->parentid()].insert_or_assign(groupKey, group);
}
/**
 * Removes a group from the cache, from the owner-proxy and parent indexes, and drops its cached instances.
 * Instances are dropped even when the group itself is not cached.
 * @param groupID  ID of the group to remove
 */
void GroupManagerActor::GroupCaches::RemoveGroup(const std::string &groupID)
{
    YRLOG_DEBUG("remove group({})", groupID);
    // Index keys of the cached entry; they stay empty when the group is unknown.
    std::string key;
    std::string owner;
    std::string parent;
    const auto cached = groups_.find(groupID);
    if (cached != groups_.end()) {
        owner = cached->second.second->ownerproxy();
        parent = cached->second.second->parentid();
        key = cached->second.first;
        groups_.erase(cached);
    }
    const auto dropFromIndex = [&key](auto &index, const std::string &indexKey) {
        auto bucket = index.find(indexKey);
        if (bucket == index.end()) {
            return;
        }
        bucket->second.erase(key);
        if (bucket->second.empty()) {
            index.erase(bucket);
        }
    };
    dropFromIndex(nodeName2Groups_, owner);
    dropFromIndex(parent2Groups_, parent);
    groupID2Instances_.erase(groupID);
}
/**
 * Looks up a cached group.
 * @param groupID  group ID
 * @return ((key, info), true) when cached, otherwise (default, false)
 */
std::pair<GroupKeyInfoPair, bool> GroupManagerActor::GroupCaches::GetGroupInfo(const std::string &groupID)
{
    const auto found = groups_.find(groupID);
    if (found == groups_.end()) {
        return { {}, false };
    }
    return { found->second, true };
}
/**
 * Returns a copy of the groups owned by a proxy node, keyed by group key; empty when there are none.
 * @param nodeName  owner proxy ID
 */
GroupKeyInfoMap GroupManagerActor::GroupCaches::GetNodeGroups(const std::string &nodeName)
{
    const auto found = nodeName2Groups_.find(nodeName);
    return found == nodeName2Groups_.end() ? GroupKeyInfoMap{} : found->second;
}
/**
 * Returns a copy of the groups created by a parent instance, keyed by group key; empty when there are none.
 * @param parentID  parent instance ID
 */
GroupKeyInfoMap GroupManagerActor::GroupCaches::GetChildGroups(const std::string &parentID)
{
    const auto found = parent2Groups_.find(parentID);
    return found == parent2Groups_.end() ? GroupKeyInfoMap{} : found->second;
}
/**
 * Records (or replaces) an instance under its group, creating the group's instance map on first use.
 * @param groupID       group the instance belongs to
 * @param instanceKey   meta-store key of the instance
 * @param instanceInfo  instance info
 */
void GroupManagerActor::GroupCaches::AddGroupInstance(const std::string &groupID, const std::string &instanceKey,
                                                      const std::shared_ptr<resource_view::InstanceInfo> &instanceInfo)
{
    auto &instancesOfGroup = groupID2Instances_[groupID];
    instancesOfGroup.insert_or_assign(instanceKey, instanceInfo);
}
/**
 * Returns a copy of the instances cached for a group, keyed by instance key; empty when there are none.
 * @param groupID  group ID
 */
InstanceKeyInfoMap GroupManagerActor::GroupCaches::GetGroupInstances(const std::string &groupID)
{
    const auto found = groupID2Instances_.find(groupID);
    return found == groupID2Instances_.end() ? InstanceKeyInfoMap{} : found->second;
}
/**
 * Returns a snapshot (copy) of all cached groups keyed by group ID. Because it is a copy, callers may modify
 * the cache while iterating the result.
 */
std::unordered_map<std::string, GroupKeyInfoPair> GroupManagerActor::GroupCaches::GetGroupInfos()
{
    std::unordered_map<std::string, GroupKeyInfoPair> snapshot(groups_);
    return snapshot;
}
/**
 * Re-synchronizes the group cache with a full read of the group prefix: every parsable group is passed to
 * OnGroupPut, and every cached group absent from the read is removed through OnGroupDelete.
 * @param getResponse  result of reading GROUP_PATH_PREFIX
 * @return the read's error status when it failed, otherwise OK
 */
litebus::Future<SyncResult> GroupManagerActor::GroupInfoSyncer(const std::shared_ptr<GetResponse> &getResponse)
{
    if (getResponse->status.IsError()) {
        YRLOG_INFO("failed to get key({}) from meta storage", GROUP_PATH_PREFIX);
        return SyncResult{ getResponse->status };
    }
    if (getResponse->kvs.empty()) {
        // Do not return early: with no group left in the meta store, every cached group is stale and must still be
        // removed by the reconciliation below.
        YRLOG_INFO("get no result with key({}) from meta storage, revision is {}", GROUP_PATH_PREFIX,
                   getResponse->header.revision);
    }
    std::set<std::string> etcdGroupIDs;
    for (const auto &kv : getResponse->kvs) {
        auto group = std::make_shared<messages::GroupInfo>();
        auto eventKey = TrimKeyPrefix(kv.key(), member_->metaClient->GetTablePrefix());
        if (!TransToGroupInfoFromJson(*group, kv.value())) {
            YRLOG_ERROR("failed to transform group({}) info from String.", eventKey);
            continue;
        }
        OnGroupPut(eventKey, group);
        etcdGroupIDs.emplace(group->groupid());
    }
    // GetGroupInfos() returns a copy, so deleting from the cache while iterating it is safe.
    for (const auto &[groupID, keyAndInfo] : member_->groupCaches->GetGroupInfos()) {
        if (etcdGroupIDs.count(groupID) == 0) {
            YRLOG_DEBUG("delete ({}) from cache.", keyAndInfo.first);
            OnGroupDelete(keyAndInfo.first, keyAndInfo.second);
        }
    }
    return SyncResult{ Status::OK() };
}
/**
 * Validates a sync response. A failed sync, or a revision that cannot be incremented without exceeding
 * INT64_MAX, makes the process terminate itself via CommitSuicide().
 * @param response  the sync response
 * @return OK when the response is usable, otherwise FAILED (after CommitSuicide)
 */
litebus::Future<Status> GroupManagerActor::CheckSyncResponse(const std::shared_ptr<GetResponse> &response)
{
    if (response->status.IsOk()) {
        if (response->header.revision <= INT64_MAX - 1) {
            return Status::OK();
        }
        YRLOG_ERROR("revision({}) add operation will exceed the maximum value({}) of INT64, gonna to suicide",
                    response->header.revision, INT64_MAX);
    } else {
        YRLOG_ERROR("failed to sync data, err is {}, gonna to suicide", response->status.ToString());
    }
    CommitSuicide();
    return Status(StatusCode::FAILED);
}
/**
 * Raises SIGINT for this process. Later calls are no-ops once isSuicide_ is set.
 */
void GroupManagerActor::CommitSuicide()
{
    if (isSuicide_) {
        return;
    }
    isSuicide_ = true;
    (void)raise(SIGINT);
}
}