* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: Record meta for a stream.
*/
#include "datasystem/common/stream_cache/stream_meta_shm.h"
#include "datasystem/common/inject/inject_point.h"
#include "datasystem/common/util/format.h"
#include "datasystem/stream/stream_config.h"
namespace datasystem {
Status StreamMetaShm::Init(std::shared_ptr<client::IMmapTableEntry> mmapTableEntry)
{
RETURN_RUNTIME_ERROR_IF_NULL(shmPtr_);
auto *data = shmPtr_;
usage_ = reinterpret_cast<decltype(usage_)>(shmPtr_);
data += sizeof(*(usage_));
CHECK_FAIL_RETURN_STATUS(static_cast<size_t>((data) - (shmPtr_)) <= shmSz_, K_RUNTIME_ERROR,
"Work area size too small");
if (mmapTableEntry != nullptr) {
mmapTableEntry_ = std::move(mmapTableEntry);
}
return Status::OK();
}
Status StreamMetaShm::TryIncUsage(uint64_t size)
{
INJECT_POINT("StreamMetaShm.TryIncUsage");
bool success = false;
uint64_t currUsage = 0;
do {
currUsage = __atomic_load_n(usage_, __ATOMIC_RELAXED);
CHECK_FAIL_RETURN_STATUS(UINT64_MAX - currUsage >= size, K_OUT_OF_RANGE,
"The usage of shared memory reached UINT64_MAX");
uint64_t desiredVal = currUsage + size;
CHECK_FAIL_RETURN_STATUS(desiredVal <= maxStreamSize_, K_OUT_OF_MEMORY,
FormatString("stream: %s, currUsage: %llu, tryIncUsage: %llu, maxStreamSize_: %llu",
streamName_, currUsage, size, maxStreamSize_));
success =
__atomic_compare_exchange_n(usage_, &currUsage, desiredVal, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
} while (!success);
VLOG(SC_NORMAL_LOG_LEVEL) << "TryIncUsage for streamName:" << streamName_ << ", size: " << size
<< ", before: " << currUsage << ", after: " << (currUsage + size);
return Status::OK();
}
Status StreamMetaShm::TryDecUsage(uint64_t size)
{
bool success = false;
uint64_t currUsage = 0;
do {
currUsage = __atomic_load_n(usage_, __ATOMIC_RELAXED);
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
currUsage >= size, K_RUNTIME_ERROR,
FormatString("[TryDecUsage error] stream: %s, currUsage: %llu, tryDecUsage: %llu", streamName_, currUsage,
size));
uint64_t desiredVal = currUsage - size;
success =
__atomic_compare_exchange_n(usage_, &currUsage, desiredVal, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
} while (!success);
VLOG(SC_NORMAL_LOG_LEVEL) << "TryDecUsage for streamName:" << streamName_ << ", size: " << size
<< ", before: " << currUsage << ", after: " << (currUsage - size);
return Status::OK();
}
}