* Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: Define the basic unit of the shared memory in the server side.
* Support allocate and free shared memory.
*/
#include "datasystem/common/shared_memory/shm_unit.h"
#include <utility>
#include <securec.h>
#ifdef WITH_TESTS
#include "datasystem/common/inject/inject_point.h"
#endif
#include "datasystem/common/metrics/kv_metrics.h"
#include "datasystem/common/shared_memory/allocator.h"
#include "datasystem/common/string_intern/string_ref.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/utils/status.h"
namespace datasystem {
ShmUnit::ShmUnit() : ShmUnitInfo()
{
METRIC_INC(metrics::KvMetricId::WORKER_SHM_UNIT_CREATED_TOTAL);
}
ShmUnit::ShmUnit(int fd, uint64_t mmapSz) : ShmUnitInfo(fd, mmapSz)
{
METRIC_INC(metrics::KvMetricId::WORKER_SHM_UNIT_CREATED_TOTAL);
}
ShmUnit::ShmUnit(ShmKey id, ShmView shmView, void *pointer) : ShmUnitInfo(std::move(id), shmView, pointer)
{
METRIC_INC(metrics::KvMetricId::WORKER_SHM_UNIT_CREATED_TOTAL);
}
ShmUnit::~ShmUnit()
{
VLOG(1) << "Release memory of " << id << " Size: " << size << " Off: " << offset;
Status rc = this->FreeMemory();
if (rc.IsError()) {
LOG(WARNING) << "Destructor for a ShmUnit failed to free memory.";
}
METRIC_INC(metrics::KvMetricId::WORKER_SHM_UNIT_DESTROYED_TOTAL);
}
std::string ShmUnit::GetTenantId()
{
return tenantId_;
}
ShmView ShmUnit::GetShmView()
{
return { fd, mmapSize, offset, size };
}
void ShmUnit::SetHardFreeMemory()
{
needHardFree_ = true;
}
Status ShmUnit::FreeMemory()
{
RETURN_OK_IF_TRUE(pointer == nullptr);
if (shmOwner_) {
shmOwner_.reset();
return Status::OK();
}
VLOG(1) << "[ShmUnit] Arena FreeMemory, Tenant:" << (tenantId_.empty() ? "Default" : tenantId_)
<< ", needHardFree: " << needHardFree_;
#ifdef WITH_TESTS
INJECT_POINT("ShmUnit.FreeMemory", [this]() {
needHardFree_ = true;
return Status::OK();
});
#endif
if (needHardFree_) {
int ret = memset_s(pointer, size, 0, size);
if (ret != EOK) {
LOG(WARNING) << FormatString("[ShmId %s] memset failed, error code: %d.", id, ret);
}
}
return datasystem::memory::Allocator::Instance()->FreeMemory(tenantId_, pointer, serviceType_, cacheType_);
}
Status ShmUnit::AllocateMemory(const std::string &tenantId, uint64_t needSize, bool populate, ServiceType serviceType,
memory::CacheType cacheType)
{
VLOG(1) << "[ShmUnit] AllocateMemory, Tenant: " << (tenantId.empty() ? "Default" : tenantId)
<< ", size: " << needSize << ", cachetype: " << static_cast<int>(cacheType);
serviceType_ = serviceType;
cacheType_ = cacheType;
RETURN_IF_NOT_OK(datasystem::memory::Allocator::Instance()->AllocateMemory(
tenantId, needSize, populate, pointer, fd, offset, mmapSize, numaId, serviceType_, cacheType_));
size = needSize;
tenantId_ = tenantId;
return Status::OK();
}
Status ShmOwner::DistributeMemory(uint64_t shmSize, ShmUnit &shmUnit)
{
uint64_t positionCursor = AllocatePosition(shmSize);
CHECK_FAIL_RETURN_STATUS(positionCursor + shmSize <= size, K_RUNTIME_ERROR,
"Object needs more memory than available.");
shmUnit.size = shmSize;
shmUnit.pointer = reinterpret_cast<void *>(reinterpret_cast<uint64_t>(pointer) + positionCursor);
shmUnit.fd = fd;
shmUnit.offset = offset + positionCursor;
shmUnit.mmapSize = mmapSize;
shmUnit.serviceType_ = serviceType_;
shmUnit.cacheType_ = cacheType_;
shmUnit.tenantId_ = tenantId_;
shmUnit.needHardFree_ = needHardFree_;
shmUnit.shmOwner_ = shared_from_this();
shmUnit.numaId = numaId;
return Status::OK();
}
uint64_t ShmOwner::AllocatePosition(uint64_t shmSize)
{
return cursor_.fetch_add(Align4BitsCeiling(shmSize), std::memory_order_acq_rel);
}
uint64_t Align4BitsCeiling(uint64_t size)
{
const uint64_t alignmentMask = 3;
return (size + alignmentMask) & ~(alignmentMask);
}
}