* Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "datasystem/common/shared_memory/arena.h"
#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <limits>
#include <vector>
#include "datasystem/common/shared_memory/arena_group_key.h"
#ifdef __linux__
#include <linux/memfd.h>
#endif
#include <sys/mman.h>
#include <sys/syscall.h>
#include <unistd.h>
#include "datasystem/common/constants.h"
#include "datasystem/common/flags/flags.h"
#ifdef WITH_TESTS
#include "datasystem/common/inject/inject_point.h"
#endif
#include "datasystem/common/log/trace.h"
#include "datasystem/common/perf/perf_manager.h"
#include "datasystem/common/shared_memory/allocator.h"
#include "datasystem/common/shared_memory/jemalloc.h"
#include "datasystem/common/shared_memory/mmap/flexible_mmap.h"
#include "datasystem/common/shared_memory/mmap/disk_mmap.h"
#include "datasystem/common/shared_memory/mmap/mem_mmap.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/common/util/strings_util.h"
#include "datasystem/common/util/validator.h"
#include "datasystem/common/rdma/fast_transport_base.h"
// Number of memory arenas created per tenant (default 16). The constructor of ArenaManager
// overrides this flag to 1 when populate / huge TLB / whole-arena registration is in effect.
DS_DEFINE_uint32(
arena_per_tenant, 16,
"The arena count for each tenant. Multiple arenas can improve the performance of share memory allocation for "
"the first time, but each arena will use one more fd. The valid range is 1 to 32.");
DS_DEFINE_validator(arena_per_tenant, &Validator::ValidateArenaPerTenant);
// Declared here only; read in this file to pick single-arena mode and huge-page rounding.
DS_DECLARE_bool(enable_huge_tlb);
// Number of disk-cache arenas created per tenant (default 8).
DS_DEFINE_uint32(
shared_disk_arena_per_tenant, 8,
"The number of disk cache Arena for each tenant. Multiple arenas can improve the performance of shared "
"disk allocation for "
"the first time, but each arena will use one more fd. The valid range is 0 to 32.");
DS_DEFINE_validator(shared_disk_arena_per_tenant, &Validator::ValidateSharedDiskArenaPerTenant);
// Declared here only; a non-empty value makes ArenaManager count disk arenas towards the per-tenant arena total.
DS_DECLARE_string(shared_disk_directory);
// Declared here only; ArenaManager's constructor forces it to false in single-arena mode.
DS_DECLARE_bool(enable_fallocate);
namespace datasystem {
namespace memory {
// Per-thread side channel between the allocation entry points and the allocator hooks.
// Set from the `populate` argument in ArenaGroup::AllocateMemoryImpl and read by Arena::CommitImpl.
thread_local bool g_NeedFallocate = false;
// Size of the request currently being served on this thread; set in ArenaGroup::AllocateMemoryImpl
// and read by Arena::CommitImpl to decide which part of an extent to commit.
thread_local uint64_t g_RequestSize = 0;
// True only while ArenaManager::FakeAllocate is running on this thread; read by Arena::AllocHook
// and Arena::CommitHook to skip committing memory.
thread_local bool g_FakeAllocate = false;
/**
 * @brief Round an address up to the next multiple of alignment using bit masking.
 * @param[in] addr The address to round up.
 * @param[in] alignment The alignment; the mask arithmetic only yields a multiple when it is a power of two.
 * @return addr itself when already aligned, otherwise the next aligned address above it.
 */
inline uintptr_t AlignCeiling(uintptr_t addr, uintptr_t alignment)
{
    const uintptr_t lowBits = alignment - 1;
    const uintptr_t bumped = addr + lowBits;
    return bumped & ~lowBits;
}
/**
 * @brief Build a group over the given arenas and index each arena's mapping by its fd.
 * @param[in] arenas The arenas owned by this group.
 * @param[in] maxSize Upper bound of the requested bytes this group may hand out.
 * @param[in] cacheType The cache type served by this group.
 */
ArenaGroup::ArenaGroup(std::vector<std::shared_ptr<Arena>> arenas, uint64_t maxSize, CacheType cacheType)
    : arenas_(std::move(arenas)), maxSize_(maxSize), cacheType_(cacheType)
{
    // fd -> (base pointer, mmap size), used later by FdToPointer.
    for (auto member = arenas_.cbegin(); member != arenas_.cend(); ++member) {
        fdPointerTable_.emplace((*member)->GetFd(),
                                std::make_pair((*member)->GetPointer(), (*member)->GetMmapSize()));
    }
}
// Destroys every arena of the group; a failure from DestroyAll is only logged.
ArenaGroup::~ArenaGroup()
{
LOG(INFO) << "ArenaGroup destructor.";
LOG_IF_ERROR(DestroyAll(), "Destroy arenas failed");
}
/**
 * @brief Convenience overload for callers that do not need the NUMA id of the allocation.
 *        Forwards to the overload taking numaId and discards the reported value.
 */
Status ArenaGroup::AllocateMemory(uint64_t size, bool populate, uint64_t &realSize, void *&pointer, int &fd,
                                  ptrdiff_t &offset, uint64_t &mmapSize, ServiceType type)
{
    // Throw-away slot, pre-set to the same sentinel the full overload uses when no NUMA id is known.
    auto discardedNumaId = std::numeric_limits<uint8_t>::max();
    return AllocateMemory(size, populate, realSize, pointer, fd, offset, mmapSize, discardedNumaId, type);
}
/**
 * @brief Allocate memory from one arena of the group and report where it lives.
 * @param[in] size Requested size in bytes; counted against maxSize_.
 * @param[in] populate Forwarded to AllocateMemoryImpl.
 * @param[out] realSize Starts as size and is passed by reference to AllocateMemoryImpl, which may update it.
 * @param[out] pointer Address of the allocation.
 * @param[out] fd File descriptor backing the allocation.
 * @param[out] offset Offset of the allocation as reported by Arena::GetAllocInfo.
 * @param[out] mmapSize Size of the mapping as reported by Arena::GetAllocInfo.
 * @param[out] numaId NUMA id found by Arena::QueryNumaId, or the uint8_t maximum when unknown or disabled.
 * @param[in] type Service type, used only for the diagnostic log on failure.
 * @return K_RUNTIME_ERROR when the group is destroyed or empty, K_OUT_OF_MEMORY when the quota is exceeded,
 *         otherwise the status of the underlying allocation.
 */
Status ArenaGroup::AllocateMemory(uint64_t size, bool populate, uint64_t &realSize, void *&pointer, int &fd,
                                  ptrdiff_t &offset, uint64_t &mmapSize, uint8_t &numaId, ServiceType type)
{
    CHECK_FAIL_RETURN_STATUS(!destroyed_.load(), StatusCode::K_RUNTIME_ERROR, "ArenaGroup destroyed");
    CHECK_FAIL_RETURN_STATUS(!arenas_.empty(), StatusCode::K_RUNTIME_ERROR, "arenas_ is empty");
    if (size > maxSize_) {
        RETURN_STATUS(StatusCode::K_OUT_OF_MEMORY, "Upper to the size limit");
    }
    // Reserve the quota first; give it back when the usage observed before the add already left no room.
    const auto usageBeforeReserve = memoryUsage_.fetch_add(size, std::memory_order_relaxed);
    if (usageBeforeReserve > (maxSize_ - size)) {
        (void)memoryUsage_.fetch_sub(size, std::memory_order_relaxed);
        RETURN_STATUS(StatusCode::K_OUT_OF_MEMORY, "Upper to the size limit");
    }

    // Round-robin start arena; AllocateMemoryImpl may move index to another arena on fallback.
    auto index = nextId_.fetch_add(1) % arenas_.size();
    realSize = size;
    auto allocateAndLocate = [&]() -> Status {
        RETURN_IF_NOT_OK(AllocateMemoryImpl(true, populate, realSize, index, pointer));
        return arenas_[index]->GetAllocInfo(pointer, fd, offset, mmapSize);
    };
    Status status = allocateAndLocate();
    if (status.IsError()) {
        // Release the reserved quota and emit a rate-limited usage summary.
        (void)memoryUsage_.fetch_sub(size, std::memory_order_relaxed);
        const int logFreq = 100;
        LOG_EVERY_N(ERROR, logFreq) << "total size limit:" << Allocator::Instance()->GetMaxMemorySize(type, cacheType_)
                                    << ", total physical memory usage:"
                                    << Allocator::Instance()->GetTotalPhysicalMemoryUsage(cacheType_)
                                    << ", total real memory usage:"
                                    << Allocator::Instance()->GetTotalRealMemoryUsage(type, cacheType_)
                                    << ", total memory usage:"
                                    << Allocator::Instance()->GetTotalMemoryUsage(type, cacheType_)
                                    << ", try alloc size:" << size << ", cacheType:" << static_cast<int>(cacheType_);
        return status;
    }

    {
        std::lock_guard<std::shared_timed_mutex> l(allocatedMutex_);
        allocatedTable_.emplace(pointer, AllocRecord{ size, realSize, index });
    }
    auto arena = arenas_[index];
    arena->AddRealMemoryUsage(realSize);
    (void)realMemoryUsage_.fetch_add(realSize, std::memory_order_relaxed);

    const uint8_t unknownNumaId = std::numeric_limits<uint8_t>::max();
    if (!IsUbNumaAffinityEnabled()) {
        numaId = unknownNumaId;
    } else {
        Status queryRc = arena->QueryNumaId(pointer, numaId);
        if (queryRc.IsError()) {
            LOG(WARNING) << "QueryNumaId failed for pointer " << pointer << ": " << queryRc.ToString();
            numaId = unknownNumaId;
        }
    }
    VLOG(1) << "[Allocator] Arena " << arena->GetArenaId() << " allocate require size: " << size
            << ", real size: " << realSize << ", offset: " << offset;
    return Status::OK();
}
/**
 * @brief Allocate from the arena at index, optionally falling back to sibling arenas on out-of-memory.
 * @param[in] retry When true and the first arena is out of memory, the other arenas are tried once each.
 * @param[in] populate Published to the hooks through g_NeedFallocate.
 * @param[in,out] size Requested size; passed by reference to Jemalloc::Allocate.
 * @param[in,out] index Arena to try first; updated to the arena that finally served the request.
 * @param[out] pointer Address of the allocation on success.
 * @return Status of the allocation; K_OUT_OF_MEMORY carries a message naming the cache type and arena.
 */
Status ArenaGroup::AllocateMemoryImpl(bool retry, bool populate, uint64_t &size, size_t &index, void *&pointer)
{
    CHECK_FAIL_RETURN_STATUS(!arenas_.empty(), StatusCode::K_RUNTIME_ERROR, "arenas_ is empty");
    CHECK_FAIL_RETURN_STATUS(arenas_.size() > index, StatusCode::K_RUNTIME_ERROR, "index is bigger than arenas_ size");
    using SteadyClock = std::chrono::steady_clock;
    // Publish the request parameters for the commit hook running on this thread.
    g_NeedFallocate = populate;
    g_RequestSize = size;

    const auto arenaCount = arenas_.size();
    auto arena = arenas_[index];
    const auto arenaId = arena->GetArenaId();
    const auto startedAt = SteadyClock::now();
    auto status = Jemalloc::Allocate(arenaId, size, pointer);
    if (status.GetCode() == StatusCode::K_OUT_OF_MEMORY) {
        // Replace the status with one that names the cache type and the exhausted arena.
        std::string errHint = "UnknowType";
        auto typeName = CACHE_TYPE_STR.find(cacheType_);
        if (typeName != CACHE_TYPE_STR.end()) {
            errHint = typeName->second;
        }
        std::string errorMsg = FormatString("%s no space in arena: %d", errHint, arenaId);
        status = Status(StatusCode::K_OUT_OF_MEMORY, errorMsg);
    }
    auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(SteadyClock::now() - startedAt).count();
    PerfPoint::RecordElapsed(status.IsOk() ? PerfKey::JEMALLOC_ALLOCATE_SUCCESS : PerfKey::JEMALLOC_ALLOCATE_FAIL,
                             elapsed);

    const bool mayFallback = retry && arenaCount != 1 && status.GetCode() == StatusCode::K_OUT_OF_MEMORY;
    if (!mayFallback) {
        return status;
    }
    VLOG(1) << "alloc from arena " << arena->GetArenaId()
            << " failed, fallocate size:" << arena->GetPhysicalMemoryUsage()
            << ", real allocate size:" << arena->GetRealMemoryUsage();

    // Visit every other arena once, starting right after the one that failed.
    for (size_t step = 1; step < arenaCount; ++step) {
        auto nextIndex = (index + step) % arenaCount;
        auto candidate = arenas_[nextIndex];
        // Only arenas whose committed-but-unused memory exceeds the request are worth trying.
        const bool hasSlack = candidate->GetPhysicalMemoryUsage() > candidate->GetRealMemoryUsage()
                              && candidate->GetPhysicalMemoryUsage() - candidate->GetRealMemoryUsage() > size;
        if (!hasSlack) {
            continue;
        }
        VLOG(1) << "try alloc from arena " << candidate->GetArenaId()
                << ", fallocate size:" << candidate->GetPhysicalMemoryUsage()
                << ", real allocate size:" << candidate->GetRealMemoryUsage();
        if (AllocateMemoryImpl(false, populate, size, nextIndex, pointer).IsOk()) {
            index = nextIndex;
            nextId_ = nextIndex + 1;
            return Status::OK();
        }
    }
    return status;
}
/**
 * @brief Free an allocation previously returned by AllocateMemory and update the usage counters.
 * @param[in] pointer Address to free; must be present in the allocation table.
 * @param[out] bytesFree Requested size recorded for the allocation.
 * @param[out] bytesRealFree Real size recorded for the allocation.
 * @param[in] usedSize Upper bound for the recorded size; the free is rejected when the record is larger.
 * @return K_RUNTIME_ERROR when the group is destroyed/empty, the pointer is null or unknown,
 *         the recorded arena index is invalid, or the recorded size exceeds usedSize.
 */
Status ArenaGroup::FreeMemory(void *pointer, uint64_t &bytesFree, uint64_t &bytesRealFree, uint64_t usedSize)
{
    CHECK_FAIL_RETURN_STATUS(!destroyed_.load(), StatusCode::K_RUNTIME_ERROR, "ArenaGroup destroyed");
    CHECK_FAIL_RETURN_STATUS(!arenas_.empty(), StatusCode::K_RUNTIME_ERROR, "arenas_ is empty");
    RETURN_RUNTIME_ERROR_IF_NULL(pointer);
    PerfPoint point(PerfKey::ALLOCATE_FREE_WAIT_LOCK);
    std::lock_guard<std::shared_timed_mutex> l(allocatedMutex_);
    point.RecordAndReset(PerfKey::ALLOCATE_FREE_HOLD_LOCK);

    auto found = allocatedTable_.find(pointer);
    if (found == allocatedTable_.end()) {
        RETURN_STATUS(StatusCode::K_RUNTIME_ERROR, "Allocated pointer not found.");
    }
    const auto &entry = found->second;
    CHECK_FAIL_RETURN_STATUS(entry.index < arenas_.size(), StatusCode::K_RUNTIME_ERROR,
                             "invalid index " + std::to_string(entry.index));
    const auto &owner = arenas_[entry.index];
    CHECK_FAIL_RETURN_STATUS(entry.size <= usedSize, K_RUNTIME_ERROR,
                             "The size of memory freed is larger than the memory used by the given service type");

    Jemalloc::Free(owner->GetArenaId(), pointer);
    // Copy the sizes out before the record is erased below.
    bytesFree = entry.size;
    bytesRealFree = entry.realSize;
    owner->SubRealMemoryUsage(bytesRealFree);
    allocatedTable_.erase(pointer);
    (void)memoryUsage_.fetch_sub(bytesFree, std::memory_order_relaxed);
    (void)realMemoryUsage_.fetch_sub(bytesRealFree, std::memory_order_relaxed);
    VLOG(1) << "[Allocator] Arena " << owner->GetArenaId() << " free require size: " << bytesFree
            << ", real size: " << bytesRealFree;
    return Status::OK();
}
/**
 * @brief Free an allocation without reporting the freed sizes and without a usage bound.
 * @param[in] pointer Address to free.
 * @return Status of the four-argument FreeMemory overload.
 */
Status ArenaGroup::FreeMemory(void *pointer)
{
    uint64_t ignoredFree = 0;
    uint64_t ignoredRealFree = 0;
    // The maximum bound makes the "recorded size <= usedSize" check always pass.
    return FreeMemory(pointer, ignoredFree, ignoredRealFree, std::numeric_limits<uint64_t>::max());
}
/**
 * @brief Read the requested-bytes counter while holding the allocation table lock in shared mode.
 * @return The current value of memoryUsage_.
 */
uint64_t ArenaGroup::GetMemoryUsage() const
{
    std::shared_lock<std::shared_timed_mutex> l(allocatedMutex_);
    return memoryUsage_.load();
}
/**
 * @brief Look up the mapping that belongs to a file descriptor of this group.
 * @param[in] fd The file descriptor to look up.
 * @param[out] ptrMmapSz The (base pointer, mmap size) pair recorded for fd.
 * @return K_RUNTIME_ERROR when the group is destroyed, K_NOT_FOUND when fd is unknown, otherwise OK.
 */
Status ArenaGroup::FdToPointer(int fd, std::pair<void *, uint64_t> &ptrMmapSz) const
{
    CHECK_FAIL_RETURN_STATUS(!destroyed_.load(), StatusCode::K_RUNTIME_ERROR, "ArenaGroup destroyed");
    const auto entry = fdPointerTable_.find(fd);
    if (entry != fdPointerTable_.end()) {
        ptrMmapSz = entry->second;
        return Status::OK();
    }
    RETURN_STATUS(StatusCode::K_NOT_FOUND, FormatString("fd [%d] not found", fd));
}
/**
 * @brief Fill stat with the group's limits, counters and per-arena usage sums.
 *        Leaves stat untouched when the group has been destroyed.
 * @param[out] stat The statistics structure to fill.
 */
void ArenaGroup::GetArenaGroupStat(ShmMemStat &stat)
{
    if (destroyed_.load()) {
        return;
    }
    stat.maxMemoryLimit = maxSize_;
    stat.memoryUsage = memoryUsage_.load(std::memory_order_relaxed);
    stat.numOfFds = fdPointerTable_.size();

    // Real and physical usage are tracked per arena, so sum them up.
    stat.realMemoryUsage = 0;
    stat.physicalMemoryUsage = 0;
    for (auto member = arenas_.cbegin(); member != arenas_.cend(); ++member) {
        stat.realMemoryUsage += (*member)->GetRealMemoryUsage();
        stat.physicalMemoryUsage += (*member)->GetPhysicalMemoryUsage();
    }

    // The allocation table is guarded by allocatedMutex_.
    std::shared_lock<std::shared_timed_mutex> l(allocatedMutex_);
    stat.numOfAllocated = allocatedTable_.size();
}
/**
 * @brief Destroy every arena of the group exactly once.
 * @return OK when the group was already destroyed; K_RUNTIME_ERROR when no arena manager is available;
 *         otherwise the status returned by the manager's DestroyArenas.
 */
Status ArenaGroup::DestroyAll()
{
    // Collect the ids before flipping destroyed_, because GetArenaIds returns nothing afterwards.
    auto arenaIds = GetArenaIds();
    const bool alreadyDestroyed = destroyed_.exchange(true);
    if (alreadyDestroyed) {
        return Status::OK();
    }
    auto manager = Allocator::Instance()->GetArenaManager();
    RETURN_RUNTIME_ERROR_IF_NULL(manager);
    // The callback destroys each arena, remembers the last failure and drops the group's references.
    auto destroyMembers = [this] {
        Status lastRc;
        for (const auto &member : arenas_) {
            Status rc = member->DestroyArena();
            if (rc.IsError()) {
                lastRc = rc;
            }
        }
        arenas_.clear();
        return lastRc;
    };
    return manager->DestroyArenas(arenaIds, destroyMembers);
}
std::vector<uint32_t> ArenaGroup::GetArenaIds() const
{
if (destroyed_) {
return {};
}
std::vector<uint32_t> ids;
for (const auto &arena : arenas_) {
ids.emplace_back(arena->GetArenaId());
}
return ids;
}
std::vector<int> ArenaGroup::GetAllFds()
{
std::vector<int> fds;
for (const auto &arean : arenas_) {
fds.emplace_back(arean->GetFd());
}
return fds;
}
/**
 * @brief Set up the arena table, install the allocator hooks and derive the tenant limit.
 * @param[in] populate Stored in populate_; also forces single-arena mode.
 * @param[in] scaling Stored in scaling_.
 * @param[in] decayMs Stored in decayMs_ and later passed to Jemalloc::CreateArena.
 */
ArenaManager::ArenaManager(bool populate, bool scaling, ssize_t decayMs)
    : numArenas_(0), populate_(populate), scaling_(scaling), decayMs_(decayMs), destroyed_(false)
{
    arenas_.resize(ARENAS_INIT_SIZE);
    Jemalloc::Init(&ArenaManager::AllocHook, &ArenaManager::DestroyHook, &ArenaManager::CommitHook);
    handleExpiredTenantThread_ = std::make_unique<ThreadPool>(handleExpiredTenantThreadNum_, 0, "TenantExpired");

    // In these modes the process-wide flags are overridden: one memory arena per tenant, no fallocate.
    const bool singleArenaMode = populate || FLAGS_enable_huge_tlb || NeedRegisterWholeArena();
    if (singleArenaMode) {
        FLAGS_arena_per_tenant = 1;
        FLAGS_enable_fallocate = false;
    }

    // Arenas consumed by one tenant: memory arenas plus disk arenas when a shared disk directory is set.
    auto arenasPerTenant = FLAGS_arena_per_tenant;
    if (!FLAGS_shared_disk_directory.empty()) {
        arenasPerTenant += FLAGS_shared_disk_arena_per_tenant;
    }
    maxTenantSize_ = ARENAS_INIT_SIZE / arenasPerTenant - 1;
}
/**
 * @brief Start the expired-tenant checker unless the thread pool already runs as many tasks as it has threads.
 */
void ArenaManager::Init()
{
    int runningNum = handleExpiredTenantThread_->GetRunningTasksNum();
    if (runningNum < handleExpiredTenantThreadNum_) {
        StartCheckExpiredTenantResource();
    }
}
/**
 * @brief Register the allocator function set for a cache type and run the parameterless Init.
 * @param[in] type The cache type the functions belong to.
 * @param[in] funcRegister The functions to register.
 * @return K_DUPLICATED when the type already has a registration, otherwise OK.
 */
Status ArenaManager::Init(CacheType type, AllocatorFuncRegister funcRegister)
{
    std::lock_guard<std::shared_timed_mutex> lck(registerMutex_);
    const bool inserted = funcRegisterList_.try_emplace(type, funcRegister).second;
    if (!inserted) {
        RETURN_STATUS(K_DUPLICATED,
                      FormatString("Already register allocator func for type: %d", static_cast<int>(type)));
    }
    Init();
    return Status::OK();
}
/**
 * @brief Round size up to the next multiple of HUGE_PAGE_SIZE.
 * @param[in] size The size in bytes.
 * @return size when it is already a multiple, otherwise the next larger multiple.
 */
uint64_t ArenaManager::RoundUpToNextMultiple(uint64_t size)
{
    const auto remainder = size % HUGE_PAGE_SIZE;
    return remainder == 0 ? size : size + (HUGE_PAGE_SIZE - remainder);
}
/**
 * @brief Create the arenas for one arena group, register them in arenas_ and wrap them in an ArenaGroup.
 * @param[in] type Cache type of the arenas; also selects how many arenas are created.
 * @param[in] maxSize Quota of the new group; the mmap size of every arena is derived from it.
 * @param[out] arenaGroup The created group; only assigned when every arena was created successfully.
 * @return K_RUNTIME_ERROR when the manager is destroyed, maxSize is too large, the arena index exceeds
 *         ARENAS_INIT_SIZE or the slot is already occupied; otherwise the first error from
 *         Jemalloc::CreateArena or Arena::Init, or OK.
 */
Status ArenaManager::CreateArenaGroup(CacheType type, uint64_t maxSize, std::shared_ptr<ArenaGroup> &arenaGroup)
{
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(!destroyed_, K_RUNTIME_ERROR, "ArenaManager already destroyed");
std::vector<uint64_t> arenaInds;
// With rate < 1 the mapping is made larger than the quota (maxSize / rate).
auto rate = 0.78;
// Reject sizes for which maxSize / rate could not be represented in 64 bits.
CHECK_FAIL_RETURN_STATUS(
static_cast<uint64_t>(static_cast<long double>(std::numeric_limits<uint64_t>::max()) * rate) > maxSize,
K_RUNTIME_ERROR, "mmapSize overflow.");
auto fakeAllocateSize = maxSize;
// In these modes the mapping equals the quota and the fake allocation only asks for 80% of it.
if (populate_ || IsFastTransportEnabled() || IsRemoteH2DEnabled() || FLAGS_enable_huge_tlb) {
rate = 1;
auto overhead = 0.8;
fakeAllocateSize = static_cast<uint64_t>(overhead * maxSize);
}
uint64_t mmapSize = maxSize / rate;
if (FLAGS_enable_huge_tlb) {
mmapSize = RoundUpToNextMultiple(mmapSize);
}
{
// mutex_ is held exclusively for the whole creation loop; AllocHook takes it in shared mode.
std::lock_guard<std::shared_timed_mutex> l(mutex_);
std::vector<std::shared_ptr<Arena>> arenas;
// MEMORY uses arena_per_tenant, the device / UB transport types use exactly one arena,
// every other type uses shared_disk_arena_per_tenant.
auto arenasNum = type == CacheType::MEMORY ? FLAGS_arena_per_tenant : FLAGS_shared_disk_arena_per_tenant;
arenasNum = (type == CacheType::DEV_DEVICE || type == CacheType::DEV_HOST || type == CacheType::UB_TRANSPORT)
? 1
: arenasNum;
arenas.reserve(arenasNum);
// NOTE(review): every early return inside this loop leaves the arenas registered by earlier
// iterations in arenas_ (and counted in numArenas_) although no ArenaGroup owns them - confirm
// whether a rollback is expected here.
for (uint32_t i = 0; i < arenasNum; i++) {
uint32_t arenaInd = 0;
void *handler = nullptr;
RETURN_IF_NOT_OK(Jemalloc::CreateArena(decayMs_, arenaInd, handler));
// The arena table has a fixed size, so an index beyond it cannot be tracked.
if (ARENAS_INIT_SIZE <= arenaInd) {
LOG_IF_ERROR(Jemalloc::DestroyArena(arenaInd), "Too many arena create, destroy arena failed");
RETURN_STATUS(StatusCode::K_RUNTIME_ERROR,
FormatString("Too many arena created! areneInd: %d, arena init size: %d", arenaInd,
ARENAS_INIT_SIZE));
}
std::shared_ptr<Arena> arena;
if (arenas_[arenaInd] == nullptr) {
arena = std::make_shared<Arena>(arenaInd, handler, populate_, scaling_, mmapSize, type);
arenas_[arenaInd] = arena;
} else {
// NOTE(review): the arena and handler just created are not destroyed on this path - confirm.
RETURN_STATUS(StatusCode::K_RUNTIME_ERROR,
FormatString("arena %d reuse, but exists in ArenaManager.", arenaInd));
}
// Look up the functions registered for this cache type; regFunc stays default-constructed if none.
AllocatorFuncRegister regFunc;
{
std::shared_lock<std::shared_timed_mutex> l(registerMutex_);
auto it = funcRegisterList_.find(type);
if (it != funcRegisterList_.end()) {
regFunc = it->second;
}
}
Status rc = arena->Init(regFunc);
if (rc.IsError()) {
LOG(ERROR) << "Init arena " << arenaInd << " failed: " << rc;
// Unregister the slot; the local shared_ptr is the last owner of the failed arena.
arenas_[arenaInd] = nullptr;
return rc;
}
VLOG(1) << "Create arena:" << arenaInd << ", fd:" << arena->GetFd() << ", mmap size:" << mmapSize;
arenaInds.emplace_back(arenaInd);
(void)numArenas_.fetch_add(1, std::memory_order_relaxed);
arenas.emplace_back(std::move(arena));
}
arenaGroup = std::make_shared<ArenaGroup>(std::move(arenas), maxSize, type);
};
// Runs after mutex_ has been released.
FakeAllocate(type, arenaInds, fakeAllocateSize);
return Status::OK();
}
/**
 * @brief Issue one allocation request per arena while g_FakeAllocate is set on this thread.
 *        The request is expected to fail; if it succeeds, a warning is logged and the memory
 *        is freed again.
 * @param[in] type Cache type of the arenas (kept for interface compatibility, not used).
 * @param[in] arenaInds Indexes of the arenas to probe.
 * @param[in] maxSize Size of the request issued against every arena.
 */
void ArenaManager::FakeAllocate(CacheType type, const std::vector<uint64_t> &arenaInds, const uint64_t maxSize)
{
    // The original built an out-of-memory message from `type` that was never read; only IsOk() matters here.
    (void)type;
    g_FakeAllocate = true;
    for (const auto &arenaInd : arenaInds) {
        uint64_t hookSize = maxSize;
        // Initialised so that nothing indeterminate can ever reach Jemalloc::Free.
        void *pointer = nullptr;
        auto status = Jemalloc::Allocate(arenaInd, hookSize, pointer);
        if (status.IsOk()) {
            LOG(WARNING) << "fake allocate should not success, pointer is not nullptr";
            Jemalloc::Free(arenaInd, pointer);
        }
    }
    g_FakeAllocate = false;
}
/**
 * @brief Create an arena group and record it for the (tenantId, type) key, replacing any previous entry.
 * @param[in] tenantId The tenant the group belongs to.
 * @param[in] type Cache type of the group.
 * @param[in] maxSize Quota of the group.
 * @param[out] arenaGroup The created group.
 * @return The error from the three-argument CreateArenaGroup, or OK.
 */
Status ArenaManager::CreateArenaGroup(const std::string &tenantId, CacheType type, uint64_t maxSize,
                                      std::shared_ptr<ArenaGroup> &arenaGroup)
{
    RETURN_IF_NOT_OK(CreateArenaGroup(type, maxSize, arenaGroup));
    std::lock_guard<std::shared_timed_mutex> l(tenantMutex_);
    tenantArenas_[{ tenantId, type }] = arenaGroup;
    return Status::OK();
}
/**
 * @brief Look up the arena group registered for a key.
 * @param[in] key The (tenant, cache type) key.
 * @param[out] arenaGroup The group found; untouched on failure.
 * @return K_RUNTIME_ERROR when the manager is destroyed, the key is unknown or the stored group is null.
 */
Status ArenaManager::GetArenaGroup(const ArenaGroupKey &key, std::shared_ptr<ArenaGroup> &arenaGroup)
{
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(!destroyed_, K_RUNTIME_ERROR, "ArenaManager already destroyed");
    std::shared_lock<std::shared_timed_mutex> l(tenantMutex_);
    const auto found = tenantArenas_.find(key);
    if (found == tenantArenas_.end()) {
        RETURN_STATUS(StatusCode::K_RUNTIME_ERROR, FormatString("No arena found for the tenant %s.", key.tenantId));
    }
    RETURN_RUNTIME_ERROR_IF_NULL(found->second);
    arenaGroup = found->second;
    return Status::OK();
}
/**
 * @brief Return the arena group of a key, creating and registering it when it does not exist yet.
 * @param[in] key The (tenant, cache type) key.
 * @param[in] maxSize Quota used when the group has to be created.
 * @param[out] arenaGroup The existing or newly created group.
 * @return K_RUNTIME_ERROR when the tenant limit is reached, the error from CreateArenaGroup, or OK.
 */
Status ArenaManager::GetOrCreateArenaGroup(const ArenaGroupKey &key, uint64_t maxSize,
                                           std::shared_ptr<ArenaGroup> &arenaGroup)
{
    // First attempt: lookup under the shared lock taken inside GetArenaGroup.
    if (GetArenaGroup(key, arenaGroup).IsOk()) {
        return Status::OK();
    }

    // Second attempt: re-check under the exclusive lock, since another thread may have created it meanwhile.
    std::lock_guard<std::shared_timed_mutex> l(tenantMutex_);
    const auto existing = tenantArenas_.find(key);
    if (existing != tenantArenas_.end()) {
        arenaGroup = existing->second;
        return Status::OK();
    }
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
        tenantArenas_.size() < maxTenantSize_, K_RUNTIME_ERROR,
        FormatString("create tenant failed, the maximum tenant size %llu has been reached", maxTenantSize_));
    RETURN_IF_NOT_OK(CreateArenaGroup(key.type, maxSize, arenaGroup));
    tenantArenas_[key] = arenaGroup;
    return Status::OK();
}
/**
 * @brief Destroy all arenas of the given group.
 * @param[in] arenaGroup The group to destroy.
 * @return K_RUNTIME_ERROR when the manager is destroyed or arenaGroup is null,
 *         otherwise the status of ArenaGroup::DestroyAll.
 */
Status ArenaManager::DestroyArenaGroup(const std::shared_ptr<ArenaGroup> &arenaGroup)
{
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(!destroyed_, K_RUNTIME_ERROR, "ArenaManager already destroyed");
    // Reject a null group instead of dereferencing it.
    RETURN_RUNTIME_ERROR_IF_NULL(arenaGroup);
    return arenaGroup->DestroyAll();
}
/**
 * @brief Destroy and unregister the arena group of a key, provided nothing is allocated from it.
 * @param[in] key The (tenant, cache type) key.
 * @return K_RUNTIME_ERROR when the manager is destroyed or the key is unknown, K_NOT_READY when the
 *         group still reports memory usage, the error from destroying the group, or OK.
 */
Status ArenaManager::DestroyArenaGroup(const ArenaGroupKey &key)
{
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(!destroyed_, K_RUNTIME_ERROR, "ArenaManager already destroyed");
    std::shared_ptr<ArenaGroup> arenaGroup;
    RETURN_IF_NOT_OK(GetArenaGroup(key, arenaGroup));
    if (arenaGroup->GetMemoryUsage() != 0) {
        RETURN_STATUS(
            StatusCode::K_NOT_READY,
            FormatString("The tenant %s's arena is unable to destroy as allocated spaces still exist.", key.tenantId));
    }
    RETURN_IF_NOT_OK(DestroyArenaGroup(arenaGroup));
    std::lock_guard<std::shared_timed_mutex> l(tenantMutex_);
    (void)tenantArenas_.erase(key);
    return Status::OK();
}
/**
 * @brief Mark the manager as destroyed and destroy every registered arena group.
 * @return OK when the manager was already destroyed; otherwise the last error seen, or OK.
 */
Status ArenaManager::DestroyAllArenaGroup()
{
    const bool alreadyDestroyed = destroyed_.exchange(true);
    if (alreadyDestroyed) {
        return Status::OK();
    }
    Status lastRc;
    std::shared_lock<std::shared_timed_mutex> l(tenantMutex_);
    for (auto entry = tenantArenas_.begin(); entry != tenantArenas_.end(); ++entry) {
        Status rc = entry->second->DestroyAll();
        if (rc.IsError()) {
            lastRc = rc;
        }
    }
    return lastRc;
}
void ArenaManager::SetReleaseableTenant(const ArenaGroupKey &key)
{
auto releaseHandler = [key]() {
LOG(INFO) << FormatString("[TENANT RELEASER]Destroy arena group for %s", key.tenantId);
LOG_IF_ERROR(datasystem::memory::Allocator::Instance()->DestroyArenaGroup(key),
FormatString("Destroy arena group for %s failed", key.tenantId));
};
PreReleaseTenantResourceInfo preReleaseTenantResourceInfo;
preReleaseTenantResourceInfo.key = key;
preReleaseTenantResourceInfo.fds = GetAllFdsByKey(key);
preReleaseTenantResourceInfo.releaseHandler = releaseHandler;
std::lock_guard<std::shared_timed_mutex> lck(preReleaseTenantResourceInfoMapMutex_);
VLOG(1) << FormatString("[TENANT RELEASER]Tenant[%s] is releaseable", key.tenantId);
preReleaseTenantResourceInfoMap_.emplace(key, std::move(preReleaseTenantResourceInfo));
}
/**
 * @brief Remove a tenant from the release candidates, if it is one.
 * @param[in] key The (tenant, cache type) key.
 */
void ArenaManager::CancelExpiredTenantTimer(const ArenaGroupKey &key)
{
    std::lock_guard<std::shared_timed_mutex> lck(preReleaseTenantResourceInfoMapMutex_);
    const auto removed = preReleaseTenantResourceInfoMap_.erase(key);
    if (removed == 0) {
        return;
    }
    VLOG(1) << FormatString("[TENANT RELEASER]The release process for tenant[%s] has been canceled",
                            key.tenantId);
}
// Destroys all arena groups (failures are only logged) and then signals destroyWaitPost_,
// the object the expired-tenant checker loop waits on between rounds.
ArenaManager::~ArenaManager()
{
LOG(INFO) << "ArenaManager destructor.";
LOG_IF_ERROR(DestroyAllArenaGroup(), "DestroyAllArenaGroup failed!");
destroyWaitPost_.Set();
}
void *ArenaManager::AllocHook(size_t size, size_t alignment, unsigned arenaInd, bool *zero, bool *commit)
{
auto manager = Allocator::Instance()->GetArenaManager();
std::shared_lock<std::shared_timed_mutex> l(manager->mutex_);
return manager->arenas_[arenaInd]->AllocHook(size, alignment, zero, commit);
}
/**
 * @brief Static destroy hook: dispatches to the arena registered at arenaInd.
 *        Does nothing when there is no manager or no arena registered at that index
 *        (for example after CreateArenaGroup cleared the slot of an arena whose Init failed).
 * @param[in] addr Start address of the range being destroyed.
 * @param[in] size Size of the range.
 * @param[in] committed Commit state of the range.
 * @param[in] arenaInd Index of the arena the range belongs to.
 */
void ArenaManager::DestroyHook(void *addr, size_t size, bool committed, unsigned arenaInd)
{
    auto manager = Allocator::Instance()->GetArenaManager();
    if (manager == nullptr || arenaInd >= manager->arenas_.size()) {
        return;
    }
    // Hold our own reference so the arena stays alive for the duration of the call.
    auto threadSafePtr = manager->arenas_[arenaInd];
    if (threadSafePtr == nullptr) {
        return;
    }
    threadSafePtr->DestroyHook(addr, size, committed);
}
/**
 * @brief Static commit/decommit hook: dispatches to the arena registered at arenaInd.
 * @param[in] commit True for a commit request, false for a decommit request.
 * @param[in] addr Start address of the extent.
 * @param[in] size Size of the extent.
 * @param[in] offset Offset of the affected range inside the extent.
 * @param[in] length Length of the affected range.
 * @param[in] arenaInd Index of the arena the extent belongs to.
 * @return The arena's result; true (the value Arena::CommitHook uses for a rejected request)
 *         when there is no manager or no arena registered at arenaInd.
 */
bool ArenaManager::CommitHook(bool commit, void *addr, size_t size, size_t offset, size_t length, unsigned arenaInd)
{
    auto manager = Allocator::Instance()->GetArenaManager();
    if (manager == nullptr || arenaInd >= manager->arenas_.size()) {
        return true;
    }
    // Hold our own reference so the arena stays alive for the duration of the call.
    auto threadSafePtr = manager->arenas_[arenaInd];
    if (threadSafePtr == nullptr) {
        return true;
    }
    return threadSafePtr->CommitHook(commit, addr, size, offset, length);
}
/**
 * @brief Collect the fds of the arena group registered for a key.
 * @param[in] key The (tenant, cache type) key.
 * @return The group's fds, or an empty vector when the key is unknown.
 */
std::vector<int> ArenaManager::GetAllFdsByKey(const ArenaGroupKey &key)
{
    std::shared_lock<std::shared_timed_mutex> l(tenantMutex_);
    const auto found = tenantArenas_.find(key);
    if (found == tenantArenas_.end()) {
        return {};
    }
    return found->second->GetAllFds();
}
/**
 * @brief Submit the expired-tenant checker to the thread pool.
 *        The task loops until destroyed_ is set: it gathers the expired release candidates under the
 *        release-map lock, hands them to UpdateExpiredTenant, then waits on destroyWaitPost_ for up to
 *        intervalMs between rounds.
 */
void ArenaManager::StartCheckExpiredTenantResource()
{
    static int intervalMs = 5'000;
    auto checkerLoop = [this]() {
        TraceGuard traceGuard = Trace::Instance().SetTraceUUID();
        LOG(INFO) << "[TENANT RELEASER]Start check expired tenant resource thread";
        while (!destroyed_) {
            {
                std::lock_guard<std::shared_timed_mutex> lck(preReleaseTenantResourceInfoMapMutex_);
                // Copy the expired entries first: UpdateExpiredTenant erases from the map.
                std::vector<PreReleaseTenantResourceInfo> expiredTenantsInfo;
                for (auto &candidate : preReleaseTenantResourceInfoMap_) {
                    if (!candidate.second.IsExpired()) {
                        continue;
                    }
                    VLOG(1) << FormatString("[TENANT RELEASER]Tenant[%s] is expired", candidate.first.tenantId);
                    expiredTenantsInfo.emplace_back(candidate.second);
                }
                UpdateExpiredTenant(expiredTenantsInfo);
            }
            destroyWaitPost_.WaitFor(intervalMs);
        }
    };
    handleExpiredTenantThread_->Execute(checkerLoop);
}
/**
 * @brief Release every expired tenant whose fds have all been released.
 *        For such a tenant the release handler (if any) is invoked and the entry is removed from the
 *        release map. The map is modified without locking here; the caller in this file holds the lock.
 * @param[in] expiredTenantsInfo The expired release candidates.
 */
void ArenaManager::UpdateExpiredTenant(const std::vector<PreReleaseTenantResourceInfo> &expiredTenantsInfo)
{
    for (const auto &expired : expiredTenantsInfo) {
        if (!Allocator::Instance()->CheckIfAllFdReleasedHandler(expired.fds)) {
            continue;
        }
        VLOG(1) << FormatString("[TENANT RELEASER]Tenant[%s] is ready to release", expired.key.tenantId);
        if (expired.releaseHandler != nullptr) {
            expired.releaseHandler();
        }
        preReleaseTenantResourceInfoMap_.erase(expired.key);
    }
}
/**
 * @brief Collect the fds of all release candidates that have expired.
 * @return The union of the fds of every expired entry in the release map.
 */
std::set<int> ArenaManager::GetAllExpiredFds()
{
    std::shared_lock<std::shared_timed_mutex> lck(preReleaseTenantResourceInfoMapMutex_);
    std::set<int> expiredFds;
    // Iterate by reference: copying each entry would duplicate its key, fd vector and release handler.
    for (auto &entry : preReleaseTenantResourceInfoMap_) {
        auto &info = entry.second;
        if (info.IsExpired()) {
            expiredFds.insert(info.fds.begin(), info.fds.end());
        }
    }
    return expiredFds;
}
/**
 * @brief Verify that every worker fd belongs to the tenant's or the default tenant's arena groups.
 * @param[in] tenantId The tenant to check against.
 * @param[in] workerFds The fds to verify.
 * @return K_NOT_AUTHORIZED for the first fd that is in none of the groups, otherwise OK.
 */
Status ArenaManager::CheckWorkerFdTenant(const std::string &tenantId, const std::vector<int> &workerFds)
{
    // Memory and disk groups of both the default tenant and the given tenant are acceptable owners.
    const std::vector<ArenaGroupKey> candidateKeys = {
        { DEFAULT_TENANT_ID, CacheType::MEMORY },
        { DEFAULT_TENANT_ID, CacheType::DISK },
        { tenantId, CacheType::MEMORY },
        { tenantId, CacheType::DISK }
    };

    std::set<int> permittedFds;
    std::shared_ptr<ArenaGroup> arenaGroup;
    for (const auto &candidate : candidateKeys) {
        // Groups that do not exist simply contribute no fds.
        if (GetArenaGroup(candidate, arenaGroup).IsError()) {
            continue;
        }
        auto fds = arenaGroup->GetAllFds();
        permittedFds.insert(fds.begin(), fds.end());
    }

    for (auto fd : workerFds) {
        if (permittedFds.count(fd) == 0) {
            return Status(K_NOT_AUTHORIZED, FormatString("workerFd %d is not belong to tenant[%s]", fd, tenantId));
        }
    }
    return Status::OK();
}
// System page size, queried once during static initialization; compared against the commit
// length in Arena::CommitImpl and validated in Arena::BuildNumaRangeTable.
uint64_t Arena::pageSize_ = getpagesize();
/**
 * @brief Store the arena parameters; the backing mmap is created later by Arena::Init.
 * @param[in] arenaId Id of the arena, used for allocator calls and logging.
 * @param[in] handler Opaque handle released with Jemalloc::DestroyHandler in the destructor.
 * @param[in] populate Forwarded to the mmap's Initialize in Arena::Init.
 * @param[in] scaling When false, DecommitImpl never decommits.
 * @param[in] mmapSize Size passed to the mmap's Initialize in Arena::Init.
 * @param[in] cacheType Selects the mmap implementation in Arena::Init.
 */
Arena::Arena(uint32_t arenaId, void *handler, bool populate, bool scaling, uint64_t mmapSize, CacheType cacheType)
: arenaId_(arenaId),
handler_(handler),
populate_(populate),
scaling_(scaling),
destroyed_(false),
mmapSize_(mmapSize),
cacheType_(cacheType)
{
}
/**
 * @brief Create and initialize the mmap backend that matches the cache type and, when NUMA affinity
 *        is enabled for MEMORY / UB_TRANSPORT arenas, build the NUMA range table.
 * @param[in] funcRegister Allocator functions handed to the flexible mmap backend.
 * @return K_INVALID for an unknown cache type, the error from the mmap's Initialize, or OK.
 *         A failure to build the NUMA range table is only logged.
 */
Status Arena::Init(AllocatorFuncRegister funcRegister)
{
    if (cacheType_ == CacheType::MEMORY) {
        mmap_ = std::make_unique<MemMmap>();
    } else if (cacheType_ == CacheType::DISK) {
        mmap_ = std::make_unique<DiskMmap>();
    } else if (cacheType_ == CacheType::DEV_DEVICE || cacheType_ == CacheType::DEV_HOST
               || cacheType_ == CacheType::UB_TRANSPORT) {
        mmap_ = std::make_unique<FlexibleMmap>(funcRegister);
    } else {
        return Status(StatusCode::K_INVALID,
                      FormatString("Unkowned cache type: %d", static_cast<int32_t>(cacheType_)));
    }
    RETURN_IF_NOT_OK(mmap_->Initialize(mmapSize_, populate_, FLAGS_enable_huge_tlb));

    const bool numaTrackedType = cacheType_ == CacheType::MEMORY || cacheType_ == CacheType::UB_TRANSPORT;
    if (numaTrackedType && IsUbNumaAffinityEnabled()) {
        // Best effort: without the table QueryNumaId reports K_NOT_FOUND.
        Status rc = BuildNumaRangeTable();
        if (rc.IsError()) {
            LOG(WARNING) << "BuildNumaRangeTable failed for arena " << arenaId_
                         << ", affinity optimization will be skipped: " << rc.ToString();
        }
    }
    return Status::OK();
}
/**
 * @brief Destroy the underlying allocator arena exactly once.
 * @return OK when the arena was already destroyed, otherwise the status of Jemalloc::DestroyArena.
 */
Status Arena::DestroyArena()
{
    const bool alreadyDestroyed = destroyed_.exchange(true);
    if (alreadyDestroyed) {
        return Status::OK();
    }
    VLOG(1) << "destroy arena:" << arenaId_;
    return Jemalloc::DestroyArena(arenaId_);
}
/**
 * @brief Destroy the arena if that has not happened yet (failures are only logged) and release the handler.
 */
Arena::~Arena() noexcept
{
    LOG_IF_ERROR(DestroyArena(), "DestroyArena failed");
    if (handler_ == nullptr) {
        return;
    }
    Jemalloc::DestroyHandler(handler_);
    handler_ = nullptr;
}
/**
 * @brief Per-arena allocation hook: obtains a range from the arena's mmap backend and keeps the
 *        physical memory accounting in step with it.
 * @param[in] size Number of bytes requested.
 * @param[in] alignment Requested alignment, forwarded to the mmap backend.
 * @param[in,out] zero Read to decide whether a commit is needed; forwarded to the mmap backend.
 * @param[in,out] commit Forced to false during a fake allocation; otherwise read and forwarded.
 * @return The address from the mmap backend, or nullptr when a repeated fake hook is rejected,
 *         the physical memory charge is refused or the backend allocation fails.
 */
void *Arena::AllocHook(size_t size, size_t alignment, bool *zero, bool *commit)
{
// A fake allocation must never commit memory.
if (g_FakeAllocate) {
*commit = false;
}
// Evaluated before the backend call, which receives both flags by pointer.
bool needCommit = *commit && !*zero;
// Only the first fake hook of this arena is served; later ones are rejected.
if (g_FakeAllocate && !firstFakeHook_.load()) {
LOG(INFO) << "fake hook before, no need to hook again";
return nullptr;
}
// Charge the physical memory up front and give up when the charge is refused.
if (needCommit && !AddPhysicalMemeoryUsage(size)) {
return nullptr;
}
void *addr = mmap_->Allocate(size, alignment, zero, commit);
if (addr == nullptr) {
// Undo the charge taken above.
if (needCommit) {
(void)SubPhysicalMemeoryUsage(size);
}
return addr;
}
if (g_FakeAllocate) {
firstFakeHook_ = false;
}
if (needCommit) {
// A true result from CommitImpl is treated as failure: the charge is rolled back.
// NOTE(review): addr is still returned in that case - confirm this is intended.
if (CommitImpl(addr, 0, size)) {
(void)SubPhysicalMemeoryUsage(size);
}
}
return addr;
}
void Arena::DestroyHook(void *addr, size_t size, bool committed)
{
(void)size;
(void)committed;
int fd;
ptrdiff_t offset;
uint64_t mmapSize;
Status rc = GetAllocInfo(addr, fd, offset, mmapSize);
if (rc.IsError()) {
mmap_->Destroy();
}
}
/**
 * @brief Per-arena commit/decommit hook.
 * @param[in] commit True to commit the range, false to decommit it.
 * @param[in] addr Start address of the extent.
 * @param[in] size Unused.
 * @param[in] offset Offset of the affected range inside the extent.
 * @param[in] length Length of the affected range.
 * @return For decommit, the result of DecommitImpl. For commit, true during a fake allocation or when
 *         the physical memory charge is refused, otherwise the result of CommitImpl.
 */
bool Arena::CommitHook(bool commit, void *addr, size_t size, size_t offset, size_t length)
{
    VLOG(1) << "CommitHook arena " << arenaId_;
    (void)size;
    if (!commit) {
        return DecommitImpl(addr, offset, length);
    }
    if (g_FakeAllocate) {
        LOG(INFO) << "fake allocate no need to commit";
        return true;
    }
    if (!AddPhysicalMemeoryUsage(length)) {
        return true;
    }
    const bool rejected = CommitImpl(addr, offset, length);
    if (rejected) {
        // Roll back the charge taken just above.
        (void)SubPhysicalMemeoryUsage(length);
    }
    return rejected;
}
/**
 * @brief Commit a range through the mmap backend, skipping the part covered by the current request
 *        when the request does not need fallocate.
 * @param[in] addr Start address of the extent.
 * @param[in] offset Offset of the range inside the extent.
 * @param[in] length Length of the range.
 * @return false without touching the backend when the range is exactly the current request;
 *         otherwise the result of the backend's Commit.
 */
bool Arena::CommitImpl(void *addr, size_t offset, size_t length)
{
    PerfPoint point(PerfKey::JEMALLOC_COMMIT);
    const bool trimRequestedPart = !g_NeedFallocate && length > Arena::pageSize_;
    if (trimRequestedPart) {
        if (length == g_RequestSize) {
            return false;
        }
        if (length > g_RequestSize) {
            // Commit only what lies beyond the requested bytes.
            offset += g_RequestSize;
            length -= g_RequestSize;
        } else {
            LOG(WARNING) << "Request size is: " << g_RequestSize << " but length is " << length
                         << ", it should not happen";
        }
    }
    return mmap_->Commit(addr, offset, length);
}
/**
 * @brief Decommit a range through the mmap backend and release its physical memory charge.
 * @param[in] addr Start address of the extent.
 * @param[in] offset Offset of the range inside the extent.
 * @param[in] length Length of the range.
 * @return false when scaling is off or the arena is destroyed (nothing is done), true when the backend's
 *         Decommit returns true, otherwise false after the charge for length has been released.
 */
bool Arena::DecommitImpl(void *addr, size_t offset, size_t length)
{
    PerfPoint point(PerfKey::JEMALLOC_DECOMMIT);
    const bool decommitAllowed = scaling_ && !destroyed_.load();
    if (!decommitAllowed) {
        return false;
    }
    if (!mmap_->Decommit(addr, offset, length)) {
        (void)SubPhysicalMemeoryUsage(length);
        return false;
    }
    return true;
}
/**
 * @brief Resolve an address to the fd, offset and mmap size of the mapping that contains it.
 * @param[in] pointer The address to resolve.
 * @param[out] fd The fd of the mapping, or -1 on failure.
 * @param[out] offset The offset reported by the mmap backend, or 0 on failure.
 * @param[out] mmapSize The size of the mapping, or 0 on failure.
 * @return K_RUNTIME_ERROR when the mmap backend has no allocation for pointer, otherwise OK.
 */
Status Arena::GetAllocInfo(void *pointer, int &fd, ptrdiff_t &offset, uint64_t &mmapSize)
{
    PerfPoint point(PerfKey::ALLOCATE_GET_MAP);
    auto allocation = mmap_->GetAllocation(pointer);
    if (allocation) {
        fd = allocation->fd;
        offset = allocation->offset;
        mmapSize = allocation->mmapSize;
        return Status::OK();
    }
    fd = -1;
    mmapSize = 0;
    offset = 0;
    RETURN_STATUS(StatusCode::K_RUNTIME_ERROR, "Pointer addr not Range");
}
/**
 * @brief Charge size bytes of physical memory to the global counter and, on success, to this arena.
 * @param[in] size Number of bytes to charge.
 * @return false when the global charge is refused, otherwise true.
 */
bool Arena::AddPhysicalMemeoryUsage(uint64_t size)
{
    if (!Allocator::Instance()->AddTotalPhysicalMemoryUsage(cacheType_, size)) {
        return false;
    }
    physicalMemoryStats_.AddRealUsageNoCheck(size);
    return true;
}
/**
 * @brief Release size bytes of physical memory from the global counter and from this arena.
 *        A failure of the per-arena update is only logged.
 * @param[in] size Number of bytes to release (test builds may enlarge it through an inject point).
 */
void Arena::SubPhysicalMemeoryUsage(uint64_t size)
{
#ifdef WITH_TESTS
    INJECT_POINT("arena.decommit", [&size](uint64_t num) { size += num; });
#endif
    Allocator::Instance()->SubTotalPhysicalMemoryUsage(cacheType_, size);
    const bool arenaUpdated = physicalMemoryStats_.SubRealUsageCAS(size);
    if (arenaUpdated) {
        return;
    }
    LOG(WARNING) << "Arena " << arenaId_ << " sub physical memory usage failed";
}
/**
 * @brief Account for one new allocation of realSize bytes in this arena.
 * @param[in] realSize Real size of the allocation.
 */
void Arena::AddRealMemoryUsage(uint64_t realSize)
{
    constexpr auto order = std::memory_order_relaxed;
    (void)realMemoryUsage_.fetch_add(realSize, order);
    (void)allocatedCount_.fetch_add(1, order);
}
/**
 * @brief Account for one freed allocation of realSize bytes in this arena.
 * @param[in] realSize Real size of the freed allocation.
 */
void Arena::SubRealMemoryUsage(uint64_t realSize)
{
    constexpr auto order = std::memory_order_relaxed;
    (void)realMemoryUsage_.fetch_sub(realSize, order);
    (void)allocatedCount_.fetch_sub(1, order);
}
/**
 * @brief Log the first entries of the NUMA range table as (index, offset, size, numaId) lines.
 *        At most 16 entries are printed; a summary line reports how many were left out.
 */
void Arena::LogNumaRangeTable() const
{
    constexpr size_t maxPrintEntries = 16;
    auto baseAddr = reinterpret_cast<uintptr_t>(mmap_->Pointer());
    auto arenaEndAddr = baseAddr + mmapSize_;
    size_t printed = 0;
    for (auto current = numaRanges_.cbegin(); current != numaRanges_.cend(); ++current) {
        if (printed >= maxPrintEntries) {
            LOG(INFO) << "... (" << (numaRanges_.size() - maxPrintEntries) << " more entries)";
            break;
        }
        // A range ends where the next one starts; the last range ends at the end of the mapping.
        auto following = std::next(current);
        auto rangeEnd = (following == numaRanges_.cend()) ? arenaEndAddr : following->first;
        auto rangeSize = rangeEnd - current->first;
        auto offset = current->first - baseAddr;
        LOG(INFO) << "  [index=" << printed << ", offset=" << offset << ", size=" << rangeSize
                  << ", numaId=" << static_cast<uint32_t>(current->second) << "]";
        ++printed;
    }
}
/**
 * @brief Sample the mapping once per MiB with the move_pages syscall (query mode, no target nodes)
 *        and record, for each run of samples reporting the same node, the address where the run starts.
 * @return K_RUNTIME_ERROR when the base pointer is null, the page size is invalid, the syscall fails,
 *         a sampled page reports a negative status or no range was recorded; otherwise OK.
 *         numaRanges_ is only replaced on success.
 */
Status Arena::BuildNumaRangeTable()
{
    auto *base = static_cast<uint8_t *>(mmap_->Pointer());
    CHECK_FAIL_RETURN_STATUS(base != nullptr, K_RUNTIME_ERROR, "Arena mmap base pointer is null");
    CHECK_FAIL_RETURN_STATUS(pageSize_ > 0, K_RUNTIME_ERROR, "Arena page size is invalid");
    constexpr size_t pageBatch = 1024;
    constexpr size_t bytesPerNumaSample = 1 << 20;

    std::map<uintptr_t, uint8_t> ranges;
    std::vector<void *> pages;
    std::vector<int> status;
    pages.reserve(pageBatch);
    status.reserve(pageBatch);

    // Query the pending batch and append a range start whenever the node differs from the last range.
    auto flushBatch = [&ranges, &pages, &status]() -> Status {
        RETURN_OK_IF_TRUE(pages.empty());
        const auto rc = syscall(SYS_move_pages, 0, pages.size(), pages.data(), nullptr, status.data(), 0);
        if (rc != 0) {
            RETURN_STATUS(K_RUNTIME_ERROR, FormatString("move_pages failed: %s", StrErr(errno)));
        }
        for (size_t i = 0; i < pages.size(); ++i) {
            if (status[i] < 0) {
                RETURN_STATUS(K_RUNTIME_ERROR,
                              FormatString("move_pages returned page status %d for address %p", status[i], pages[i]));
            }
            auto numaId = static_cast<uint8_t>(status[i]);
            if (ranges.empty() || ranges.rbegin()->second != numaId) {
                ranges.emplace(reinterpret_cast<uintptr_t>(pages[i]), numaId);
            }
        }
        pages.clear();
        status.clear();
        return Status::OK();
    };

    Timer timer;
    uint64_t off = 0;
    while (off < mmapSize_) {
        pages.emplace_back(base + off);
        // Placeholder slot that the syscall overwrites with the page's status.
        status.emplace_back(-1);
        if (pages.size() == pageBatch) {
            RETURN_IF_NOT_OK(flushBatch());
        }
        off += bytesPerNumaSample;
    }
    // Flush the final, partially filled batch.
    RETURN_IF_NOT_OK(flushBatch());
    CHECK_FAIL_RETURN_STATUS(!ranges.empty(), K_RUNTIME_ERROR, "NUMA range table is empty");
    auto elapsed = timer.ElapsedMilliSecond();
    numaRanges_ = std::move(ranges);
    LOG(INFO) << "NUMA range table for arena " << arenaId_ << ", total " << numaRanges_.size() << " entries, cost "
              << elapsed << "ms";
    LogNumaRangeTable();
    return Status::OK();
}
/**
 * @brief Find the NUMA id of the range whose start address is the greatest one not above ptr.
 * @param[in] ptr The address to look up.
 * @param[out] numaId The NUMA id of the matching range; untouched on failure.
 * @return K_INVALID for a null pointer, K_NOT_FOUND when the table is empty or ptr lies before the
 *         first range, otherwise OK.
 */
Status Arena::QueryNumaId(void *ptr, uint8_t &numaId) const
{
    CHECK_FAIL_RETURN_STATUS(ptr != nullptr, K_INVALID, "Input pointer is null");
    CHECK_FAIL_RETURN_STATUS(!numaRanges_.empty(), K_NOT_FOUND, "NUMA range table is empty");
    const auto addr = reinterpret_cast<uintptr_t>(ptr);
    // First range starting strictly above addr; the range before it is the one containing addr.
    const auto firstAbove = numaRanges_.upper_bound(addr);
    CHECK_FAIL_RETURN_STATUS(firstAbove != numaRanges_.begin(), K_NOT_FOUND,
                             FormatString("Address %p is before the first NUMA range", ptr));
    numaId = std::prev(firstAbove)->second;
    return Status::OK();
}
}
}