* Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASYSTEM_COMMON_SHARED_MEMORY_ARENA_H
#define DATASYSTEM_COMMON_SHARED_MEMORY_ARENA_H
#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
#include <set>
#include <shared_mutex>
#include <unordered_map>
#include <vector>
#include "datasystem/common/log/log.h"
#ifdef WITH_TESTS
#include "datasystem/common/inject/inject_point.h"
#endif
#include "datasystem/common/shared_memory/arena_group_key.h"
#include "datasystem/common/shared_memory/memstat.h"
#include "datasystem/common/shared_memory/mmap/immap.h"
#include "datasystem/common/shared_memory/resource_pool.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/common/util/thread_pool.h"
#include "datasystem/common/util/timer.h"
#include "datasystem/common/util/wait_post.h"
namespace datasystem {
enum class ServiceType { OBJECT, STREAM };
namespace memory {
constexpr uint32_t TENANT_RESOURCE_RELEASE_DELAY_MS = 600'000;
class Arena;
class ArenaGroup {
public:
ArenaGroup(std::vector<std::shared_ptr<Arena>> arenas, uint64_t maxSize, CacheType cacheType);
ArenaGroup(const ArenaGroup &) = delete;
ArenaGroup &operator=(const ArenaGroup &) = delete;
~ArenaGroup();
* @brief Allocate memory from shared memory.
* @param[in] size Memory size to be allocated in bytes.
* @param[in] populate Indicate need populate or not.
* @param[out] realSize Memory size real allocated in bytes.
* @param[out] pointer Pointer to the allocated shared memory.
* @param[out] fd File descriptor of the allocated shared memory segments.
* @param[out] offset Offset from the base of the shared memory mmap.
* @param[out] mmapSize Total size of shared memory segments.
* @param[in] type The type of datasystem service for this allocation request.
* @return K_OK if success, the error otherwise.
* K_OUT_OF_MEMORY: no enough memory can be allocated.
* K_RUNTIME_ERROR: arena has not been initialized, pointer is null, or
* memory info can not be found, it should not happen.
*/
Status AllocateMemory(uint64_t size, bool populate, uint64_t &realSize, void *&pointer, int &fd, ptrdiff_t &offset,
uint64_t &mmapSize, ServiceType type);
Status AllocateMemory(uint64_t size, bool populate, uint64_t &realSize, void *&pointer, int &fd, ptrdiff_t &offset,
uint64_t &mmapSize, uint8_t &numaId, ServiceType type);
* @brief Free memory from shared memory.
* @param[in] pointer Pointer to be free.
* @param[out] bytesFree The free bytes.
* @param[out] bytesRealFree The real free bytes.
* @param[in] usedSize The size of memory used by the service type.
* @return K_OK if success, the error otherwise.
* K_RUNTIME_ERROR: arena has not been initialized, pointer is null or pointer is not found.
*/
Status FreeMemory(void *pointer, uint64_t &bytesFree, uint64_t &bytesRealFree, uint64_t usedSize);
* @brief Free memory from shared memory.
* @param[in] pointer Pointer to be free.
* @return K_OK if success, the error otherwise.
* K_RUNTIME_ERROR: arena has not been initialized, pointer is null or pointer is not found.
*/
Status FreeMemory(void *pointer);
* @brief Get allocated size of arena.
* @return Allocated size in bytes.
*/
uint64_t GetMemoryUsage() const;
* @brief Obtains the storeFd related pointer.
* @param[in] fd Fd of related pooled memory.
* @param[out] ptrMmapSz Pointer and mmapSz.
* @return K_OK if success, the error otherwise.
* K_RUNTIME_ERROR: fd can not be found.
*/
Status FdToPointer(int fd, std::pair<void *, uint64_t> &ptrMmapSz) const;
* @brief Get Arena statistics view.
* @param[out] shmMemStat Shared memory statistics.
*/
void GetArenaGroupStat(ShmMemStat &shmMemStat);
* @brief Get the arena id list.
* @return The arena id list.
*/
std::vector<uint32_t> GetArenaIds() const;
* @brief Destroy all arena in this group.
* @return Status of this call.
*/
Status DestroyAll();
* @brief Get all the fds of this arena group.
* @return fds
*/
std::vector<int> GetAllFds();
private:
* @brief Allocate memory from the specified arena, and retry allocate from other arenas if OOM.
* @param[in] retry Retry when OOM.
* @param[in] populate Indicate need populate or not.
* @param[in, out] size Read needed memory size in bytes and write actual allocated size.
* @param[in, out] index The arena index.
* @param[out] pointer Allocated memory pointer.
* @return Status of this call.
*/
Status AllocateMemoryImpl(bool retry, bool populate, uint64_t &size, size_t &index, void *&pointer);
std::vector<std::shared_ptr<Arena>> arenas_;
const uint64_t maxSize_;
CacheType cacheType_;
std::atomic<size_t> nextId_{ 0 };
std::atomic<uint64_t> memoryUsage_{ 0 };
std::atomic<uint64_t> realMemoryUsage_{ 0 };
std::unordered_map<int, std::pair<void *, uint64_t>> fdPointerTable_;
mutable std::shared_timed_mutex allocatedMutex_;
struct AllocRecord {
uint64_t size = 0;
uint64_t realSize = 0;
size_t index = 0;
};
std::unordered_map<void *, AllocRecord> allocatedTable_;
std::atomic<bool> destroyed_{ false };
};
class ArenaManager {
public:
ArenaManager(bool populate, bool scaling, ssize_t decayMs);
* @brief Init ArenaManager.
*/
void Init();
* @brief Init ArenaManager.
* @param[in] The cache type.
* @param[in] funcRegister The register func for allocator.
* @return K_OK if success, the error otherwise.
*/
Status Init(CacheType type, AllocatorFuncRegister funcRegister);
* @brief Create an new arena group to allocate/deallocate shared memory.
* @param[in] type The cache type.
* @param[in] maxSize Shared memory max size in bytes.
* @param[out] arenaGroup New created arena group.
* @return K_OK if success, the error otherwise.
* K_RUNTIME_ERROR: Runtime error because of many things.
*/
Status CreateArenaGroup(CacheType type, uint64_t maxSize, std::shared_ptr<ArenaGroup> &arenaGroup);
* @brief Create an new arena group for the specific tenant to allocate/deallocate shared memory.
* @param[in] tenantId The ID of the specific tenant.
* @param[in] type The cache type.
* @param[in] maxSize Shared memory max size in bytes.
* @param[out] arenaGroup New created arena group.
* @return K_OK if success, the error otherwise.
* K_RUNTIME_ERROR: Runtime error because of many things.
*/
Status CreateArenaGroup(const std::string &tenantId, CacheType type, uint64_t maxSize,
std::shared_ptr<ArenaGroup> &arenaGroup);
* @brief Get the Arena of the specific tenant.
* @param[in] key The arena group key.
* @param[out] arenaGroup The arena group reference.
* @return Status
*/
Status GetArenaGroup(const ArenaGroupKey &key, std::shared_ptr<ArenaGroup> &arenaGroup);
* @brief Get the tenant arena or create an new arena group for the specific tenant to allocate/deallocate shared
* memory.
* @param[in] key The arena group key.
* @param[in] maxSize Shared memory max size in bytes.
* @param[out] arena New created arena.
* @return K_OK if success, the error otherwise.
* K_RUNTIME_ERROR: Runtime error because of many things.
*/
Status GetOrCreateArenaGroup(const ArenaGroupKey &type, uint64_t maxSize, std::shared_ptr<ArenaGroup> &arenaGroup);
* @brief Destroy the arena group
* @param[in] arenaGroup Arena group.
* @return K_OK if success, the error otherwise.
* K_NOT_FOUND: arena can not be found.
*/
Status DestroyArenaGroup(const std::shared_ptr<ArenaGroup> &arenaGroup);
* @brief Destroy the arena group with a delay timer.
* @param[in] key The arena group key.
* @return Status of this call.
*/
Status DestroyArenaGroup(const ArenaGroupKey &key);
* @brief Destroy all arena group.
* @return Status of this call.
*/
Status DestroyAllArenaGroup();
* @brief Get arena count.
* @return Arena count.
*/
uint32_t GetArenaCounts()
{
return numArenas_.load(std::memory_order_relaxed);
}
* @brief pre allocate virtual memory for each arena.
* @param[in] arenaInds arena index
* @param[in] maxSize maxSize of shared memory.
*/
void FakeAllocate(CacheType type, const std::vector<uint64_t> &arenaInds, const uint64_t maxSize);
~ArenaManager();
* @brief Set releaseable tenant.
* @param[in] key The arena group key.
*/
void SetReleaseableTenant(const ArenaGroupKey &key);
* @brief Cancel expired tenant timer.
* @param[in] key The arena group key.
*/
void CancelExpiredTenantTimer(const ArenaGroupKey &key);
* @brief Get all expired fds.
* @return All expired fds.
*/
std::set<int> GetAllExpiredFds();
* @brief Check if the tenantId can access input workerFds or not.
* @param[in] tenantId The tenant ID from request.
* @param[in] workerFds The workerFds from request.
* @return K_OK if success, the error otherwise.
* K_NOT_AUTHORIZED: There is workerFd that cannot be accessed by this tenant.
*/
Status CheckWorkerFdTenant(const std::string &tenantId, const std::vector<int> &workerFds);
private:
* @brief To align the size to an integer multiple of a page.
* @param[in] size mmap size .
* @return aligned size
*/
uint64_t RoundUpToNextMultiple(uint64_t size);
* @brief The alloc hook is invoked whenever the arena needs additional memory from the OS, e.g. when
* all local mapped memory are in use, and we need more memory to satisfy the current request
* (which is typical done through mmap).
* @param[in] size Always a multiple of the page size in bytes.
* @param[in] alignment Always a power of two at least as large as the page size.
* @param[in] arenaInd Arena index.
* @param[out] zero Indicate whether the extent is zeroed.
* @param[out] commit Indicate whether the extent is committed.
* @return A pointer to size bytes of mapped memory on behalf of arena.
*/
static void *AllocHook(size_t size, size_t alignment, unsigned arenaInd, bool *zero, bool *commit);
* @brief Destroys an extent at given addr and size with committed/decommited memory as indicated,
* on behalf of arena arenaInd. This function may be called to destroy retained extents
* during arena destruction (see arena.<i>.destroy).
* @param[in] addr Deallocate address.
* @param[in] size Deallocate size in bytes.
* @param[in] committed Commit state.
* @param[in] arenaInd Arena index.
*/
static void DestroyHook(void *addr, size_t size, bool committed, unsigned arenaInd);
* @brief Commits or decommits any physical memory to back pages within an extent at given addr and size at
* offset bytes, extending for length on behalf of arena arena_ind, returning false upon success.
* @param[in] commit True indicates commit physical memory, False indicates decommits physical memory.
* @param[in] addr The commit address.
* @param[in] size Address size.
* @param[in] offset Offset bytes of given addr.
* @param[in] length extending length.
* @param[in] arenaInd Arena index.
* @return True indicates commit or decommit failed; false means success.
*/
static bool CommitHook(bool commit, void *addr, size_t size, size_t offset, size_t length, unsigned arenaInd);
template <typename F>
Status DestroyArenas(const std::vector<uint32_t> &arenaIds, F &&f)
{
std::lock_guard<std::shared_timed_mutex> l(mutex_);
LOG_IF_ERROR(f(), "Destroy Jemalloc arena failed");
for (const auto arenaId : arenaIds) {
if (arenas_[arenaId] == nullptr) {
LOG(ERROR) << "Arena id not found: " << arenaId;
continue;
}
arenas_[arenaId] = nullptr;
(void)numArenas_.fetch_sub(1, std::memory_order_relaxed);
}
return Status::OK();
}
friend ArenaGroup;
std::atomic<uint32_t> numArenas_;
std::vector<std::shared_ptr<Arena>> arenas_;
struct PreReleaseTenantResourceInfo {
bool IsExpired()
{
auto timeoutDurationMs = TENANT_RESOURCE_RELEASE_DELAY_MS;
#ifdef WITH_TESTS
INJECT_POINT("PreReleaseTenantResourceInfo.IsExpired", [&timeoutDurationMs](int timeMs) {
timeoutDurationMs = timeMs;
return true;
});
#endif
return timer.ElapsedMilliSecond() > timeoutDurationMs;
}
Timer timer;
ArenaGroupKey key;
std::vector<int> fds;
std::function<void()> releaseHandler = nullptr;
};
* @brief Get all fds by key.
* @param[in] key The arena group key.
*/
std::vector<int> GetAllFdsByKey(const ArenaGroupKey &key);
* @brief Start check expired tenant resource thread.
*/
void StartCheckExpiredTenantResource();
* @brief Update expired tenant.
* @param[in] expiredTenantsInfo The expired tenants' detail.
*/
void UpdateExpiredTenant(const std::vector<PreReleaseTenantResourceInfo> &expiredTenantsInfo);
std::shared_timed_mutex preReleaseTenantResourceInfoMapMutex_;
std::unordered_map<ArenaGroupKey, PreReleaseTenantResourceInfo> preReleaseTenantResourceInfoMap_;
std::unique_ptr<ThreadPool> handleExpiredTenantThread_;
const int handleExpiredTenantThreadNum_ = 1;
std::unordered_map<ArenaGroupKey, std::shared_ptr<ArenaGroup>> tenantArenas_;
std::shared_timed_mutex tenantMutex_;
std::shared_timed_mutex mutex_;
const uint64_t ARENAS_INIT_SIZE = 4096;
const static int HUGE_PAGE_SIZE = 2 * 1024 * 1024;
bool populate_{ false };
bool scaling_{ false };
ssize_t decayMs_;
std::atomic<bool> destroyed_{ false };
WaitPost destroyWaitPost_;
uint64_t maxTenantSize_ = 0;
std::shared_timed_mutex registerMutex_;
std::unordered_map<CacheType, AllocatorFuncRegister> funcRegisterList_;
};
class Arena {
public:
Arena(uint32_t arenaId, void *handler, bool populate, bool scaling, uint64_t mmapSize,
CacheType cacheType = CacheType::MEMORY);
~Arena() noexcept;
* @brief Arena initialize.
* @param[in] funcRegister The register func for allocator.
* @return K_OK if success, the error otherwise.
*/
Status Init(AllocatorFuncRegister funcRegister);
Status BuildNumaRangeTable();
Status QueryNumaId(void *ptr, uint8_t &numaId) const;
* @brief Get mmap entry info via pointer, it will traverses 'mmapEntryTable_' to find the
* MmapEntry which the pointer belongs (via pointer address), and return the info
* such as fd, offset and mmap size.
* @param[in] pointer Pointer to search for the MmapEntry.
* @param[out] fd Shared memory file fd.
* @param[out] offset Offset from the base of the mmap.
* @param[out] mmapSize Shared memory mmap size in bytes.
* @return Status of the call.
*/
Status GetAllocInfo(void *pointer, int &fd, ptrdiff_t &offset, uint64_t &mmapSize);
* @brief Destroy Jemalloc arena.
* @return Status of this call.
*/
Status DestroyArena();
* @brief Return arena id.
* @return Arena ID.
*/
uint32_t GetArenaId() const
{
return arenaId_;
}
* @brief Return the mmap fd.
* @return Fd.
*/
uint32_t GetFd() const
{
return mmap_->Fd();
}
* @brief Get the actual physical memory usage.
* @return Physical memory usage in this arena.
*/
uint64_t GetPhysicalMemoryUsage()
{
return physicalMemoryStats_.RealUsage();
}
* @brief Increase the real memory usage.
* @param[in] realSize The memory size.
*/
void AddRealMemoryUsage(uint64_t realSize);
* @brief Decrease the real memory usage.
* @param[in] realSize The memory size.
*/
void SubRealMemoryUsage(uint64_t realSize);
* @brief Get the real memory usage not include the jemalloc cache.
* @return Real memory usage in this arena.
*/
uint64_t GetRealMemoryUsage()
{
return realMemoryUsage_;
}
* @brief Get the mmap pointer.
* @return The mmap pointer
*/
void *GetPointer()
{
return mmap_->Pointer();
}
* @brief Get the mmap memory size.
* @return The mmap memory size.
*/
uint64_t GetMmapSize()
{
return mmapSize_;
}
private:
void LogNumaRangeTable() const;
* @brief Jemalloc extent allocate hook, The alloc hook is invoked whenever the arena needs additional
* memory from the OS, e.g. when all local mapped memory are in use, and we need more memory to
* satisfy the current request (which is typical done through mmap).
* @param[in] size Always a multiple of the page size in bytes.
* @param[in] alignment Always a power of two at least as large as the page size.
* @param[out] zero Indicate whether the extent is zeroed.
* @param[out] commit Indicate whether the extent is committed.
* @return A pointer to size bytes of mapped memory on behalf of arena.
*/
void *AllocHook(size_t size, size_t alignment, bool *zero, bool *commit);
* @brief Destroys an Jemalloc extent at given addr and size with committed/decommited memory as indicated,
* returning false upon success. If the function returns true, this indicates opt-out from deallocation;
* the memory mapping associated with the extent remains mapped, in the same commit state, and
* available for future use, in which case it will be automatically retained for later reuse.
* @param[in] addr Deallocate address.
* @param[in] size Deallocate size in bytes.
* @param[in] committed Commit state.
* @return False upon success; True means remains mapped, in the same commit state, and available for future use.
*/
void DestroyHook(void *addr, size_t size, bool committed);
* @brief Commits or decommits any physical memory to back pages within an extent at given addr and size at
* offset bytes, extending for length on behalf of arena arena_ind, returning false upon success.
* @param[in] commit True indicates commit physical memory, False indicates decommits physical memory.
* @param[in] addr The commit address.
* @param[in] size Address size.
* @param[in] offset Offset bytes of given addr.
* @param[in] length extending length.
* @return True indicates commit or decommit failed; false means success.
*/
bool CommitHook(bool commit, void *addr, size_t size, size_t offset, size_t length);
* @brief The Commit implement.
* @param[in] addr The commit address.
* @param[in] size Address size.
* @param[in] offset Offset bytes of given addr.
* @param[in] length extending length.
* @return True indicates insufficient physical memory to satisfy the request; false means commit success.
*/
bool CommitImpl(void *addr, size_t offset, size_t length);
* @brief The Decommit implement.
* @param[in] addr The decommit address.
* @param[in] size Address size.
* @param[in] offset Offset bytes of given addr.
* @param[in] length extending length.
* @return True indicates remains committed and available for future use; false means decommit success.
*/
bool DecommitImpl(void *addr, size_t offset, size_t length);
* @brief Increase the actual physical memory usage.
* @param[in] size The memory size.
* @return True indicates memory limit is not exceeded.
*/
bool AddPhysicalMemeoryUsage(uint64_t size);
* @brief Decrease the actual physical memory usage.
* @param[in] size The memory size.
*/
void SubPhysicalMemeoryUsage(uint64_t size);
friend ArenaManager;
static uint64_t pageSize_;
uint32_t arenaId_;
void *handler_;
bool populate_;
bool scaling_;
std::atomic<bool> destroyed_;
uint64_t mmapSize_;
std::atomic<uint64_t> realMemoryUsage_{ 0 };
std::atomic<uint64_t> allocatedCount_{ 0 };
ResourcePool physicalMemoryStats_;
std::atomic<bool> firstFakeHook_{ true };
CacheType cacheType_;
std::unique_ptr<IMmap> mmap_;
std::map<uintptr_t, uint8_t> numaRanges_;
};
}
}
#endif