* Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef BOOST_SS_SLICETABLEMANAGER_H
#define BOOST_SS_SLICETABLEMANAGER_H
#include <cstdint>
#include <memory>
#include <stack>
#include <vector>
#include "access_recorder.h"
#include "binary/fresh_binary.h"
#include "binary/key_value.h"
#include "blob_store/blob_store.h"
#include "blob_store_snapshot_operator.h"
#include "common/bss_metric.h"
#include "compaction/slice_compaction_trigger.h"
#include "compaction/slice_compactor.h"
#include "flushing_bucket_group.h"
#include "include/bss_err.h"
#include "include/config.h"
#include "memory/evict_manager.h"
#include "slice_table/bucket_group_manager.h"
#include "slice_table/slice/raw_data_slice.h"
#include "snapshot/file_store_snapshot_operator.h"
#include "snapshot/slice_table_snapshot.h"
namespace ock {
namespace bss {
class SliceTable : public std::enable_shared_from_this<SliceTable> {
public:
~SliceTable()
{
mBoostNativeMetric = nullptr;
if (mNeedReleaseBlockCache) {
RETURN_AS_NULLPTR(mConfig);
BlockCacheManager::Instance()->DeleteBlockCache(mConfig->GetTaskSlotFlag());
}
LOG_INFO("Delete sliceTableManager object success.");
}
* Initialize slice table
* @param config configuration
* @param fileCache file cache.
* @param memManager memory manager.
* @return if success return BSS_OK, else return others.
*/
BResult Initialize(const ConfigRef &config, const FileCacheManagerRef &fileCache, const MemManagerRef &memManager,
const StateFilterManagerRef &stateFilterManager);
* Exist slice table.
*/
void Exit();
* Open slice table.
*/
BResult Open();
* Close slice table.
*/
void Close();
* Try to evict slice to LMS file for current db.
* @param addedSize already added to slice size.
* @param isSync sync execute evict task
* @param isForce force to execute evict
* @param minSize evict file size
* @return if successfully returns BSS_OK, otherwise returns other
*/
BResult TryCurrentDBEvict(int64_t addedSize, bool isSync, bool isForce, uint32_t minSize = UINT32_MAX);
* Try to evict slice to LMS file for current slot.
* @param addedSize already added to slice size.
* @param isSync sync execute evict task
* @param isForce force to execute evict
* @param minSize evict file size
* @return if successfully returns BSS_OK, otherwise returns other
*/
BResult TryCurrentSlotEvict(int64_t addedSize, bool isSync, bool isForce, uint32_t minSize = UINT32_MAX);
* Try to compact one slice chain.
* @param sliceIndexContext slice index context, include slice chain info.
* @param compactCompletedNotify callback.
*/
BResult TryCompact(const SliceIndexContextRef &sliceIndexContext,
const CompactCompletedNotify &compactCompletedNotify);
* Obtain list by key.
* @param key key
* @param result BinaryValue list
* @return if success return BSS_OK, else return others.
*/
inline BResult GetList(const Key &key, std::deque<Value> &result, std::vector<SectionsReadContextRef> &readMetas)
{
return InternalGetList(key, result, readMetas);
}
* Obtain the value by using the keys from slice table.
* @param Key key, it is SglKey or DualKey.
* @param value value, nullptr if it is not exist.
* @return Returns true if it exists or has already been deleted, false otherwise.
*/
inline bool Get(const Key &key, Value &value)
{
mAccessRecorder->Record();
auto sliceChain = mSliceBucketIndex->GetLogicalSliceChain(key);
if (UNLIKELY(sliceChain == nullptr)) {
return false;
}
if (UNLIKELY(sliceChain->IsEmpty() && !sliceChain->HasFilePage())) {
AddSliceMiss();
AddSliceReadCount();
return false;
}
AddSliceReadCount();
auto self = shared_from_this();
auto getFromBlobFunc = [self](uint64_t blobId, uint32_t keyHashCode, uint64_t seqId,
Value &originalValue) -> BResult {
return self->GetValueFromBlobStore(blobId, keyHashCode, seqId, originalValue);
};
auto ioRes = sliceChain->Get(key, value, getFromBlobFunc, mBoostNativeMetric);
AddSliceListHitMiss(ioRes);
return ioRes > 0 ? true : false;
}
inline uint64_t GetTick()
{
return mAccessRecorder->AccessCount();
}
inline int64_t GetMemHighMark()
{
return mEvictManager->GetMemHighMark();
}
inline void SetMemHighMark(int64_t highMark)
{
mEvictManager->SetMemHighMark(highMark);
}
* Add kv pairs to slice chain.
* @param curSliceIndexContext the slice chain to be added
* @param dataList the kv pairs to be added.
* @param version fresh table version
* @return the data size of the currently added slice.
*/
int32_t AddSlice(const SliceIndexContextRef &curSliceIndexContext,
std::vector<std::pair<SliceKey, Value>> &dataList, uint64_t version);
BResult AddSlice(const SliceIndexContextRef &curSliceIndexContext, RawDataSlice &rawDataSlice, uint32_t &addSize,
bool &forceEvict);
BResult WriteValueToBlobStore(FreshValueNodePtr &curVal, uint32_t keyHash, uint16_t stateId, uint64_t &blobId);
BResult GetValueFromBlobStore(uint64_t blobId, uint32_t keyGroup, uint64_t seqId, Value &value);
* Iterator all, for savepoint, with kv-separate.
* @param keyFilter key filter
* @param stateId state id
* @param blobValueTransformFunc kv-separate conversion to normal kv func
* @return iterator of all kv.
*/
KeyValueIteratorRef EntryIterator(const KeyFilter &keyFilter, uint16_t stateId,
BlobValueTransformFunc &blobValueTransformFunc);
* Get iterator for prefix, with kv-separate.
* @param prefixKey prefix key.
* @param blobValueTransformFunc kv-separate conversion to normal kv func
* @return iterator of having the same prefix.
*/
KeyValueIteratorRef PrefixIterator(const Key &prefixKey, BlobValueTransformFunc &blobValueTransformFunc);
* Iterator all, for savepoint.
* @param keyFilter key filter
* @param stateId state id
* @return iterator of all kv.
*/
KeyValueIteratorRef EntryIterator(const KeyFilter &keyFilter, uint16_t stateId);
* Get iterator for prefix.
* @param prefixKey prefix key.
* @return iterator of having the same prefix.
*/
KeyValueIteratorRef PrefixIterator(const Key &prefixKey);
* Obtain memory manager.
* @return memory manager instance.
*/
inline MemManagerRef GetMemManager()
{
return mMemManager;
}
* Obtain slice bucket index
* @return slice bucket index instance.
*/
inline SliceBucketIndexRef GetSliceBucketIndex()
{
return mSliceBucketIndex;
}
* Obtain bucket group manager
* @return bucket group manager instance.
*/
inline BucketGroupManagerRef GetBucketGroupManager()
{
return mBucketGroupManager;
}
inline StateFilterManagerRef GetStateFilterManager()
{
return mStateFilterManager;
}
inline FullSortEvictorRef GetFullSortEvictor()
{
return mEvictManager->GetEvictorHandle();
}
* For snapshot, prepare file store snapshot.
* @param operatorId operator id.
* @param snapshotId snapshot id.
* @return return operator instance.
*/
FileStoreSnapshotOperatorRef PrepareFileStoreSnapshot(uint64_t operatorId, uint64_t snapshotId);
* For snapshot, prepare blob store snapshot.
* @param operatorId operator id.
* @param snapshotId snapshot id.
* @return return operator instance.
*/
BlobStoreSnapshotOperatorRef PrepareBlobStoreSnapshot(uint64_t operatorId, uint64_t snapshotId);
* Add one slice table snapshot to slice table.
* @param snapshotId snapshot id.
* @param sliceTableSnapshot slice table snapshot.
*/
void AddSliceTableSnapshot(uint64_t snapshotId, const SliceTableSnapshotRef &sliceTableSnapshot);
* Judge snapshot exist.
* @param snapshotId snapshot id.
* @return return true if exist, else return false.
*/
bool IsSnapshotIdExist(uint64_t snapshotId);
* Delete one slice table snapshot.
* @param snapshotId snapshot id to be deleted.
*/
void EraseSnapshotId(uint64_t snapshotId);
* For testing, force trigger evict slice table to lsm store.
*/
inline void ForceTriggerEvict()
{
mEvictManager->ForceEvict();
}
* For testing, force trigger compact lsm store.
*/
void ForceTriggerCompaction()
{
}
* For testing, force clean current version.
*/
void ForceCleanCurrentVersion()
{
mBucketGroupManager->ForceCleanCurrentVersion();
}
inline void AddSnapshotSyncTask(SnapshotSyncTaskRef &compactionTask, SnapshotSyncTaskRef &evictTask)
{
mSnapshotVersion.fetch_add(1, std::memory_order_release);
mEvictManager->AddSnapshotSyncTask(compactionTask);
mCompactManager->AddSnapshotSyncTask(evictTask);
}
inline uint32_t GetSnapshotVersion() const
{
return mSnapshotVersion.load(std::memory_order_acquire);
}
inline void SetCompactionStatus(const SliceTableCompaction &compactionStatus)
{
mIsCompaction.store(compactionStatus, std::memory_order_release);
}
BResult TryCleanUpEvictedSlices(const LogicalSliceChainRef &logicalSliceChain, uint32_t bucketIndex)
{
if (UNLIKELY(logicalSliceChain == nullptr)) {
LOG_ERROR("Logic slice chain is nullptr.");
return BSS_INNER_ERR;
}
mSliceBucketIndex->LockWrite(bucketIndex);
if (UNLIKELY(logicalSliceChain != mSliceBucketIndex->GetLogicChainedSlice(bucketIndex))) {
LOG_ERROR("Slice chain not matched.");
mSliceBucketIndex->Unlock(bucketIndex);
return BSS_INNER_ERR;
}
if (logicalSliceChain->GetSliceStatus() == SliceStatus::COMPACTING) {
mSliceBucketIndex->Unlock(bucketIndex);
return BSS_OK;
}
int32_t curIndex = 0;
while (curIndex <= logicalSliceChain->GetSliceChainTailIndex()) {
SliceAddressRef sliceAddress = logicalSliceChain->GetSliceAddress(curIndex);
if (UNLIKELY(sliceAddress == nullptr)) {
LOG_ERROR("sliceAddress is nullptr, index:" << curIndex);
mSliceBucketIndex->Unlock(bucketIndex);
return BSS_INNER_ERR;
}
if (!(sliceAddress->IsEvicted())) {
break;
}
curIndex++;
}
if (curIndex < static_cast<int32_t>(NO_5)) {
mSliceBucketIndex->Unlock(bucketIndex);
return BSS_OK;
}
LogicalSliceChainRef newLogicalSliceChain = mSliceBucketIndex->CreateLogicalChainedSlice();
if (UNLIKELY(newLogicalSliceChain == nullptr)) {
LOG_ERROR("create new slice chain failed, it is null.");
mSliceBucketIndex->Unlock(bucketIndex);
return BSS_INNER_ERR;
}
for (int32_t i = curIndex; i < static_cast<int32_t>(logicalSliceChain->GetCurrentSliceChainLen()); i++) {
newLogicalSliceChain->InsertSlice(logicalSliceChain->GetSliceAddress(i));
}
mBucketGroupManager->MarkLogicalSliceChainFlushed(newLogicalSliceChain,
mBucketGroupManager->GetBucketGroupVector()[0]);
newLogicalSliceChain->SetBaseSliceIndex(logicalSliceChain->GetBaseSliceIndex() - curIndex);
auto invalidSliceAddress = std::vector<SliceAddressRef>();
mSliceBucketIndex->UpdateLogicalSliceChain(bucketIndex, logicalSliceChain, newLogicalSliceChain);
mSliceBucketIndex->Unlock(bucketIndex);
return BSS_OK;
}
inline void AddSliceUsedMemory(int64_t size) const
{
mEvictManager->AddSliceUsedMemory(size);
}
inline void AddSliceHit()
{
if (mBoostNativeMetric == nullptr || !mBoostNativeMetric->IsSliceMetricEnabled()) {
return;
}
mBoostNativeMetric->AddSliceHitCount();
}
inline void AddSliceMiss()
{
if (mBoostNativeMetric == nullptr || !mBoostNativeMetric->IsSliceMetricEnabled()) {
return;
}
mBoostNativeMetric->AddSliceMissCount();
}
inline void AddSliceListHitMiss(std::stack<Value> &mergingValues, bool &isFound)
{
if (mBoostNativeMetric == nullptr || !mBoostNativeMetric->IsSliceMetricEnabled()) {
return;
}
if (mergingValues.empty() && !isFound) {
mBoostNativeMetric->AddSliceMissCount();
} else {
mBoostNativeMetric->AddSliceHitCount();
}
}
inline void AddSliceListHitMiss(IOResult &res)
{
if (mBoostNativeMetric == nullptr || !mBoostNativeMetric->IsSliceMetricEnabled()) {
return;
}
if (res == SLICE_FOUND || res == SLICE_FOUND_DELETE) {
mBoostNativeMetric->AddSliceHitCount();
return;
}
if (res == NOT_FOUND || res == LSM_FOUND) {
mBoostNativeMetric->AddSliceMissCount();
}
}
inline void AddSliceReadCount()
{
if (mBoostNativeMetric == nullptr || !mBoostNativeMetric->IsSliceMetricEnabled()) {
return;
}
mBoostNativeMetric->AddSliceReadCount();
}
inline void AddSliceReadMetric(uint64_t size)
{
if (mBoostNativeMetric == nullptr || !mBoostNativeMetric->IsSliceMetricEnabled()) {
return;
}
mBoostNativeMetric->AddSliceReadSize(size);
}
void RegisterSliceMetric(BoostNativeMetricPtr metricPtr)
{
mBoostNativeMetric = metricPtr;
mBucketGroupManager->RegisterMetric(metricPtr);
mEvictManager->RegisterEvictMetric(metricPtr);
mCompactManager->RegisterSliceCompactionMetric(metricPtr);
if (mBoostNativeMetric != nullptr && mBoostNativeMetric->IsSliceMetricEnabled()) {
mBoostNativeMetric->SetSliceChainAvgSize(
[this]() -> uint64_t { return mSliceBucketIndex->GetAvgSliceChainSize(); });
mBoostNativeMetric->SetSliceAvgSize([this]() -> uint64_t { return mSliceBucketIndex->GetAvgSliceSize(); });
}
}
inline bool NeedKvSeparate(uint32_t valueSize) const
{
return mIsKVSeparate && valueSize > mKvThreshold;
}
BResult RestoreBlobStore(const std::vector<SliceTableRestoreMetaRef> &metaList,
std::unordered_map<std::string, uint32_t> &restorePathFileIdMap);
TombstoneServiceRef GetTombstoneService()
{
return mTombstoneService;
}
void StopSliceTable()
{
mIsRunning.store(false);
}
bool IsRunning()
{
return mIsRunning;
}
private:
static uint32_t ComputeIndexBucketNum(uint64_t totalMem, const ConfigRef &config);
static uint32_t ComputeBucketGroupNum(uint32_t bucketNum, const ConfigRef &config);
BResult InitializeBlobStore();
inline SliceKVIteratorPtr GetIterator()
{
std::vector<std::vector<DataSliceRef>> dataSlices;
uint32_t curSlot = 0;
while (curSlot < mSliceBucketIndex->GetIndexCapacity()) {
auto curChain = mSliceBucketIndex->GetLogicChainedSlice(static_cast<uint32_t>(curSlot));
if (curChain == nullptr) {
LOG_ERROR("curChain is nullptr");
return nullptr;
}
if (curChain->IsNone()) {
curSlot++;
continue;
}
int32_t curChainIndex = curChain->GetSliceChainTailIndex();
std::vector<DataSliceRef> curChainSlice;
while (curChainIndex >= 0) {
SliceAddressRef sliceAddress = curChain->GetSliceAddress(curChainIndex);
if (sliceAddress == nullptr) {
LOG_ERROR("sliceAddress is nullptr");
return nullptr;
}
if (sliceAddress->IsEvicted()) {
break;
}
DataSliceRef dataSlice = sliceAddress->GetDataSlice();
if (dataSlice == nullptr) {
break;
}
curChainSlice.emplace(curChainSlice.begin(), dataSlice);
curChainIndex--;
}
dataSlices.emplace_back(curChainSlice);
curSlot++;
}
Ref<FlushingBucketGroupIterator> dataSliceVectorIterator = MakeRef<FlushingBucketGroupIterator>();
dataSliceVectorIterator->Initialize(dataSlices);
return std::make_shared<SliceKVIterator>(dataSliceVectorIterator, mMemManager);
}
BResult GetFromFile(const Key &key, LogicalSliceChainRef &logicalSliceChain, Value &finalResult,
std::stack<Value> &mergingValues, std::vector<SectionsReadContextRef> &readMetas) const;
BResult MergeList(std::deque<Value> &result, Value finalResult, std::stack<Value> &mergingValues) const;
BResult InternalGetList(const Key &key, std::deque<Value> &result, std::vector<SectionsReadContextRef> &readMetas);
void GetFromSliceChain(const Key &key, LogicalSliceChainRef &logicalSliceChain, int32_t curIndex,
uint32_t readLength, std::stack<Value> &mergingValues, Value &finalResult, bool &isFound);
bool GetFromSlice(const Key &key, std::stack<Value> &mergingValues, Value &finalResult,
SliceAddressRef &sliceAddress, DataSliceRef &dataSlice, bool &isFound) const;
void RegisterTombstoneService();
private:
std::atomic<bool> mIsRunning{ true };
mutable std::mutex mMutex;
SliceBucketIndexRef mSliceBucketIndex;
ConfigRef mConfig;
BucketGroupManagerRef mBucketGroupManager = std::make_shared<BucketGroupManager>();
FileCacheManagerRef mFileCache;
MemManagerRef mMemManager;
std::mutex mRunningSnapshotMutex;
std::unordered_map<uint64_t, SliceTableSnapshotRef> mRunningSnapshot;
EvictManagerRef mEvictManager;
SliceCompactionTriggerRef mCompactManager;
TombstoneServiceRef mTombstoneService = nullptr;
AccessRecorderRef mAccessRecorder;
std::atomic<uint32_t> mSnapshotVersion{ 0 };
std::atomic<SliceTableCompaction> mIsCompaction{ SliceTableCompaction::OPEN };
StateFilterManagerRef mStateFilterManager;
BoostNativeMetricPtr mBoostNativeMetric = nullptr;
bool mIsKVSeparate = false;
uint32_t mKvThreshold = 0;
bool mNeedReleaseBlockCache = false;
BlobStoreRef mBlobStore = nullptr;
};
using SliceTableManagerRef = std::shared_ptr<SliceTable>;
class SliceTablePrefixIterator : public Iterator<KeyValueRef> {
public:
~SliceTablePrefixIterator() override = default;
BResult Init(const LogicalSliceChainRef &logicalSliceChain, const Key &prefixKey);
bool HasNext() override;
KeyValueRef Next() override;
private:
void Advance();
bool mClosed = false;
uint32_t mCurIndex = 0;
Key mPrefixKey;
KeyValueRef mCurrent;
DataSliceRef mLastDataSlice;
SecKeyValueMap mVisited;
std::shared_ptr<SliceTable> mSliceTable;
std::vector<std::pair<SliceAddressRef, DataSliceRef>> mSliceAddressAndDataSlices;
KeyValueIteratorRef mCurrentSliceTableKvIterator;
LogicalSliceChainRef mLogicalSliceChain = nullptr;
uint32_t mFileIndex = 0;
KeyValueIteratorRef mCurrentFilePageMap = nullptr;
void SetSliceIter();
void SetFileIter(const std::vector<FilePageRef> &filePages);
};
}
}
#endif