* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "instance_proxy.h"
#include "async/defer.hpp"
#include "common/metrics/metrics_adapter.h"
#include "common/status/status.h"
#include "busproxy/invocation_handler/invocation_handler.h"
namespace functionsystem::busproxy {
// Message passed to RequestDispatcher::Fatal (together with ERR_INSTANCE_EXITED) when a dispatcher is torn down.
const std::string INSTANCE_EXIT_MESSAGE = "instance has been killed or exited.";
// Key looked up in a call request's create options; a non-empty value is used as the URL of the destination AID.
const std::string YR_ROUTE_KEY = "YR_ROUTE";
// NOTE(review): not referenced in the visible part of this file; presumably a retry cap for call results — confirm.
const uint32_t MAX_CALL_RESULT_RETRY_TIMES = 3;
// Actor initialization: runs the base-class Init and registers the handlers for the four
// inter-proxy messages ("ForwardCall", "ResponseForwardCall", "ForwardCallResult",
// "ResponseForwardCallResult").
void InstanceProxy::Init()
{
ActorBase::Init();
Receive("ForwardCall", &InstanceProxy::ForwardCall);
Receive("ResponseForwardCall", &InstanceProxy::ResponseForwardCall);
Receive("ForwardCallResult", &InstanceProxy::ForwardCallResult);
Receive("ResponseForwardCallResult", &InstanceProxy::ResponseForwardCallResult);
}
// Returns the tenant ID reported by the self dispatcher.
// Asserts that the self dispatcher exists.
litebus::Future<std::string> InstanceProxy::GetTenantID()
{
ASSERT_FS(selfDispatcher_);
return selfDispatcher_->GetTenantID();
}
// Routes a call request issued by callerInfo.instanceID to dstInstanceID.
//
// Three paths are taken, in this order:
//   1. dstInstanceID is this proxy's own instance: the request goes to the self dispatcher and
//      OnLocalCall is scheduled on this actor once the response is available.
//   2. The request's create options carry a non-empty YR_ROUTE_KEY: the request is forwarded
//      directly to the AID (dstInstanceID, <route value>) through SendForwardCall. A failed
//      forward is converted into a call response with ERR_REQUEST_BETWEEN_RUNTIME_BUS.
//   3. Otherwise the request goes through the remote dispatcher for dstInstanceID, which is
//      created (and the instance event subscribed) when it is missing.
//
// @param callerInfo     identity of the caller (instance ID and tenant ID are read here).
// @param dstInstanceID  instance the call is addressed to.
// @param request        streaming message; must contain a call request (asserted).
// @param time           time point handed to the perf recorder.
// @return future of the call response message.
litebus::Future<SharedStreamMsg> InstanceProxy::Call(const CallerInfo &callerInfo,
                                                     const std::string &dstInstanceID, const SharedStreamMsg &request,
                                                     const std::shared_ptr<TimePoint> &time)
{
    ASSERT_FS(request->has_callreq());
    const auto &callReq = request->callreq();
    YRLOG_INFO("{}|{}|received call request from {} to {}", callReq.traceid(), callReq.requestid(),
               callerInfo.instanceID, dstInstanceID);
    perf_->Record(callReq, dstInstanceID, time);

    // Path 1: the destination is the instance owned by this proxy.
    if (dstInstanceID == instanceID_) {
        ASSERT_FS(selfDispatcher_);
        return selfDispatcher_->Call(request, callerInfo)
            .Then([aid(GetAID()), request, selfDispatcher(selfDispatcher_)](const SharedStreamMsg &callRsp) {
                litebus::Async(aid, &InstanceProxy::OnLocalCall, callRsp, request, selfDispatcher);
                return callRsp;
            });
    }

    // Path 2: an explicit route is supplied in the create options.
    if (const auto it = callReq.createoptions().find(YR_ROUTE_KEY);
        it != callReq.createoptions().end() && !it->second.empty()) {
        auto remoteAID = litebus::AID(dstInstanceID, it->second);
        auto promise = std::make_shared<litebus::Promise<SharedStreamMsg>>();
        // NOTE(review): under C++17 sequencing SendForwardCall runs before the lambda captures are
        // evaluated, so messageID is the id already rewritten by SendForwardCall — confirm intended.
        SendForwardCall(remoteAID, callerInfo.tenantID, request)
            .OnComplete([messageID(request->messageid()), promise](const litebus::Future<SharedStreamMsg> &future) {
                if (future.IsError()) {
                    auto response = std::make_shared<runtime_rpc::StreamingMessage>();
                    response->set_messageid(messageID);
                    auto callResponse = response->mutable_callrsp();
                    callResponse->set_code(
                        Status::GetPosixErrorCode(common::ErrorCode::ERR_REQUEST_BETWEEN_RUNTIME_BUS));
                    callResponse->set_message("connection with runtime may be interrupted, please retry.");
                    promise->SetValue(response);
                } else {
                    auto rsp = future.Get();
                    rsp->set_messageid(messageID);
                    promise->SetValue(rsp);
                }
            });
        return promise->GetFuture();
    }

    // Path 3: go through the remote dispatcher. A null entry is treated like a missing one so
    // that the dereference below can never hit a null dispatcher.
    if (const auto found = remoteDispatchers_.find(dstInstanceID);
        found == remoteDispatchers_.end() || found->second == nullptr) {
        auto dispatcher = std::make_shared<RequestDispatcher>(dstInstanceID, false, "", shared_from_this(), perf_);
        ASSERT_FS(observer_);
        (void)observer_->SubscribeInstanceEvent(instanceID_, dstInstanceID);
        remoteDispatchers_[dstInstanceID] = std::move(dispatcher);
    }
    const auto &dispatcher = remoteDispatchers_[dstInstanceID];
    auto func = [traceID(callReq.traceid()), requestID(callReq.requestid()),
                 dispatcher](const SharedStreamMsg &callRsp) {
        dispatcher->OnCall(callRsp, traceID, requestID);
        return callRsp;
    };
    return dispatcher->Call(request, callerInfo).Then(func);
}
// Completion hook for a call served by the local instance: records that the response was
// received and reports it to the given dispatcher. The response future must not be in error.
void InstanceProxy::OnLocalCall(const litebus::Future<SharedStreamMsg> &callRspFut, const SharedStreamMsg &callReq,
                                const std::shared_ptr<RequestDispatcher> &dispatcher)
{
    ASSERT_FS(!callRspFut.IsError());
    const auto &response = callRspFut.Get();
    const auto &invoke = callReq->callreq();
    perf_->RecordReceivedCallRsp(invoke.requestid());
    dispatcher->OnCall(response, invoke.traceid(), invoke.requestid());
}
// Handler for the "ForwardCall" message sent by another proxy.
// Deserializes the payload into a streaming message and hands it to DoForwardCall.
// A payload that cannot be parsed is logged and dropped instead of being processed.
//
// @param from  AID of the sending proxy.
// @param msg   serialized runtime_rpc::StreamingMessage.
void InstanceProxy::ForwardCall(const litebus::AID &from, std::string &&, std::string &&msg)
{
    auto request = std::make_shared<runtime_rpc::StreamingMessage>();
    if (!request->ParseFromString(msg)) {
        YRLOG_ERROR("failed to parse forward call request from {}, ignore it.", std::string(from));
        return;
    }
    (void)DoForwardCall(from, request);
}
// Serves a call request forwarded by the proxy identified by `from`.
//
// The message id may be longer than the request id; in that case the leading part is taken as
// the caller's tenant ID and the message id is reset to the bare request id. The request is then
// recorded, registered with the metrics context, and dispatched to the self dispatcher, with
// OnForwardCall deferred onto this actor for completion. When the sender is a different
// instance, a remote dispatcher for it is ensured and told the sender's AID.
//
// @return always Status::OK().
litebus::Future<Status> InstanceProxy::DoForwardCall(const litebus::AID &from,
                                                     const std::shared_ptr<runtime_rpc::StreamingMessage> &request)
{
    ASSERT_FS(request->has_callreq());
    const auto &callerID = from.Name();
    const auto &invoke = request->callreq();

    // Split an optional tenant prefix off the message id.
    std::string callerTenant;
    const auto wireIdLen = request->messageid().length();
    const auto requestIdLen = invoke.requestid().length();
    if (wireIdLen > requestIdLen) {
        callerTenant = request->messageid().substr(0, wireIdLen - requestIdLen);
        request->set_messageid(invoke.requestid());
    }

    YRLOG_INFO("{}|{}|received forward Call instance from {} to {}, function name is {}", invoke.traceid(),
               invoke.requestid(), callerID, instanceID_, invoke.function());
    perf_->Record(invoke, instanceID_, nullptr);

    // Copy the create options into a std::map for the billing metrics.
    std::map<std::string, std::string> invokeOptions;
    for (const auto &option : invoke.createoptions()) {
        invokeOptions[option.first] = option.second;
    }
    functionsystem::metrics::MetricsAdapter::GetInstance().GetMetricsContext().SetBillingInvokeOptions(
        invoke.requestid(), invokeOptions, invoke.function(), instanceID_);

    ASSERT_FS(selfDispatcher_);
    (void)selfDispatcher_->Call(request, CallerInfo{ .instanceID = callerID, .tenantID = callerTenant })
        .OnComplete(litebus::Defer(GetAID(), &InstanceProxy::OnForwardCall, std::placeholders::_1, from, request,
                                   selfDispatcher_));

    // Nothing more to track when the call came from this very instance.
    if (callerID == instanceID_) {
        return Status::OK();
    }

    const auto known = remoteDispatchers_.find(callerID);
    if (known == remoteDispatchers_.end() || known->second == nullptr) {
        auto created = std::make_shared<RequestDispatcher>(callerID, false, "", shared_from_this(), perf_);
        ASSERT_FS(observer_);
        (void)observer_->SubscribeInstanceEvent(instanceID_, callerID, true);
        remoteDispatchers_[callerID] = created;
    }
    auto callerDispatcher = remoteDispatchers_[callerID];
    callerDispatcher->UpdateRemoteAID(from);
    return Status::OK();
}
// Forwards a rejection (message + status code) to the dispatcher in charge of instanceID:
// the self dispatcher for this proxy's own instance, otherwise the matching remote dispatcher
// when one exists and is non-null. Unknown instances are ignored.
void InstanceProxy::Reject(const std::string &instanceID, const std::string &message, const StatusCode &code)
{
    if (instanceID != instanceID_) {
        const auto entry = remoteDispatchers_.find(instanceID);
        if (entry != remoteDispatchers_.end() && entry->second) {
            entry->second->Reject(message, code);
        }
        return;
    }
    ASSERT_FS(selfDispatcher_);
    selfDispatcher_->Reject(message, code);
}
// Completion hook for a forwarded call: records that the response was received, reports it to
// the dispatcher, stamps the response with the request id as its message id, and sends it back
// to the forwarding proxy as "ResponseForwardCall".
//
// @param callRspFut  completed response future; must not be in error (asserted).
// @param from        AID of the proxy that forwarded the call.
// @param callReq     the original request; must contain a call request (asserted).
// @param dispatcher  dispatcher that served the call.
void InstanceProxy::OnForwardCall(const litebus::Future<SharedStreamMsg> &callRspFut, const litebus::AID &from,
                                  const SharedStreamMsg &callReq,
                                  const std::shared_ptr<RequestDispatcher> &dispatcher)
{
    ASSERT_FS(!callRspFut.IsError());
    const auto &callRsp = callRspFut.Get();
    ASSERT_FS(callReq->has_callreq());
    // Bind by reference: only the ids are read, so copying the whole call request is wasted work.
    const auto &call = callReq->callreq();
    perf_->RecordReceivedCallRsp(call.requestid());
    dispatcher->OnCall(callRsp, call.traceid(), call.requestid());
    callRsp->set_messageid(call.requestid());
    YRLOG_INFO("{}|{}|ready to forward call response", call.traceid(), call.requestid());
    Send(from, "ResponseForwardCall", callRsp->SerializeAsString());
}
// Handler for the "ResponseForwardCall" message: completes the promise registered under the
// response's message id and removes it from the pending table. A response with no waiting
// promise is logged and ignored; a payload that cannot be parsed is logged and dropped.
//
// @param from  AID of the proxy that answered.
// @param msg   serialized runtime_rpc::StreamingMessage carrying a call response.
void InstanceProxy::ResponseForwardCall(const litebus::AID &from, std::string &&, std::string &&msg)
{
    auto response = std::make_shared<runtime_rpc::StreamingMessage>();
    if (!response->ParseFromString(msg)) {
        YRLOG_ERROR("failed to parse forward call response from {}, ignore it.", std::string(from));
        return;
    }
    ASSERT_FS(response->has_callrsp());
    const auto &requestID = response->messageid();
    perf_->RecordReceivedCallRsp(requestID);
    YRLOG_INFO("receive forward call response {} from {}", requestID, std::string(from));
    if (auto promise(forwardCallPromises_.find(requestID)); promise != forwardCallPromises_.end()) {
        promise->second->SetValue(response);
        (void)forwardCallPromises_.erase(promise);
        return;
    }
    YRLOG_WARN("no request {} is waiting for forward call response, ignore it.", requestID);
}
// Delivers a call result from srcInstanceID to dstInstanceID.
//
//  - Destination is this proxy's own instance: the result goes to the self dispatcher and
//    OnLocalCallResult is scheduled on this actor once the ack is available.
//  - No usable remote dispatcher for the destination: one is created, the instance event is
//    subscribed, and delivery is re-attempted through RetryCallResult after the subscription.
//  - Otherwise the result is sent through the remote dispatcher.
//
// @param request  streaming message; must contain a call result request (asserted).
// @return future of the call result ack.
litebus::Future<SharedStreamMsg> InstanceProxy::CallResult(const std::string &srcInstanceID,
                                                           const std::string &dstInstanceID,
                                                           const SharedStreamMsg &request,
                                                           const std::shared_ptr<TimePoint> &time)
{
    ASSERT_FS(request->has_callresultreq());
    const auto &resultReq = request->callresultreq();
    perf_->RecordCallResult(resultReq.requestid(), time);

    if (dstInstanceID == instanceID_) {
        ASSERT_FS(selfDispatcher_);
        auto onLocalAck = [aid(GetAID()), request, dstInstanceID,
                           srcInstanceID](const SharedStreamMsg &callResultAck) {
            litebus::Async(aid, &InstanceProxy::OnLocalCallResult, callResultAck, request, dstInstanceID,
                           srcInstanceID);
            return callResultAck;
        };
        return selfDispatcher_->CallResult(request).Then(onLocalAck);
    }

    const auto known = remoteDispatchers_.find(dstInstanceID);
    if (known == remoteDispatchers_.end() || known->second == nullptr) {
        auto created = std::make_shared<RequestDispatcher>(dstInstanceID, false, "", shared_from_this(), perf_);
        ASSERT_FS(observer_);
        auto retried = observer_->SubscribeInstanceEvent(instanceID_, dstInstanceID)
                           .Then([aid(GetAID()), srcInstanceID, dstInstanceID, request, time](const Status &) {
                               return litebus::Async(aid, &InstanceProxy::RetryCallResult, srcInstanceID,
                                                     dstInstanceID, request, time);
                           });
        remoteDispatchers_[dstInstanceID] = created;
        return retried;
    }

    auto remote = known->second;
    auto resultCode = resultReq.code();
    // NOTE(review): the ack is reported to the self dispatcher, not to the remote one — confirm intended.
    auto onRemoteAck = [requestID(resultReq.requestid()), selfDispatcher(selfDispatcher_),
                        resultCode](const SharedStreamMsg &callResultAck) {
        selfDispatcher->OnCallResult(callResultAck, requestID, resultCode);
        return callResultAck;
    };
    return remote->CallResult(request).Then(onRemoteAck);
}
// Re-attempts delivery of a call result after the destination was subscribed.
//
// If a remote dispatcher entry exists for the destination, the failure counter is cleared and
// CallResult is invoked again. Otherwise the attempt is counted; up to MAX_FAILED_TIMES attempts
// are rescheduled via DeferRetryCallResult after DEFER_RETRY (unit defined by litebus::AsyncAfter),
// after which an ack with ERR_INSTANCE_NOT_FOUND is returned.
litebus::Future<SharedStreamMsg> InstanceProxy::RetryCallResult(const std::string &srcInstanceID,
                                                                const std::string &dstInstanceID,
                                                                const SharedStreamMsg &request,
                                                                const std::shared_ptr<TimePoint> &time)
{
    if (remoteDispatchers_.find(dstInstanceID) != remoteDispatchers_.end()) {
        failedSubDstRouteOnCallResult_.erase(dstInstanceID);
        return CallResult(srcInstanceID, dstInstanceID, request, time);
    }

    static const uint32_t MAX_FAILED_TIMES = 3;
    static const int64_t DEFER_RETRY = 1000;

    // operator[] creates the counter on first failure.
    auto &failedTimes = failedSubDstRouteOnCallResult_[dstInstanceID];
    if (failedTimes >= MAX_FAILED_TIMES) {
        YRLOG_ERROR("subscribe dstInstance({}) for call result from {} failed {} times, instance not found",
                    dstInstanceID, srcInstanceID, failedTimes);
        // failedTimes must not be used past this erase.
        failedSubDstRouteOnCallResult_.erase(dstInstanceID);
        auto response = std::make_shared<runtime_rpc::StreamingMessage>();
        response->set_messageid(request->messageid());
        auto notFoundAck = response->mutable_callresultack();
        notFoundAck->set_code(Status::GetPosixErrorCode(StatusCode::ERR_INSTANCE_NOT_FOUND));
        notFoundAck->set_message("instance not found or instance may not be recovered");
        return response;
    }

    failedTimes++;
    YRLOG_WARN("subscribe dstInstance({}) for call result from {} failed {} times, retry again", dstInstanceID,
               srcInstanceID, failedTimes);
    auto deferred = std::make_shared<litebus::Promise<SharedStreamMsg>>();
    litebus::AsyncAfter(DEFER_RETRY, GetAID(), &InstanceProxy::DeferRetryCallResult, srcInstanceID, dstInstanceID,
                        request, time, deferred);
    return deferred->GetFuture();
}
// Deferred retry entry point: re-runs CallResult and ties the supplied promise to its outcome.
void InstanceProxy::DeferRetryCallResult(const std::string &srcInstanceID, const std::string &dstInstanceID,
                                         const SharedStreamMsg &request, const std::shared_ptr<TimePoint> &time,
                                         const std::shared_ptr<litebus::Promise<SharedStreamMsg>> &promise)
{
    promise->Associate(CallResult(srcInstanceID, dstInstanceID, request, time));
}
// Completion hook for a call result delivered locally.
//
// Ends the perf record, reports the ack to the remote dispatchers of the destination and of the
// source (when present and non-null), and then either
//  - finishes on the self dispatcher and releases the estimated memory, when the source is this
//    proxy's own instance, or
//  - re-dispatches the same notification to the actor named after the source instance at this
//    actor's URL.
void InstanceProxy::OnLocalCallResult(const litebus::Future<SharedStreamMsg> &callResultAckFut,
                                      const SharedStreamMsg &callResult, const std::string &dstInstance,
                                      const std::string &srcInstance)
{
    ASSERT_FS(!callResultAckFut.IsError());
    const auto &ack = callResultAckFut.Get();
    const auto &resultReq = callResult->callresultreq();
    perf_->EndRecord(resultReq.requestid());

    // Report the ack to the remote dispatcher of `peer`, if there is a usable one.
    auto notifyRemote = [this, &ack, &resultReq](const std::string &peer) {
        const auto entry = remoteDispatchers_.find(peer);
        if (entry != remoteDispatchers_.end() && entry->second) {
            entry->second->OnCallResult(ack, resultReq.requestid(), resultReq.code());
        }
    };
    notifyRemote(dstInstance);
    notifyRemote(srcInstance);

    if (srcInstance != instanceID_) {
        litebus::AID aid(srcInstance, GetAID().Url());
        litebus::Async(aid, &InstanceProxy::OnLocalCallResult, callResultAckFut, callResult, dstInstance,
                       srcInstance);
        return;
    }

    ASSERT_FS(selfDispatcher_);
    selfDispatcher_->OnCallResult(ack, resultReq.requestid(), resultReq.code());
    InvocationHandler::ReleaseEstimateMemory(srcInstance, resultReq.requestid());
}
// Handler for the "ForwardCallResult" message sent by another proxy.
// Deserializes the call result, records it, and hands it to the self dispatcher, deferring
// OnForwardCallResult onto this actor for completion. A payload that cannot be parsed is logged
// and dropped instead of being processed.
//
// @param from  AID of the sending proxy; its name is used as the source instance ID.
// @param msg   serialized runtime_rpc::StreamingMessage carrying a call result request.
void InstanceProxy::ForwardCallResult(const litebus::AID &from, std::string &&, std::string &&msg)
{
    auto srcInstanceID = from.Name();
    auto request = std::make_shared<runtime_rpc::StreamingMessage>();
    if (!request->ParseFromString(msg)) {
        YRLOG_ERROR("failed to parse forward call result from {}, ignore it.", std::string(from));
        return;
    }
    ASSERT_FS(request->has_callresultreq());
    // Bind by reference: only the request id is read, so the call result need not be copied.
    const auto &callResult = request->callresultreq();
    perf_->RecordCallResult(request->messageid(), nullptr);
    YRLOG_INFO("{}|receive forward call result from {}", callResult.requestid(), std::string(from));
    ASSERT_FS(selfDispatcher_);
    (void)selfDispatcher_->CallResult(request).OnComplete(litebus::Defer(
        GetAID(), &InstanceProxy::OnForwardCallResult, std::placeholders::_1, from, request, srcInstanceID));
}
// Completion hook for a forwarded call result: ends the perf record, reports the ack to the
// source's remote dispatcher (when present and non-null), releases the estimated memory when the
// result names this proxy's instance, and sends the ack back as "ResponseForwardCallResult" with
// the request id as its message id.
void InstanceProxy::OnForwardCallResult(const litebus::Future<SharedStreamMsg> &callResultAckFut,
                                        const litebus::AID &from, const SharedStreamMsg &callResult,
                                        const std::string &srcInstance)
{
    ASSERT_FS(!callResultAckFut.IsError());
    const auto &ack = callResultAckFut.Get();
    ASSERT_FS(ack->has_callresultack());
    const auto &resultReq = callResult->callresultreq();
    perf_->EndRecord(resultReq.requestid());

    const auto entry = remoteDispatchers_.find(srcInstance);
    if (entry != remoteDispatchers_.end() && entry->second) {
        entry->second->OnCallResult(ack, resultReq.requestid(), resultReq.code());
    }
    if (resultReq.instanceid() == instanceID_) {
        InvocationHandler::ReleaseEstimateMemory(from.Name(), resultReq.requestid());
    }

    ack->set_messageid(resultReq.requestid());
    YRLOG_INFO("{}|ready send forward call result response", resultReq.requestid());
    Send(from, "ResponseForwardCallResult", ack->SerializeAsString());
}
// Handler for the "ResponseForwardCallResult" message: ends the perf record, completes the
// promise registered under the ack's message id, and removes it from the pending table. An ack
// with no waiting promise is logged and ignored; a payload that cannot be parsed is logged and
// dropped.
//
// @param from  AID of the proxy that answered.
// @param msg   serialized runtime_rpc::StreamingMessage carrying a call result ack.
void InstanceProxy::ResponseForwardCallResult(const litebus::AID &from, std::string &&, std::string &&msg)
{
    auto ack = std::make_shared<runtime_rpc::StreamingMessage>();
    if (!ack->ParseFromString(msg)) {
        YRLOG_ERROR("failed to parse forward call result response from {}, ignore it.", std::string(from));
        return;
    }
    ASSERT_FS(ack->has_callresultack());
    const auto &requestID = ack->messageid();
    perf_->EndRecord(requestID);
    YRLOG_INFO("receive forward call result response {} from {}", requestID, std::string(from));
    if (auto promise(forwardCallResultPromises_.find(requestID)); promise != forwardCallResultPromises_.end()) {
        promise->second->SetValue(ack);
        (void)forwardCallResultPromises_.erase(promise);
        return;
    }
    YRLOG_WARN("no request {} is waiting for forward callresult ack, ignore it.", requestID);
}
// Sends a call request to another proxy and returns the future of its response.
//
// The message id is set to callerTenantID followed by the request id. One promise is kept per
// request id; a repeated send for the same request id reuses it. The first send, and any send of
// a request without YR_ROUTE_KEY in its create options, goes straight to `aid`. A repeated send
// of a routed request is wrapped in a RouteCallRequest and sent to REQUEST_ROUTER_NAME at the
// URL of `aid`.
litebus::Future<SharedStreamMsg> InstanceProxy::SendForwardCall(const litebus::AID &aid,
                                                                const std::string &callerTenantID,
                                                                const SharedStreamMsg &request)
{
    ASSERT_FS(request->has_callreq());
    const auto &invoke = request->callreq();
    request->set_messageid(callerTenantID.empty() ? invoke.requestid() : callerTenantID + invoke.requestid());

    // Reuse the pending promise on a repeated send, otherwise register a new one.
    std::shared_ptr<litebus::Promise<SharedStreamMsg>> pending;
    bool isRepeated = false;
    if (const auto known = forwardCallPromises_.find(invoke.requestid()); known != forwardCallPromises_.end()) {
        pending = known->second;
        isRepeated = true;
    } else {
        pending = std::make_shared<litebus::Promise<SharedStreamMsg>>();
        forwardCallPromises_[invoke.requestid()] = pending;
    }

    const bool hasRoute = invoke.createoptions().find(YR_ROUTE_KEY) != invoke.createoptions().end();
    if (isRepeated && hasRoute) {
        YRLOG_INFO("{}|{}|low reliable instance retry to forward call", invoke.traceid(), invoke.requestid());
        auto routerAid = litebus::AID(REQUEST_ROUTER_NAME, aid.Url());
        internal::RouteCallRequest routeReq;
        routeReq.mutable_req()->CopyFrom(*request);
        routeReq.set_instanceid(aid.Name());
        // `pending` already is the promise stored for this request id, so no second lookup is needed.
        (void)Send(routerAid, "ForwardCall", routeReq.SerializeAsString());
        return pending->GetFuture();
    }

    (void)Send(aid, "ForwardCall", request->SerializeAsString());
    YRLOG_INFO("{}|{}|(forwardInvoke)send forward call", invoke.traceid(), invoke.requestid());
    return pending->GetFuture();
}
// Sends a call result to another proxy as "ForwardCallResult" and returns the future of its ack.
// The message id is set to the request id, and a fresh promise is stored under that id
// (replacing any promise previously stored for it).
litebus::Future<SharedStreamMsg> InstanceProxy::SendForwardCallResult(const litebus::AID &aid,
                                                                      const SharedStreamMsg &request)
{
    ASSERT_FS(request->has_callresultreq());
    const auto &requestID = request->callresultreq().requestid();
    auto pending = std::make_shared<litebus::Promise<SharedStreamMsg>>();
    request->set_messageid(requestID);
    forwardCallResultPromises_[request->messageid()] = pending;
    YRLOG_INFO("{}|(forwardCallResult)send forward callresult to {}", requestID, aid.HashString());
    Send(aid, "ForwardCallResult", request->SerializeAsString());
    return pending->GetFuture();
}
// Applies updated router info for instanceID.
//
// For this proxy's own instance the self dispatcher is updated; when the info says the instance
// is ready but no longer local, the observer (if any) is notified of the migrating request.
// For any other instance the remote dispatcher is updated, creating it first when the entry is
// missing or null.
//
// @param instanceID  instance whose router info changed.
// @param info        new router info; must not be null (asserted).
void InstanceProxy::NotifyChanged(const std::string &instanceID, const std::shared_ptr<InstanceRouterInfo> &info)
{
    ASSERT_IF_NULL(info);
    if (instanceID == instanceID_) {
        ASSERT_FS(selfDispatcher_);
        selfDispatcher_->UpdateInfo(info);
        if (!info->isLocal && info->isReady) {
            YRLOG_INFO("instance {} is already migrated to {}, instance proxy on local should be terminate", instanceID,
                       info->proxyID);
            RETURN_IF_NULL(observer_);
            observer_->NotifyMigratingRequest(instanceID_);
        }
        return;
    }
    // A null entry is replaced as well, so the UpdateInfo call below never dereferences null.
    if (const auto found = remoteDispatchers_.find(instanceID);
        found == remoteDispatchers_.end() || found->second == nullptr) {
        auto dispatcher = std::make_shared<RequestDispatcher>(instanceID, false, "", shared_from_this(), perf_);
        remoteDispatchers_[instanceID] = dispatcher;
    }
    remoteDispatchers_[instanceID]->UpdateInfo(info);
}
// Forwards a fatal notification (message + status code) to the dispatcher in charge of
// instanceID: the self dispatcher for this proxy's own instance, otherwise the matching remote
// dispatcher when one exists and is non-null. Unknown instances are ignored.
void InstanceProxy::Fatal(const std::string &instanceID, const std::string &message, const StatusCode &code)
{
    if (instanceID != instanceID_) {
        const auto entry = remoteDispatchers_.find(instanceID);
        if (entry != remoteDispatchers_.end() && entry->second) {
            entry->second->Fatal(message, code);
        }
        return;
    }
    ASSERT_FS(selfDispatcher_);
    selfDispatcher_->Fatal(message, code);
}
// Returns the list of response futures provided by the self dispatcher's GetOnRespFuture.
// Asserts that the self dispatcher exists.
std::list<litebus::Future<SharedStreamMsg>> InstanceProxy::GetOnRespFuture()
{
ASSERT_FS(selfDispatcher_);
return selfDispatcher_->GetOnRespFuture();
}
// Drops the remote dispatcher kept for instanceID. A non-null dispatcher is first told the
// instance has exited (INSTANCE_EXIT_MESSAGE / ERR_INSTANCE_EXITED); the entry is then erased.
void InstanceProxy::DeleteRemoteDispatcher(const std::string &instanceID)
{
    const auto entry = remoteDispatchers_.find(instanceID);
    if (entry != remoteDispatchers_.end() && entry->second) {
        entry->second->Fatal(INSTANCE_EXIT_MESSAGE, StatusCode::ERR_INSTANCE_EXITED);
    }
    // Erase by key rather than by iterator, exactly as before.
    (void)remoteDispatchers_.erase(instanceID);
}
// Tells the self dispatcher that the instance has exited (INSTANCE_EXIT_MESSAGE /
// ERR_INSTANCE_EXITED). Asserts that the self dispatcher exists. Always returns true.
bool InstanceProxy::Delete()
{
ASSERT_FS(selfDispatcher_);
selfDispatcher_->Fatal(INSTANCE_EXIT_MESSAGE, StatusCode::ERR_INSTANCE_EXITED);
return true;
}
// Creates the dispatcher for this proxy's own instance from instanceID_, tenantID_ and perf_.
// NOTE(review): the `true` argument presumably marks the dispatcher as the local/self one
// (remote dispatchers in this file pass `false` and an empty tenant) — confirm against RequestDispatcher.
void InstanceProxy::InitDispatcher()
{
selfDispatcher_ = std::make_shared<RequestDispatcher>(instanceID_, true, tenantID_, shared_from_this(), perf_);
}
// Dispatches InstanceProxy::Call to the actor identified by `to` through litebus::Async,
// passing the remaining arguments through, and returns the resulting future.
litebus::Future<SharedStreamMsg> InstanceProxyWrapper::Call(const litebus::AID &to,
const CallerInfo &callerInfo,
const std::string &instanceID,
const SharedStreamMsg &request,
const std::shared_ptr<TimePoint> &time)
{
return litebus::Async(to, &InstanceProxy::Call, callerInfo, instanceID, request, time);
}
// Dispatches InstanceProxy::CallResult to the actor identified by `to` through litebus::Async,
// passing the remaining arguments through, and returns the resulting future.
litebus::Future<SharedStreamMsg> InstanceProxyWrapper::CallResult(const litebus::AID &to,
const std::string &srcInstanceID,
const std::string &dstInstanceID,
const SharedStreamMsg &request,
const std::shared_ptr<TimePoint> &time)
{
return litebus::Async(to, &InstanceProxy::CallResult, srcInstanceID, dstInstanceID, request, time);
}
// Dispatches InstanceProxy::GetTenantID to the actor identified by `to` through litebus::Async
// and returns the resulting future.
litebus::Future<std::string> InstanceProxyWrapper::GetTenantID(const litebus::AID &to)
{
return litebus::Async(to, &InstanceProxy::GetTenantID);
}
}