* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FUNCTION_PROXY_BUSPROXY_INSTANCE_PROXY_REQUEST_DISPATCHER_H
#define FUNCTION_PROXY_BUSPROXY_INSTANCE_PROXY_REQUEST_DISPATCHER_H
#include <memory>
#include <string>
#include "function_proxy/busproxy/instance_proxy/call_cache.h"
#include "function_proxy/busproxy/instance_proxy/perf.h"
#include "function_proxy/busproxy/instance_proxy/forward_interface.h"
#include "function_proxy/common/iam/internal_iam.h"
#include "function_proxy/common/posix_client/data_plane_client/data_interface_posix_client.h"
#include "function_proxy/common/posix_client/data_plane_client/data_interface_client_manager_proxy.h"
#include "function_proxy/common/observer/data_plane_observer/data_plane_observer.h"
namespace functionsystem::busproxy {
SharedStreamMsg CreateCallResponse(const common::ErrorCode &code, const std::string &message,
const std::string &messageID);
struct InstanceRouterInfo {
bool isLocal = false;
bool isReady = false;
std::string runtimeID;
std::string proxyID;
std::string proxyGrpcAddress;
litebus::AID remote;
std::string tenantID;
std::string function;
std::shared_ptr<DataInterfacePosixClient> localClient = nullptr;
resources::TrafficReportType trafficReportType = resources::TrafficReportType::None;
};
struct CallerInfo {
std::string instanceID;
std::string tenantID;
};
class RequestDispatcher {
public:
RequestDispatcher(const std::string &instanceID, bool isLocal, const std::string &tenantID,
const std::shared_ptr<ForwardInterface> &remote, const std::shared_ptr<Perf> &perf)
: instanceID_(instanceID),
tenantID_(tenantID),
local_(isLocal),
callCache_(std::make_shared<CallCache>()),
remoteClient_(remote),
perf_(perf)
{
}
~RequestDispatcher() = default;
litebus::Future<SharedStreamMsg> Call(const SharedStreamMsg &request,
const CallerInfo &callerInfo);
litebus::Future<SharedStreamMsg> CallResult(const SharedStreamMsg &request);
void OnCall(const SharedStreamMsg &, const std::string &, const std::string &);
void OnCallResult(const SharedStreamMsg &, const std::string &, common::ErrorCode callResultCode);
void UpdateInfo(const std::shared_ptr<InstanceRouterInfo> &info);
void Fatal(const std::string &, const StatusCode &);
void Reject(const std::string &, const StatusCode &);
std::list<litebus::Future<SharedStreamMsg>> GetOnRespFuture();
Status AuthorizeInvoke(const std::string &callerTenantID);
std::string GetTenantID()
{
return tenantID_;
}
inline void UpdateRemoteAID(const litebus::AID &aid)
{
remoteAid_ = aid;
local_ = false;
}
static void BindDataInterfaceClientManager(const std::shared_ptr<DataInterfaceClientManagerProxy> &clientManager)
{
clientManager_ = clientManager;
}
static void BindInternalIAM(const std::shared_ptr<functionsystem::function_proxy::InternalIAM> &internalIam)
{
internalIam_ = internalIam;
}
static void BindObserver(const std::shared_ptr<functionsystem::function_proxy::DataPlaneObserver> &observer)
{
observer_ = observer;
}
private:
void TriggerCall(const std::string &requestID);
void ResponseAllMessage();
inline static std::shared_ptr<DataInterfaceClientManagerProxy> clientManager_ { nullptr };
inline static std::shared_ptr<functionsystem::function_proxy::InternalIAM> internalIam_{ nullptr };
inline static std::shared_ptr<functionsystem::function_proxy::DataPlaneObserver> observer_ { nullptr };
void ReportCallTimesMetrics();
void ReportCallLatency(const std::string &requestID, common::ErrorCode errCode);
void ReportTrafficMetrics(bool busy, const size_t &size);
void SendNotify(const std::basic_string<char> &req, const std::shared_ptr<CallRequestContext> &context) const;
std::string instanceID_;
std::string runtimeID_;
std::string proxyID_;
std::string tenantID_;
std::string function_;
litebus::AID remoteAid_;
bool local_ { false };
bool isFatal_ { false };
bool isReject_ { false };
bool isReady_ { false };
bool used_ { false };
std::string fatalMsg_;
StatusCode fatalCode_ {StatusCode::SUCCESS};
std::shared_ptr<CallCache> callCache_ { nullptr };
std::weak_ptr<ForwardInterface> remoteClient_;
std::shared_ptr<DataInterfacePosixClient> dataInterfaceClient_ { nullptr };
std::shared_ptr<Perf> perf_ { nullptr };
std::unordered_set<std::string> verifiedCallerTenantIDs_{};
resources::TrafficReportType trafficReportType_ { resources::TrafficReportType::None };
int callTimes_ = 0;
int failedCallTimes_ = 0;
std::map<std::string, std::chrono::system_clock::time_point> localStartCallTimeMap_;
bool currentTrafficIdleState_ { false };
};
}
#endif