* Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "slice.h"
#include <algorithm>
#include <vector>
#if defined(__aarch64__)
#ifdef BUILD_SVE
#include <arm_sve.h>
#endif
#endif
#include "binary/slice_binary.h"
#include "common/util/bss_math.h"
#include "compare_slice_key.h"
#include "include/bss_err.h"
#include "include/bss_types.h"
#include "slice_table.h"
namespace ock {
namespace bss {
/**
 * Serialises the value chain starting at `value` into slot `index` of this value space.
 *
 * The chain is walked with VisitAsList. The walk stops at the first DELETE node (which contributes
 * no payload) or at the first PUT node (which is still collected). The collected nodes are then
 * written in reverse visiting order, i.e. the node visited last is written first.
 *
 * @param index        slot of mValueOffsets that receives the end offset plus the type indicator.
 * @param value        head of the fresh value chain; a null head is rejected by RETURN_ERROR_AS_NULLPTR.
 * @param freshSegment segment handed to VisitAsList while walking the chain.
 * @param isValue      when true the entry is expected to hold at most one node (more is only logged);
 *                     when false the stored type is derived from the walk (PUT / APPEND / DELETE).
 * @param sliceTable   optional; when set, isValue is true and NeedKvSeparate(len) holds, the payload
 *                     goes to the blob store and only the 8-byte blob id is written here.
 * @param keyHashCode  forwarded to WriteValueToBlobStore.
 * @param stateId      forwarded to WriteValueToBlobStore.
 * @param seqId        in/out: raised to the largest ValueSeqId seen among the visited nodes.
 * @return BSS_OK on success, otherwise the error reported by WriteValueToBlobStore.
 */
BResult ValueSpace::Put(uint32_t index, FreshValueNodePtr &value, const MemorySegment &freshSegment, bool isValue,
                        SliceTableManagerRef sliceTable, uint32_t keyHashCode, uint16_t stateId, uint64_t &seqId)
{
    RETURN_ERROR_AS_NULLPTR(value);
    uint32_t count = 0;
    bool deleteOrPut = false;
    std::vector<FreshValueNode *> nodes;
    value->VisitAsList(freshSegment, [&](FreshValueNode &curVal) {
        seqId = std::max(seqId, curVal.ValueSeqId());
        if (curVal.ValueType() == ValueType::DELETE) {
            // A delete terminates the chain and carries no payload.
            deleteOrPut = true;
            return false;
        } else if (curVal.ValueType() == ValueType::PUT) {
            // A put terminates the chain but its payload is kept.
            deleteOrPut = true;
            nodes.emplace_back(&curVal);
            return false;
        }
        nodes.emplace_back(&curVal);
        return true;
    });

    bool isKvSeparate = false;
    for (auto node = nodes.rbegin(); node != nodes.rend(); ++node) {
        count++;
        uint32_t len = (*node)->ValueDataLen();
        if (UNLIKELY(sliceTable != nullptr) && isValue && sliceTable->NeedKvSeparate(len)) {
            uint64_t blobId = 0;
            BResult result = sliceTable->WriteValueToBlobStore((*node), keyHashCode, stateId, blobId);
            if (UNLIKELY(result != BSS_OK)) {
                LOG_ERROR("Write value to blob store failed, ret: " << result);
                return result;
            }
            // Only the blob id is stored inline for a separated value.
            len = sizeof(uint64_t);
            mBuffer->WriteAt(reinterpret_cast<const uint8_t *>(&blobId), len, mValueDataBaseOffset + mWritenBytes);
            isKvSeparate = true;
        } else {
            mBuffer->WriteAt((*node)->Value(), len, mValueDataBaseOffset + mWritenBytes);
        }
        mWritenBytes += len;
    }

    auto valType = isKvSeparate ? ValueType::SEPARATE : value->ValueType();
    if (!isValue) {
        valType = count ? (deleteOrPut ? ValueType::PUT : ValueType::APPEND) : ValueType::DELETE;
    } else if (count > 1) {
        LOG_ERROR("Value type should not have more than one element");
    }

    // Shift in unsigned 32-bit arithmetic: a uint8_t operand would be promoted to signed int, and
    // shifting a set bit into the sign position of a signed int is not well defined before C++20.
    const uint32_t typeBits = static_cast<uint32_t>(static_cast<uint8_t>(valType)) << VALUE_INDICATOR_OFFSET;
    mValueOffsets[index] = typeBits | (mWritenBytes & 0xFFFFFFF);
    return BSS_OK;
}
/**
 * Builds this slice from an in-memory list of key/value pairs.
 *
 * @param kvPairs     pairs to store; rejected with BSS_INVALID_PARAM when empty. The vector is handed
 *                    to CreateAndInitBuffer by non-const reference.
 * @param meta        creation metadata forwarded to CreateAndInitBuffer.
 * @param memManager  memory manager retained in mMemManager and used for the slice buffer.
 * @param forceMemory forwarded to CreateAndInitBuffer.
 * @return BSS_OK once the buffer is created and filled (mInit becomes true), otherwise the first error.
 */
BResult Slice::Initialize(std::vector<std::pair<SliceKey, Value>> &kvPairs, const SliceCreateMeta &meta,
                          const MemManagerRef &memManager, bool forceMemory)
{
    // Nothing to store: refuse before touching any member.
    if (kvPairs.empty()) {
        return BSS_INVALID_PARAM;
    }

    mMemManager = memManager;

    // Step 1: lay out the buffer; this also collects the keys that need a sorted index entry.
    std::vector<std::pair<SliceKey, uint32_t>> sortedKeySlotList;
    BResult result = CreateAndInitBuffer(meta, kvPairs, sortedKeySlotList, forceMemory);
    RETURN_NOT_OK_NO_LOG(result);

    // Step 2: copy keys, values and index entries into the buffer.
    result = FillBuffer(kvPairs, sortedKeySlotList);
    RETURN_NOT_OK(result);

    mInit = true;
    return BSS_OK;
}
/**
 * Computes the slice layout for `kvPairs`, allocates the backing buffer and wires up all sub-spaces.
 *
 * kvPairs is reordered in place by ascending rotated mixed hash code. The buffer layout, in order, is:
 * SliceHead | index | sorted index | hash codes | seq ids | key offsets | key data | value offsets |
 * value data.
 *
 * @param meta              creation metadata written into the header by FormatHeader.
 * @param kvPairs           pairs to store; reordered in place.
 * @param sortedKeySlotList out: (key, slot) of every key whose state type has a secondary key; only
 *                          populated when SUPPORT_SLICE_SORT is enabled.
 * @param forceMemory       forwarded to MemManager::GetMemory.
 * @return BSS_OK, the GetMemory error, or BSS_ALLOC_FAIL when the ByteBuffer cannot be created.
 */
BResult Slice::CreateAndInitBuffer(const SliceCreateMeta &meta, std::vector<std::pair<SliceKey, Value>> &kvPairs,
                                   std::vector<std::pair<SliceKey, uint32_t>> &sortedKeySlotList, bool forceMemory)
{
    uint32_t kvCount = kvPairs.size();
    uint32_t indexCount = BssMath::RoundUpToPowerOfTwo(kvCount);
    uint32_t indexWidth = NO_4;
    if (indexCount > BYTE4_MAX_SLOT_SIZE) {
        indexWidth = NO_8;
    }
    uint32_t rightShiftBits = NO_31 - BssMath::NumberOfLeadingZeros(indexCount);

    // Sort (rotated hash, original position) pairs instead of the heavy kv pairs themselves.
    std::vector<std::pair<uint32_t, size_t>> sortedPairs;
    sortedPairs.reserve(kvPairs.size());
    for (size_t index = 0; index < kvPairs.size(); ++index) {
        const auto &pair = kvPairs[index];
        uint32_t rotatedHash = BssMath::RotateRight(pair.first.MixedHashCode(), rightShiftBits);
        sortedPairs.emplace_back(rotatedHash, index);
    }
    std::sort(sortedPairs.begin(), sortedPairs.end(), [](const auto &a, const auto &b) { return a.first < b.first; });

    // Materialise the new order by moving each pair exactly once.
    std::vector<std::pair<SliceKey, Value>> newKvPairs;
    newKvPairs.reserve(kvPairs.size());
    for (const auto &pair : sortedPairs) {
        newKvPairs.emplace_back(std::move(kvPairs[pair.second].first), std::move(kvPairs[pair.second].second));
    }
    kvPairs = std::move(newKvPairs);

    // Accumulate the variable-length sizes and the state id interval.
    uint32_t totalKeySize = 0;
    uint32_t totalValueSize = 0;
    StateIdInterval stateIdInterval;
    for (uint32_t slot = 0; slot < kvPairs.size(); slot++) {
        auto &pair = kvPairs.at(slot);
        auto &key = pair.first;
        totalKeySize += key.GetSerializeLength();
#if SUPPORT_SLICE_SORT
        // The state type is only needed to decide whether the key joins the sorted index.
        auto stateType = StateId::GetStateType(key.StateId());
        if (StateTypeUtil::HasSecKey(stateType)) {
            sortedKeySlotList.emplace_back(key, slot);
        }
#endif
        stateIdInterval.Update(key.StateId());
        auto &value = pair.second;
        if (value.ValueType() != DELETE) {
            totalValueSize += value.ValueLen();
        }
    }

    // Running offset over the layout; each *Base is the offset where that region starts.
    uint32_t bufferSize = 0;
    bufferSize += sizeof(SliceHead);
    uint32_t indexBase = bufferSize;
    bufferSize += indexCount * indexWidth;
    uint32_t sortedIndexBase = bufferSize;
    uint32_t sortedKeyCount = sortedKeySlotList.size();
    bufferSize += sortedKeyCount * sizeof(uint32_t);
    uint32_t hashCodeBase = bufferSize;
    bufferSize += kvCount * sizeof(uint32_t);
    uint32_t seqIdBase = bufferSize;
    bufferSize += kvCount * sizeof(uint64_t);
    uint32_t keyOffsetBase = bufferSize;
    bufferSize += kvCount * sizeof(uint32_t);
    bufferSize += totalKeySize;
    uint32_t valueOffsetBase = bufferSize;
    bufferSize += kvCount * sizeof(uint32_t);
    bufferSize += totalValueSize;

    // Initialised so the out-parameter never carries an indeterminate value.
    uintptr_t address = 0;
    auto retVal = mMemManager->GetMemory(MemoryType::SLICE_TABLE, bufferSize, address, forceMemory, mTimeWaitMemory);
    if (retVal != BSS_OK) {
        LOG_WARN("Alloc memory for init slice table failed, len:" << bufferSize << ", force:" << forceMemory);
        return retVal;
    }
    mBuffer = MakeRef<ByteBuffer>(reinterpret_cast<uint8_t *>(address), bufferSize, mMemManager);
    if (UNLIKELY(mBuffer == nullptr)) {
        // No ByteBuffer took the memory, so give it back explicitly.
        mMemManager->ReleaseMemory(address);
        LOG_ERROR("Make ref buffer failed, newByteBuffer is null.");
        return BSS_ALLOC_FAIL;
    }

    uint8_t *data = mBuffer->Data();
    mHeader = reinterpret_cast<SliceHead *>(data);
    FormatHeader(mHeader, meta, kvCount, indexCount, sortedKeyCount, keyOffsetBase, valueOffsetBase, stateIdInterval);
    mHashCodeSpace = std::make_shared<HashCodeSpace>(mBuffer, hashCodeBase);
    mSeqIdSpace = std::make_shared<SeqIdSpace>(mBuffer, seqIdBase);
    mIndexSpace = std::make_shared<IndexSpace>(mBuffer, indexBase, indexCount, indexWidth, false);
    mSortedIndexSpace = std::make_shared<SortedIndexSpace>(mBuffer, sortedIndexBase);
    mKeySpace = std::make_shared<KeySpace>(mBuffer, keyOffsetBase, kvCount);
    mValueSpace = std::make_shared<ValueSpace>(mBuffer, valueOffsetBase, kvCount);
    mSliceSpace.Init(mBuffer, kvCount, sortedKeyCount, totalKeySize, false);
    return BSS_OK;
}
/**
 * Builds this slice from a RawDataSlice.
 *
 * @param rawDataSlice source data; rejected with BSS_INVALID_PARAM when GetSliceData() is empty.
 * @param meta         creation metadata forwarded to CreateAndInitBuffer.
 * @param memManager   memory manager retained in mMemManager (assigned even when the input is rejected).
 * @param forceEvict   in/out flag forwarded to CreateAndInitBuffer.
 * @param sliceTable   forwarded to CreateAndInitBuffer and FillBuffer.
 * @return BSS_OK once the buffer is created and filled (mInit becomes true), otherwise the first error.
 */
BResult Slice::Initialize(RawDataSlice &rawDataSlice, const SliceCreateMeta &meta, const MemManagerRef &memManager,
                          bool &forceEvict, SliceTableManagerRef sliceTable)
{
    mMemManager = memManager;

    // An empty source cannot form a slice.
    const uint32_t pairCount = rawDataSlice.GetSliceData().size();
    if (UNLIKELY(pairCount == 0)) {
        return BSS_INVALID_PARAM;
    }

    // Lay out the buffer first, then populate it.
    std::vector<std::pair<BinaryKey, uint32_t>> sortedKeySlotList;
    RETURN_NOT_OK_NO_LOG(CreateAndInitBuffer(meta, rawDataSlice, sortedKeySlotList, forceEvict, sliceTable));
    RETURN_NOT_OK(FillBuffer(rawDataSlice, sortedKeySlotList, sliceTable));

    mInit = true;
    return BSS_OK;
}
namespace {
uint32_t GetTotalValLen(const MemorySegment &freshSegment, FreshValueNodePtr &value)
{
uint32_t len = 0;
value->VisitAsList(freshSegment, [&](FreshValueNode &curVal) {
if (curVal.ValueType() != ValueType::DELETE) {
len += curVal.ValueDataLen();
}
return curVal.ValueType() != ValueType::DELETE && curVal.ValueType() != ValueType::PUT;
});
return len;
}
}
/**
 * Computes the slice layout for a RawDataSlice, allocates the backing buffer and wires up all
 * sub-spaces.
 *
 * The index vector of the slice's vector group is sorted in place by ascending rotated mixed hash
 * code; that order defines the slot of every entry. The buffer layout, in order, is:
 * SliceHead | index | sorted index | hash codes | seq ids | key offsets | key data | value offsets |
 * value data.
 *
 * @param meta              creation metadata written into the header by FormatHeader.
 * @param rawDataSlice      source data; its index vector is reordered in place.
 * @param sortedKeySlotList out: (key, slot) of every key whose state type has a secondary key; only
 *                          populated when SUPPORT_SLICE_SORT is enabled.
 * @param forceEvict        in/out: set to true when a list-state entry's value length exceeds IO_SIZE_4M.
 * @param sliceTable        optional; when set, single-value entries for which NeedKvSeparate holds are
 *                          sized as an 8-byte blob id.
 * @return BSS_OK, the GetMemoryDirect error, or BSS_ALLOC_FAIL when the ByteBuffer cannot be created.
 */
BResult Slice::CreateAndInitBuffer(const SliceCreateMeta &meta, RawDataSlice &rawDataSlice,
                                   std::vector<std::pair<BinaryKey, uint32_t>> &sortedKeySlotList, bool &forceEvict,
                                   SliceTableManagerRef sliceTable)
{
    auto &kvPairs = rawDataSlice.GetSliceData();
    auto &indexVec = rawDataSlice.GetVectorGroup()->GetIndexVec();
    auto &mixHashCodeVec = rawDataSlice.GetVectorGroup()->GetMixHashCodeVec();
    uint32_t kvCount = kvPairs.size();
    uint32_t indexCount = BssMath::RoundUpToPowerOfTwo(kvCount);
    uint32_t indexWidth = NO_4;
    if (indexCount > BYTE4_MAX_SLOT_SIZE) {
        indexWidth = NO_8;
    }
    uint32_t rightShiftBits = NO_31 - BssMath::NumberOfLeadingZeros(indexCount);

    // The comparator below looks hashes up by the index VALUE stored in indexVec, so the table must
    // be keyed by that value (not by the position inside indexVec). This stays correct even when
    // indexVec is not the identity permutation on entry.
    std::vector<uint32_t> rotatedHashes;
    rotatedHashes.resize(mixHashCodeVec.size());
    for (uint32_t index : indexVec) {
        rotatedHashes[index] = BssMath::RotateRight(mixHashCodeVec[index], rightShiftBits);
    }
    std::sort(indexVec.begin(), indexVec.end(),
              [&](uint32_t a, uint32_t b) { return rotatedHashes[a] < rotatedHashes[b]; });

    // Accumulate the variable-length sizes and the state id interval in slot order.
    uint32_t totalKeySize = 0;
    uint32_t totalValueSize = 0;
    StateIdInterval stateIdInterval;
    uint32_t slot = 0;
    for (uint32_t index : indexVec) {
        auto &pair = kvPairs[index];
        auto &key = pair.first;
        totalKeySize += key.GetSerializeLength();
#if SUPPORT_SLICE_SORT
        // The state type is only needed to decide whether the key joins the sorted index.
        auto stateType = StateId::GetStateType(key.mPrimaryKey.mStateId);
        if (StateTypeUtil::HasSecKey(stateType)) {
            sortedKeySlotList.emplace_back(key, slot);
        }
        slot++;
#endif
        stateIdInterval.Update(key.mPrimaryKey.mStateId);
        auto &value = pair.second;
        uint32_t valueLen = GetTotalValLen(*rawDataSlice.GetFreshSegment(), value);
        if (UNLIKELY(sliceTable != nullptr) && key.mIsValue && sliceTable->NeedKvSeparate(valueLen)) {
            // A separated value only stores its blob id inline.
            valueLen = sizeof(uint64_t);
        }
        totalValueSize += valueLen;
        forceEvict = forceEvict || (valueLen > IO_SIZE_4M && StateId::IsList(key.mPrimaryKey.mStateId));
    }

    // Running offset over the layout; each *Base is the offset where that region starts.
    uint32_t bufferSize = 0;
    bufferSize += sizeof(SliceHead);
    uint32_t indexBase = bufferSize;
    bufferSize += indexCount * indexWidth;
    uint32_t sortedIndexBase = bufferSize;
    uint32_t sortedKeyCount = sortedKeySlotList.size();
    bufferSize += sortedKeyCount * sizeof(uint32_t);
    uint32_t hashCodeBase = bufferSize;
    bufferSize += kvCount * sizeof(uint32_t);
    uint32_t seqIdBase = bufferSize;
    bufferSize += kvCount * sizeof(uint64_t);
    uint32_t keyOffsetBase = bufferSize;
    bufferSize += kvCount * sizeof(uint32_t);
    bufferSize += totalKeySize;
    uint32_t valueOffsetBase = bufferSize;
    bufferSize += kvCount * sizeof(uint32_t);
    bufferSize += totalValueSize;

    uintptr_t addr = NO_0;
    BResult ret = mMemManager->GetMemoryDirect(MemoryType::SLICE_TABLE, bufferSize, addr);
    RETURN_NOT_OK_NO_LOG(ret);
    mBuffer = MakeRef<ByteBuffer>(reinterpret_cast<uint8_t *>(addr), bufferSize, mMemManager);
    if (UNLIKELY(mBuffer == nullptr)) {
        // No ByteBuffer took the memory, so give it back explicitly.
        mMemManager->ReleaseMemory(addr);
        LOG_ERROR("Make ref buffer failed, byteBuffer is null.");
        return BSS_ALLOC_FAIL;
    }

    uint8_t *data = mBuffer->Data();
    mHeader = reinterpret_cast<SliceHead *>(data);
    FormatHeader(mHeader, meta, kvCount, indexCount, sortedKeyCount, keyOffsetBase, valueOffsetBase, stateIdInterval);
    mHashCodeSpace = std::make_shared<HashCodeSpace>(mBuffer, hashCodeBase);
    mSeqIdSpace = std::make_shared<SeqIdSpace>(mBuffer, seqIdBase);
    mIndexSpace = std::make_shared<IndexSpace>(mBuffer, indexBase, indexCount, indexWidth, false);
    mSortedIndexSpace = std::make_shared<SortedIndexSpace>(mBuffer, sortedIndexBase);
    mKeySpace = std::make_shared<KeySpace>(mBuffer, keyOffsetBase, kvCount);
    mValueSpace = std::make_shared<ValueSpace>(mBuffer, valueOffsetBase, kvCount);
    mSliceSpace.Init(mBuffer, kvCount, sortedKeyCount, totalKeySize, false);
    return BSS_OK;
}
/**
 * Fills every field of a slice header.
 *
 * @param header          header to write; dereferenced without a null check.
 * @param meta            supplies logicSliceId (stored as sliceId) and version.
 * @param keyCount        number of stored keys.
 * @param indexCount      number of index slots.
 * @param sortedKeyCount  number of sorted-index entries.
 * @param keyOffsetBase   buffer offset of the key offset table.
 * @param valueOffsetBase buffer offset of the value offset table.
 * @param stateIdInterval state id interval stored in the header.
 */
void Slice::FormatHeader(SliceHead *header, const SliceCreateMeta &meta, uint32_t keyCount, uint32_t indexCount,
                         uint32_t sortedKeyCount, uint32_t keyOffsetBase, uint32_t valueOffsetBase,
                         StateIdInterval stateIdInterval)
{
    // Identity and versioning.
    header->sliceId = meta.logicSliceId;
    header->version = meta.version;
    header->formatVersion = 0;

    // Entry counts.
    header->keyCount = keyCount;
    header->indexCount = indexCount;
    header->sortedKeyCount = sortedKeyCount;

    // Region offsets.
    header->keyOffsetBase = keyOffsetBase;
    header->valueOffsetBase = valueOffsetBase;

    header->stateIdInterval = stateIdInterval;
}
/**
 * Writes every key/value pair into the slice and, when present, the sorted index.
 *
 * @param kvPairs           pairs to store; the i-th pair is written to slot i through PutKv.
 * @param sortedKeySlotList (key, slot) entries for the sorted index; sorted in place here.
 * @return BSS_OK, or the first error returned by PutKv.
 */
BResult Slice::FillBuffer(const std::vector<std::pair<SliceKey, Value>> &kvPairs,
                          std::vector<std::pair<SliceKey, uint32_t>> &sortedKeySlotList)
{
    uint32_t curIndex = 0;
    for (auto &kv : kvPairs) {
        RETURN_NOT_OK(PutKv(curIndex++, kv));
    }

    // Without secondary keys there is no sorted index to write.
    if (sortedKeySlotList.empty()) {
        return BSS_OK;
    }

    // NOTE(review): the BinaryKey overload compares the result of ComparePrimaryKeyAndSecondKey
    // against 0, while this comparator returns it directly - confirm the SliceKey variant really
    // yields a strict "less than" boolean.
    using SortedEntry = std::pair<SliceKey, uint32_t>;
    std::sort(sortedKeySlotList.begin(), sortedKeySlotList.end(), [](const SortedEntry &first, const SortedEntry &second) {
        return first.first.ComparePrimaryKeyAndSecondKey(second.first);
    });

    uint32_t sortedSlot = 0;
    for (const auto &entry : sortedKeySlotList) {
        mSortedIndexSpace->Put(sortedSlot, entry.second);
        ++sortedSlot;
    }
    return BSS_OK;
}
/**
 * Writes hash codes, index entries, keys, values, sequence ids and (when present) the sorted index
 * of a RawDataSlice into this slice's buffer.
 *
 * Entries are written in the order of the vector group's index vector: the i-th element of that
 * vector selects the kv pair stored in slot i. On aarch64 builds with BUILD_SVE the hash-code copy
 * and the sorted-index copy use SVE intrinsics; every other build uses the scalar loops.
 *
 * @param rawDataSlice      source of kv pairs, index vector, mixed hash codes and the fresh segment.
 * @param sortedKeySlotList (key, slot) entries for the sorted index; sorted in place here.
 * @param sliceTable        forwarded to ValueSpace::Put.
 * @return BSS_OK, or the first error returned by ValueSpace::Put.
 */
BResult Slice::FillBuffer(RawDataSlice &rawDataSlice, std::vector<std::pair<BinaryKey, uint32_t>> &sortedKeySlotList,
                          SliceTableManagerRef sliceTable)
{
    uint32_t curIndex = 0;
    auto &kvPairs = rawDataSlice.GetSliceData();
    auto &indexVec = rawDataSlice.GetVectorGroup()->GetIndexVec();
    uint32_t indexNum = indexVec.size();
    const auto &freshSegment = *rawDataSlice.GetFreshSegment();
    auto &mixHashCodeVec = rawDataSlice.GetVectorGroup()->GetMixHashCodeVec();

    // Index slot of every entry, by write position. The vector is sized (not just reserved) because
    // the SVE path stores through data() and the loop further down reads it with operator[].
    std::vector<uint32_t> indexIdVec(indexNum);
#if defined(__aarch64__) && defined(BUILD_SVE)
    {
        const uint32_t *indicesPtr = indexVec.data();
        const uint32_t *srcVecPtr = mixHashCodeVec.data();
        const auto &dataPtr = mHashCodeSpace->GetDataPtr();
        const auto &srcIndexIdVecPtr = indexIdVec.data();
        const uint32_t vl = svcntw();
        const uint32_t mainLoopEnd = indexNum & ~(vl - 1);
        const svbool_t allPred = svptrue_b32();
        const svuint32_t maskVec = svdup_n_u32(mHeader->indexCount - 1);
        // Full vectors: gather the hash codes by index, store them, then mask them into index slots.
        for (uint32_t i = 0; i < mainLoopEnd; i += vl) {
            svuint32_t index_vec = svld1_u32(allPred, indicesPtr + i);
            svuint32_t data_vec = svld1_gather_u32index_u32(allPred, srcVecPtr, index_vec);
            svst1_u32(allPred, dataPtr + i, data_vec);
            svuint32_t indexId_vec = svand_u32_z(allPred, data_vec, maskVec);
            svst1_u32(allPred, srcIndexIdVecPtr + i, indexId_vec);
        }
        // Remainder: same work under a predicate covering only the leftover lanes.
        uint32_t tailStart = mainLoopEnd;
        if (tailStart < indexNum) {
            const svbool_t tailPred = svwhilelt_b32(tailStart, indexNum);
            svuint32_t index_vec = svld1_u32(tailPred, indicesPtr + tailStart);
            svuint32_t data_vec = svld1_gather_u32index_u32(tailPred, srcVecPtr, index_vec);
            svst1_u32(tailPred, dataPtr + tailStart, data_vec);
            svuint32_t indexId_vec = svand_u32_z(tailPred, data_vec, maskVec);
            svst1_u32(tailPred, srcIndexIdVecPtr + tailStart, indexId_vec);
        }
    }
#else
    uint32_t hashIndex = 0;
    for (uint32_t index : indexVec) {
        mHashCodeSpace->Put(hashIndex, mixHashCodeVec[index]);
        indexIdVec[hashIndex] = mixHashCodeVec[index] & (mHeader->indexCount - 1);
        hashIndex++;
    }
#endif

    // Write index entry, key, value and sequence id of every entry in slot order.
    for (uint32_t index : indexVec) {
        auto &kv = kvPairs[index];
        mIndexSpace->Put(indexIdVec[curIndex], curIndex);
        BinaryKey key = kv.first;
        mKeySpace->Put(curIndex, key);
        uint64_t seqId = 0;
        RETURN_NOT_OK(mValueSpace->Put(curIndex, kv.second, freshSegment, kv.first.mIsValue, sliceTable,
                                       key.KeyHashCode(), key.StateId(), seqId));
        mSeqIdSpace->Put(curIndex, seqId);
        curIndex++;
    }

    if (!sortedKeySlotList.empty()) {
        std::sort(sortedKeySlotList.begin(), sortedKeySlotList.end(),
                  [](const std::pair<BinaryKey, uint32_t> &first, const std::pair<BinaryKey, uint32_t> &second) {
                      return first.first.ComparePrimaryKeyAndSecondKey(second.first) < 0;
                  });
#if defined(__aarch64__) && defined(BUILD_SVE)
        {
            const uint32_t total = sortedKeySlotList.size();
            uint32_t *destPtr = mSortedIndexSpace->GetDataPtr();
            // Scratch pair used only to measure where `second` sits inside the pair object.
            std::pair<BinaryKey, uint32_t> pair;
            const uint32_t structSize = sizeof(std::pair<BinaryKey, uint32_t>);
            const uint32_t secondOffset = reinterpret_cast<const char *>(&pair.second) -
                                          reinterpret_cast<const char *>(&pair);
            auto basePtr = reinterpret_cast<const uint32_t *>(sortedKeySlotList.data());
            const uint32_t vl = svcntw();
            const uint32_t mainLoopEnd = total & ~(vl - 1);
            const svbool_t allPred = svptrue_b32();
            // Gather the `second` member of consecutive pairs by byte offset: i * structSize + secondOffset.
            for (uint32_t i = 0; i < mainLoopEnd; i += vl) {
                svuint32_t indices = svindex_u32(i, 1);
                svuint32_t byteOffsets = svmad_u32_x(allPred, indices, svdup_u32(structSize), svdup_u32(secondOffset));
                svuint32_t data = svld1_gather_u32offset_u32(allPred, basePtr, byteOffsets);
                svst1(allPred, &destPtr[i], data);
            }
            uint32_t tailStart = mainLoopEnd;
            if (tailStart < total) {
                const svbool_t tailPred = svwhilelt_b32(tailStart, total);
                svuint32_t indices = svindex_u32(tailStart, 1);
                svuint32_t byteOffsets = svmad_u32_x(tailPred, indices, svdup_u32(structSize), svdup_u32(secondOffset));
                svuint32_t data = svld1_gather_u32offset_u32(tailPred, basePtr, byteOffsets);
                svst1(tailPred, &destPtr[tailStart], data);
            }
        }
#else
        uint32_t slot = 0;
        for (const auto &sortedKeySlot : sortedKeySlotList) {
            mSortedIndexSpace->Put(slot++, sortedKeySlot.second);
        }
#endif
    }
    return BSS_OK;
}
/**
 * Rebuilds `dataMap` from every entry stored in this slice.
 *
 * @param dataMap     output map; cleared first, then filled with key -> value.
 * @param skipDeleted when true, entries whose value type is DELETE are left out.
 */
void Slice::GetSliceKVMap(SliceKVMap &dataMap, bool skipDeleted)
{
    uint32_t total = mHeader->keyCount;
    dataMap.clear();
    dataMap.reserve(total);

    uint32_t cursor = 0;
    while (cursor < total) {
        auto sliceKey = GetBinaryKey(cursor);
        Value value = mValueSpace->Get(cursor, mSeqIdSpace->Get(cursor));
        // Keep the entry unless deletes are filtered out and this one is a delete.
        if (!skipDeleted || value.ValueType() != DELETE) {
            dataMap[sliceKey] = value;
        }
        ++cursor;
    }
}
/**
 * Creates an iterator over the sorted-index entries selected by `prefixKey`.
 *
 * The slot range is [FindStartSortedIndexSlot(prefixKey), FindEndSortedIndexSlot(prefixKey)].
 *
 * @param prefixKey   key whose primary key selects the range.
 * @param skipDeleted when true, entries whose value type is DELETE are left out.
 * @return an iterator over the collected key/values, or nullptr when the slice has no keys or no
 *         sorted keys, the state id is outside the slice's interval, or the range is empty.
 */
KeyValueIteratorRef Slice::SubIterator(const Key &prefixKey, bool skipDeleted)
{
    const bool hasSortedData = (mHeader->keyCount != 0) && (mHeader->sortedKeyCount != 0);
    if (!hasSortedData) {
        return nullptr;
    }
    // The header interval lets us reject state ids this slice cannot contain.
    if (!mHeader->stateIdInterval.Contains(prefixKey.PriKey().StateId())) {
        return nullptr;
    }

    uint32_t firstSlot = FindStartSortedIndexSlot(prefixKey);
    if (firstSlot >= mHeader->sortedKeyCount) {
        return nullptr;
    }
    uint32_t lastSlot = FindEndSortedIndexSlot(prefixKey);
    if (lastSlot == INVALID_U32_INDEX || lastSlot < firstSlot) {
        return nullptr;
    }

    std::vector<KeyValueRef> collected;
    for (uint32_t sortedSlot = firstSlot; sortedSlot <= lastSlot; ++sortedSlot) {
        auto entry = MakeRef<KeyValue>();
        mSliceSpace.GetKeyValue(mSliceSpace.GetIndexBySortIndex(sortedSlot), entry);
        const bool dropEntry = skipDeleted && entry->value.ValueType() == DELETE;
        if (!dropEntry) {
            collected.emplace_back(std::move(entry));
        }
    }
    return std::make_shared<KeyValueVectorIterator>(std::move(collected));
}
/**
 * Reads the key stored at `keyCursor`.
 *
 * @param keyCursor slot of the key.
 * @return the key unpacked by KeySpace::Get, using the mixed hash code stored for the same slot.
 */
SliceKey Slice::GetBinaryKey(uint32_t keyCursor)
{
    SliceKey result;
    // The hash code lives in its own space and is passed along when unpacking the key.
    mKeySpace->Get(keyCursor, mHashCodeSpace->Get(keyCursor), result);
    return result;
}
/**
 * @return the header's key count, or 0 while the slice is not initialised.
 */
uint32_t Slice::GetCompactionCount()
{
    if (mInit) {
        return mHeader->keyCount;
    }
    return 0;
}
/**
 * Reports the capacity of the slice buffer.
 *
 * @param size out: mBuffer->Capacity(); left untouched when the slice is not initialised.
 * @return BSS_OK, or BSS_NOT_READY while the slice is not initialised.
 */
BResult Slice::BytesSize(uint32_t &size)
{
    if (mInit) {
        size = mBuffer->Capacity();
        return BSS_OK;
    }
    return BSS_NOT_READY;
}
/**
 * Re-attaches this slice to an already formatted buffer.
 *
 * The header at the start of the buffer supplies the counts and the key/value offset bases; the
 * index, sorted-index, hash-code and seq-id bases are recomputed from those counts.
 *
 * @param byteBuffer buffer holding a formatted slice; dereferenced without a null check.
 * @param memManager memory manager retained in mMemManager.
 */
void Slice::RestoreSliceUseByteBuffer(ByteBufferRef byteBuffer, MemManagerRef memManager)
{
    mInit = true;
    mBuffer = byteBuffer;
    mMemManager = memManager;
    mHeader = reinterpret_cast<SliceHead *>(mBuffer->Data());

    uint32_t kvCount = mHeader->keyCount;
    uint32_t indexCount = mHeader->indexCount;
    uint32_t indexWidth = (indexCount > BYTE4_MAX_SLOT_SIZE) ? NO_8 : NO_4;

    // Walk the fixed-size regions that follow the header to recover their start offsets.
    uint32_t offset = sizeof(SliceHead);
    uint32_t indexBase = offset;
    offset += indexCount * indexWidth;
    uint32_t sortedIndexBase = offset;
    offset += mHeader->sortedKeyCount * sizeof(uint32_t);
    uint32_t hashCodeBase = offset;
    offset += kvCount * sizeof(uint32_t);
    uint32_t seqIdBase = offset;

    // The variable-length regions are located through the header.
    uint32_t keyOffsetBase = mHeader->keyOffsetBase;
    uint32_t valueOffsetBase = mHeader->valueOffsetBase;

    mHashCodeSpace = std::make_shared<HashCodeSpace>(mBuffer, hashCodeBase);
    mSeqIdSpace = std::make_shared<SeqIdSpace>(mBuffer, seqIdBase);
    mIndexSpace = std::make_shared<IndexSpace>(mBuffer, indexBase, indexCount, indexWidth, true);
    mSortedIndexSpace = std::make_shared<SortedIndexSpace>(mBuffer, sortedIndexBase);
    mKeySpace = std::make_shared<KeySpace>(mBuffer, keyOffsetBase, kvCount);
    mValueSpace = std::make_shared<ValueSpace>(mBuffer, valueOffsetBase, kvCount);

    // Key data sits between the key offset table and the value offset table.
    uint32_t totalKeySize = valueOffsetBase - kvCount * sizeof(uint32_t) - keyOffsetBase;
    mSliceSpace.Init(mBuffer, kvCount, mHeader->sortedKeyCount, totalKeySize, true);
}
bool Slice::Get(const Key &key, Value &value)
{
if (UNLIKELY(!mInit)) {
return false;
}
uint32_t targetMixedHashCode = key.MixedHashCode();
uint32_t indexLen = mHeader->indexCount;
uint32_t indexSlot = targetMixedHashCode & (indexLen - 1);
uint32_t count = 0;
uint32_t startSlot = 0;
if (!mIndexSpace->Get(indexSlot, count, startSlot)) {
return false;
}
bool useBinarySearch = (count > NO_3 && mHeader->formatVersion >= 0);
if (useBinarySearch) {
return BinarySearch(key, targetMixedHashCode, startSlot, count, value);
}
return LinearSearch(key, targetMixedHashCode, startSlot, count, value);
}
/**
 * Binary-searches the sorted index for the first slot whose FindPriKeyBySortedIndex result against
 * the primary key of `startKey` is not negative.
 *
 * @param startKey key whose primary key is searched for.
 * @return that slot; sortedKeyCount when every slot compares negative; 0 when the sorted index is empty.
 */
uint32_t Slice::FindStartSortedIndexSlot(const Key &startKey)
{
    if (UNLIKELY(mHeader->sortedKeyCount == 0)) {
        return 0;
    }

    const auto &priKey = startKey.PriKey();
    uint32_t lo = 0;
    uint32_t hi = mHeader->sortedKeyCount - 1;
    while (lo <= hi) {
        uint32_t probe = lo + ((hi - lo) >> NO_1);
        if (mSliceSpace.FindPriKeyBySortedIndex(probe, priKey) < 0) {
            // Everything up to and including probe compares negative.
            lo = probe + 1;
        } else if (probe == 0) {
            // Slot 0 already qualifies; returning here also avoids unsigned underflow of probe - 1.
            return 0;
        } else {
            hi = probe - 1;
        }
    }
    return hi + 1;
}
/**
 * Binary-searches the sorted index for the last slot whose FindPriKeyBySortedIndex result against
 * the primary key of `endKey` is not positive.
 *
 * @param endKey key whose primary key is searched for.
 * @return that slot, or INVALID_U32_INDEX when no slot qualifies - including when the sorted index
 *         is empty, so that an empty index is never reported as the valid slot 0.
 */
uint32_t Slice::FindEndSortedIndexSlot(const Key &endKey)
{
    uint32_t sortedKeyCount = mSliceSpace.SortedKeyCount();
    if (UNLIKELY(sortedKeyCount == 0)) {
        // Same "no slot" marker as the final return below.
        return INVALID_U32_INDEX;
    }
    uint32_t low = 0;
    uint32_t high = sortedKeyCount - 1;
    auto &endPriKey = endKey.PriKey();
    while (low <= high) {
        uint32_t mid = low + ((high - low) >> NO_1);
        int32_t cmp = mSliceSpace.FindPriKeyBySortedIndex(mid, endPriKey);
        if (cmp > 0) {
            if (mid == 0) {
                // Slot 0 is already past the key; stop before mid - 1 underflows.
                break;
            }
            high = mid - 1;
            continue;
        }
        low = mid + 1;
    }
    // low is one past the last qualifying slot.
    return low == 0 ? INVALID_U32_INDEX : low - 1;
}
/**
 * Finds the run of entries in [startSlot, startSlot + indexCount) whose mixed hash code equals
 * `targetMixedHashCode`.
 *
 * NOTE(review): the declared return type is BResult, but the function returns the boolean values
 * true (run found) and false (no match) - confirm how callers interpret the result.
 *
 * @param targetMixedHashCode hash code to look for.
 * @param startSlot           first slot of the searched range.
 * @param indexCount          number of slots in the searched range.
 * @param lowerBound          out: offset (relative to startSlot) of the first match; written only on a match.
 * @param upperBound          out: offset one past the last consecutive match; written only on a match.
 * @return true when a match exists, false otherwise.
 */
BResult Slice::BinarySearchBound(uint32_t targetMixedHashCode, uint32_t startSlot, uint32_t indexCount,
                                 uint32_t &lowerBound, uint32_t &upperBound)
{
    // Bisect for the first offset whose hash code is not below the target.
    uint32_t lo = 0;
    uint32_t hi = indexCount;
    while (lo < hi) {
        uint32_t probe = lo + ((hi - lo) >> NO_1);
        if (mSliceSpace.GetKeyMixedHashCode(startSlot + probe) < targetMixedHashCode) {
            lo = probe + 1;
        } else {
            hi = probe;
        }
    }

    const bool matched = (lo != indexCount) && (mSliceSpace.GetKeyMixedHashCode(startSlot + lo) == targetMixedHashCode);
    if (!matched) {
        return false;
    }

    // Extend the run while consecutive entries still carry the target hash code.
    lowerBound = lo;
    uint32_t runEnd = lo;
    while (runEnd < indexCount && mSliceSpace.GetKeyMixedHashCode(startSlot + runEnd) == targetMixedHashCode) {
        runEnd++;
    }
    upperBound = runEnd;
    return true;
}
/**
 * Unpacks the key stored at `index` into `sliceKey`.
 *
 * mKeyOffsets[index] is the end offset of the key's bytes; the start offset is the previous entry's
 * end offset, or 0 for the first key.
 *
 * @param index       slot of the key.
 * @param mixHashCode mixed hash code forwarded to SliceKey::Unpack.
 * @param sliceKey    out: receives the unpacked key.
 */
void KeySpace::Get(uint32_t index, uint32_t mixHashCode, SliceKey &sliceKey)
{
    const uint32_t begin = (index == 0) ? 0 : mKeyOffsets[index - 1];
    const uint32_t end = mKeyOffsets[index];
    sliceKey.Unpack(mBuffer, mKeyDataBaseOffset + begin, end - begin, mixHashCode);
}
}
}