* Copyright (c) Huawei Technologies Co., Ltd. 2026. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Description: Limit URMA fallback TCP payloads to avoid congesting the shared RPC channel.
 */
#ifndef DATASYSTEM_COMMON_OBJECT_CACHE_URMA_FALLBACK_TCP_LIMITER_H
#define DATASYSTEM_COMMON_OBJECT_CACHE_URMA_FALLBACK_TCP_LIMITER_H
#include <atomic>
#include <cstdint>
#include <string>
#include "datasystem/utils/status.h"
namespace datasystem {
/**
 * @brief Limits the bytes of URMA-fallback payloads that travel over TCP, so that fallback traffic does not
 *        congest the shared RPC channel.
 *
 * The class exposes only static members; both its constructor and destructor are deleted, so it can never be
 * instantiated. All member functions are declared here and defined elsewhere.
 */
class UrmaFallbackTcpLimiter {
public:
    /// 10 MiB. NOTE(review): by its name, the cap on the total bytes pending at once - confirm in the .cpp.
    static constexpr uint64_t kMaxPendingBytes = 10ULL * 1024 * 1024;

    /// 1 MiB. NOTE(review): by its name, the cap on one payload, presumably applied only when
    /// checkSinglePayloadLimit is true - confirm in the .cpp.
    static constexpr uint64_t kMaxSinglePayloadBytes = 1ULL * 1024 * 1024;

    // Static-only utility: instances are forbidden.
    UrmaFallbackTcpLimiter() = delete;
    ~UrmaFallbackTcpLimiter() = delete;

    /**
     * @brief Move-only handle that pairs a pending-bytes counter with a byte amount.
     *
     * A default-constructed ticket holds a null counter pointer and zero bytes. Only UrmaFallbackTcpLimiter
     * (a friend) can build a ticket bound to a counter.
     * NOTE(review): the destructor and Release() presumably give the recorded bytes back to the counter;
     * their definitions are not part of this header - confirm in the .cpp.
     */
    class Ticket {
    public:
        Ticket() = default;
        ~Ticket();

        // Movable ...
        Ticket(Ticket &&other) noexcept;
        Ticket &operator=(Ticket &&other) noexcept;

        // ... but never copyable.
        Ticket(const Ticket &) = delete;
        Ticket &operator=(const Ticket &) = delete;

        /// NOTE(review): presumably true while the ticket is bound to a counter - confirm in the .cpp.
        bool IsActive() const;

    private:
        friend class UrmaFallbackTcpLimiter;

        /// Builds a ticket from a counter pointer and a byte amount; reachable only through the limiter.
        Ticket(std::atomic<uint64_t> *pendingBytes, uint64_t bytes) noexcept;

        void Release();

        std::atomic<uint64_t> *pendingBytes_ = nullptr;
        uint64_t bytes_ = 0;
    };

    /**
     * @brief Variant of TryAcquire that takes no counter argument.
     *        NOTE(review): presumably operates on processPendingBytes_ - confirm in the .cpp.
     * @param[in] bytes Payload size in bytes.
     * @param[in] transportStatus Status coming from the transport layer.
     * @param[in] direction Caller-supplied direction string.
     * @param[out] ticket Ticket object handed in by the caller (non-const reference).
     * @param[in] checkSinglePayloadLimit Defaults to true.
     * @return Status of the attempt.
     */
    static Status TryAcquireProcessScope(uint64_t bytes, const Status &transportStatus,
                                         const std::string &direction, Ticket &ticket,
                                         bool checkSinglePayloadLimit = true);

    /**
     * @brief Attempts an acquisition of @p bytes against the caller-provided counter.
     * @param[in,out] pendingBytes Atomic counter passed by non-const reference.
     * @param[in] bytes Payload size in bytes.
     * @param[in] transportStatus Status coming from the transport layer.
     * @param[in] direction Caller-supplied direction string.
     * @param[out] ticket Ticket object handed in by the caller (non-const reference).
     * @param[in] checkSinglePayloadLimit Defaults to true.
     * @return Status of the attempt.
     */
    static Status TryAcquire(std::atomic<uint64_t> &pendingBytes, uint64_t bytes, const Status &transportStatus,
                             const std::string &direction, Ticket &ticket,
                             bool checkSinglePayloadLimit = true);

private:
    /// Produces a Status from the transport status and a reason string.
    /// NOTE(review): presumably the rejection result returned by the TryAcquire* functions - confirm.
    static Status BuildRejectStatus(const Status &transportStatus, const std::string &reason);

    /// Static atomic byte counter owned by the class; defined outside this header.
    static std::atomic<uint64_t> processPendingBytes_;
};
}
#endif