* Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <linux/futex.h>
#include <cstdint>
#include <numeric>
#include <utility>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <unistd.h>
#include "datasystem/common/string_intern/string_ref.h"
#include "securec.h"
#include "datasystem/common/util/raii.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/common/inject/inject_point.h"
#include "datasystem/common/perf/perf_manager.h"
#include "datasystem/common/stream_cache/stream_data_page.h"
#include "datasystem/common/util/bitmask_enum.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/memory.h"
#include "datasystem/common/util/strings_util.h"
#include "datasystem/stream/stream_config.h"
namespace datasystem {
/**
 * Block on the futex word `lockArea` while it still holds `val`, until woken, interrupted, or `timeoutMs` elapses.
 * `waitCount` is incremented for the duration of the wait so FutexWake can skip the syscall when nobody waits.
 * @param lockArea Futex word to wait on.
 * @param waitCount Shared counter of threads currently waiting on lockArea.
 * @param val Value *lockArea is expected to hold; if it differs the kernel returns EAGAIN immediately.
 * @param timeoutMs Relative wait timeout in milliseconds.
 * @return OK when woken, interrupted (EINTR) or the value already changed (EAGAIN); K_TRY_AGAIN on timeout;
 *         K_RUNTIME_ERROR for any other futex failure.
 */
Status PageLock::FutexWait(uint32_t *lockArea, uint32_t *waitCount, uint32_t val, uint64_t timeoutMs)
{
    auto t = MilliSecondsToTimeSpec(timeoutMs);
    auto fetchVal1 = __atomic_fetch_add(waitCount, 1, __ATOMIC_SEQ_CST);
    auto res = syscall(SYS_futex, lockArea, FUTEX_WAIT, val, &t, nullptr, 0);
    // Capture errno immediately: the logging below may overwrite it before it is inspected.
    const int err = errno;
    auto fetchVal2 = __atomic_fetch_sub(waitCount, 1, __ATOMIC_SEQ_CST);
    const int warningVal = 1000;
    LOG_IF(INFO, fetchVal2 > warningVal) << FormatString(
        "Wait count before increment: %zu, wait count before decrement: %zu", fetchVal1, fetchVal2);
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
        res != -1 || err == EAGAIN || err == ETIMEDOUT || err == EINTR, K_RUNTIME_ERROR,
        FormatString("Futex wait error. Errno = %d. Message %s", err, StrErr(err)));
    RETURN_OK_IF_TRUE(res == 0 || err == EAGAIN || err == EINTR);
    RETURN_STATUS(K_TRY_AGAIN, FormatString("[%s:%s] Timeout after %zu ms", __FUNCTION__, __LINE__, timeoutMs));
}
/**
 * Wake up to `numToWakeUp` threads blocked in FutexWait on `lockArea`.
 * Skips the syscall entirely when `waitCount` shows no waiters.
 * @param lockArea Futex word the waiters sleep on.
 * @param waitCount Shared counter maintained by FutexWait.
 * @param numToWakeUp Maximum number of waiters to wake.
 * @return OK, or K_RUNTIME_ERROR if the futex wake syscall fails.
 */
Status PageLock::FutexWake(uint32_t *lockArea, uint32_t *waitCount, int numToWakeUp)
{
    PerfPoint point1(PerfKey::PAGE_WAKE_CONSUMER);
    auto numWaiter = __atomic_load_n(waitCount, __ATOMIC_SEQ_CST);
    RETURN_OK_IF_TRUE(numWaiter == 0);
    PerfPoint point(PerfKey::PAGE_FUTEX_WAKE);
    auto res = syscall(SYS_futex, lockArea, FUTEX_WAKE, numToWakeUp, nullptr, nullptr, 0);
    // Capture errno before point.Record(), which may make calls that overwrite it.
    const int err = errno;
    point.Record();
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
        res != -1, K_RUNTIME_ERROR, FormatString("futex wake error. Errno = %d. Message %s", err, StrErr(err)));
    VLOG_IF(SC_INTERNAL_LOG_LEVEL, res > 0) << FormatString("Wake up %zu waiters", res);
    return Status::OK();
}
/**
 * Bind a lock object to a lock word and a waiter counter, both in shared memory.
 * @param lockArea Lock word manipulated with atomics/futex.
 * @param waitArea Counter of threads waiting on the lock word.
 * @param lockId Identifier encoded into the lock word while this object holds the lock.
 */
PageLock::PageLock(uint32_t *lockArea, uint32_t *waitArea, uint32_t lockId)
    : lockFlag_(lockArea),
      waitCount_(waitArea),
      lockId_(lockId)
{
}
/**
 * Acquire the write lock, retrying until `timeoutMs` (clamped to at least 5 ms) has elapsed.
 * While held, the lock word stores (lockId_ << SHIFT) | WRITE_LOCK_NUM.
 * For roughly the first 10 ms this only retries the CAS. After that, each failed attempt blocks in FutexWait for
 * the remaining time.
 * @param timeoutMs Maximum time to try, in milliseconds.
 * @return OK once acquired; K_TRY_AGAIN on timeout; other errors propagated from FutexWait.
 */
Status PageLock::Lock(uint64_t timeoutMs)
{
    const uint64_t minTimeoutMs = 5;
    timeoutMs = std::max(minTimeoutMs, timeoutMs);
    PerfPoint point(PerfKey::PAGE_INSERT_GET_LOCK);
    Timer timer;
    uint64_t useTimeMs = 0;
    const uint64_t futexThreshold = 10;
    // One acquisition attempt: fail fast if any writer holds the lock, otherwise CAS our id in.
    auto lockFunc = [this]() {
        uint32_t val = __atomic_load_n(lockFlag_, __ATOMIC_SEQ_CST);
        if (val & WRITE_LOCK_NUM) {
            return false;
        }
        uint32_t lockVal = (lockId_ << SHIFT) | WRITE_LOCK_NUM;
        return __atomic_compare_exchange_n(lockFlag_, &val, lockVal, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
    };
    do {
        RETURN_OK_IF_TRUE(lockFunc());
        useTimeMs = static_cast<uint64_t>(timer.ElapsedMilliSecond());
        CHECK_FAIL_RETURN_STATUS(useTimeMs < timeoutMs, K_TRY_AGAIN,
                                 FormatString("[%s:%s] Timeout after %zu ms", __FUNCTION__, __LINE__, timeoutMs));
        // Past the spin window: sleep on the futex instead of burning CPU.
        if (useTimeMs >= futexThreshold) {
            auto remainingMs = timeoutMs - useTimeMs;
            // NOTE(review): the futex compare value is WRITE_LOCK_NUM, so the kernel only sleeps while the word equals
            // exactly WRITE_LOCK_NUM (lock id 0). For any other holder it returns EAGAIN at once and this loop
            // effectively spins. Confirm this is intended.
            Status rc = PageLock::FutexWait(lockFlag_, waitCount_, WRITE_LOCK_NUM, remainingMs);
            useTimeMs = static_cast<uint64_t>(timer.ElapsedMilliSecond());
            if (rc.IsOk()) {
                continue;
            }
            RETURN_IF_NOT_OK_EXCEPT(rc, K_TRY_AGAIN);
        }
    } while (useTimeMs < timeoutMs);
    RETURN_STATUS(K_TRY_AGAIN, FormatString("[%s:%s] Timeout after %zu ms", __FUNCTION__, __LINE__, timeoutMs));
}
/**
 * Release the write lock if, and only if, it is currently held under this object's lock id.
 * Wakes one futex waiter after a successful release. Does nothing if the lock is free or owned by someone else.
 */
void PageLock::Unlock()
{
    PerfPoint point(PerfKey::PAGE_INSERT_RELEASE_LOCK);
    const bool writeLocked = __atomic_load_n(lockFlag_, __ATOMIC_SEQ_CST) & WRITE_LOCK_NUM;
    if (!writeLocked) {
        return;
    }
    uint32_t ownedValue = (lockId_ << SHIFT) | WRITE_LOCK_NUM;
    const bool released = __atomic_compare_exchange_n(lockFlag_, &ownedValue, NO_LOCK_NUM, false,
                                                      __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
    if (!released) {
        return;
    }
    VLOG(SC_DEBUG_LOG_LEVEL) << "Success to unlock the write lock";
    LOG_IF_ERROR(PageLock::FutexWake(lockFlag_, waitCount_, 1), "Futex unlock");
}
/**
 * If the write lock is held under `lockId`, atomically rewrite the lock word to plain WRITE_LOCK_NUM.
 * The lock stays held, with owner id 0.
 * @param lockId Owner id expected in the lock word.
 * @return true if the lock word was rewritten.
 */
bool PageLock::TryUnlockByLockId(uint32_t lockId)
{
    const uint32_t replacement = WRITE_LOCK_NUM;
    uint32_t heldByOwner = (lockId << SHIFT) | WRITE_LOCK_NUM;
    return __atomic_compare_exchange_n(lockFlag_, &heldByOwner, replacement, false, __ATOMIC_SEQ_CST,
                                       __ATOMIC_SEQ_CST);
}
/**
 * Scoped guard for a StreamDataPage lock. It starts unlocked; call Lock() to acquire.
 * @param page Page whose lock this guard manages.
 */
StreamPageLock::StreamPageLock(std::shared_ptr<StreamDataPage> page)
    : pageLocked_(false),
      page_(std::move(page))
{
}
/**
 * Release the page lock if this guard still holds it.
 */
StreamPageLock::~StreamPageLock()
{
    if (!pageLocked_) {
        return;
    }
    page_->Unlock();
}
/**
 * Acquire the page lock and remember that this guard owns it.
 * @param timeoutMs Lock timeout forwarded to StreamDataPage::Lock.
 * @return Runtime error if no page is attached; otherwise the status of the underlying lock.
 */
Status StreamPageLock::Lock(uint64_t timeoutMs)
{
    RETURN_RUNTIME_ERROR_IF_NULL(page_);
    Status rc = page_->Lock(timeoutMs);
    RETURN_IF_NOT_OK(rc);
    pageLocked_ = true;
    return Status::OK();
}
/**
 * Release the page lock early. It is a no-op if this guard does not hold it.
 */
void StreamPageLock::Unlock()
{
    if (!pageLocked_) {
        return;
    }
    page_->Unlock();
    pageLocked_ = false;
}
/**
 * Point this header at `size` bytes starting at `ptr`, tagged with `version`.
 */
void ElementHeader::Set(Ptr ptr, Size size, Version version)
{
    headerVersion_ = version;
    headerSize_ = size;
    headerPtr_ = ptr;
}
/**
 * Combine a data element with its header for a given stream.
 * Both parts are copied: the parameters are const references, so std::move would only have produced a copy anyway.
 * @param element Payload pointer/size.
 * @param header Optional header descriptor.
 * @param streamNo Stream number recorded with the element.
 */
HeaderAndData::HeaderAndData(const Element &element, const ElementHeader &header, uint64_t streamNo)
    : Element(element), ElementHeader(header), streamNo(streamNo)
{
}
/**
 * Build a header-less element from a raw pointer and size for the given stream.
 */
HeaderAndData::HeaderAndData(const Ptr ptr, const Size size, uint64_t streamNo)
    : Element(ptr, size),
      streamNo(streamNo)
{
}
/**
 * Number of bytes MemoryCopyTo writes.
 * @return The payload size, plus the header bytes and a Version field when a header is present.
 */
HeaderAndData::Size HeaderAndData::TotalSize() const
{
    if (!headerSize_) {
        return size;
    }
    return size + headerSize_ + sizeof(Version);
}
/**
 * Serialize the element at `dest`.
 * Layout when a header exists: [version][header bytes][payload]. Otherwise only the payload is written.
 * @param dest Destination; must have room for TotalSize() bytes.
 * @return Status of the underlying memory copies.
 */
Status HeaderAndData::MemoryCopyTo(Ptr dest) const
{
    Ptr cursor = dest;
    if (headerSize_) {
        *cursor = headerVersion_;
        ++cursor;
        RETURN_IF_NOT_OK(HugeMemoryCopy(cursor, headerSize_, headerPtr_, headerSize_));
        cursor += headerSize_;
    }
    return HugeMemoryCopy(cursor, size, ptr, size);
}
/**
 * Construct a verification header from its individual fields (delegates to Set).
 */
DataVerificationHeader::DataVerificationHeader(SeqNo seqNo, SenderProducerNo senderProducerNo, Address address,
                                               Port port)
{
    Set(seqNo, senderProducerNo, address, port);
}
/**
 * Decode a verification header by copying sizeof(bytes) bytes from the element header's pointer.
 * The constructor cannot return a Status, so a copy failure is logged instead of being silently dropped.
 * NOTE(review): assumes ele.headerSize_ >= sizeof(bytes); that is not checked here. Confirm callers guarantee it.
 */
DataVerificationHeader::DataVerificationHeader(const ElementHeader &ele)
{
    LOG_IF_ERROR(HugeMemoryCopy(bytes, sizeof(bytes), ele.headerPtr_, sizeof(bytes)),
                 "DataVerificationHeader copy from element header");
}
/** @return The sequence number stored in the header. */
DataVerificationHeader::SeqNo DataVerificationHeader::GetSeqNo() const
{
    const SeqNo value = hdr.seqNo;
    return value;
}
/** @return The sender producer number stored in the header. */
DataVerificationHeader::SenderProducerNo DataVerificationHeader::GetSenderProducerNo() const
{
    const SenderProducerNo value = hdr.senderProducerNo;
    return value;
}
/** @return The address field stored in the header. */
DataVerificationHeader::Address DataVerificationHeader::GetAddress() const
{
    const Address value = hdr.address;
    return value;
}
/** @return The port field stored in the header. */
DataVerificationHeader::Port DataVerificationHeader::GetPort() const
{
    const Port value = hdr.port;
    return value;
}
/** @return Serialized size of the verification header, in bytes. */
DataVerificationHeader::Size DataVerificationHeader::HeaderSize() const
{
    const Size serializedSize = sizeof(bytes);
    return serializedSize;
}
/**
 * Overwrite all header fields.
 */
void DataVerificationHeader::Set(SeqNo seqNo, SenderProducerNo senderProducerNo, Address address, Port port)
{
    hdr.port = port;
    hdr.address = address;
    hdr.senderProducerNo = senderProducerNo;
    hdr.seqNo = seqNo;
}
/**
 * Split a verification header off the front of `element`.
 * On success, `header` points at the first sizeof(bytes) bytes (tagged DATA_VERIFICATION_HEADER), and `element` is
 * advanced past them.
 * @return K_OUT_OF_RANGE if the element is not strictly larger than the header.
 */
Status DataVerificationHeader::ExtractHeader(DataElement &element, ElementHeader &header)
{
    const auto hdrBytes = sizeof(bytes);
    CHECK_FAIL_RETURN_STATUS(
        element.size > hdrBytes, K_OUT_OF_RANGE,
        FormatString("Element (header + data) size %llu is not greater than DataVerificationHeader size %lu",
                     element.size, hdrBytes));
    header.Set(element.ptr, hdrBytes, DATA_VERIFICATION_HEADER);
    element.size -= hdrBytes;
    element.ptr += hdrBytes;
    return Status::OK();
}
/**
 * Build an interned page id from the shared-memory unit's fd, mmap size, offset and size.
 */
ShmKey StreamPageBase::CreatePageId(const std::shared_ptr<ShmUnitInfo> &shmInfo)
{
    const auto &unit = *shmInfo;
    auto idText = FormatString("F:%zu-M:%zu-O:%zu-S:%zu", unit.fd, unit.mmapSize, unit.offset, unit.size);
    return ShmKey::Intern(idText);
}
/**
 * Take ownership of the shared-memory unit and stamp it with an id derived from its geometry.
 */
StreamPageBase::StreamPageBase(std::shared_ptr<ShmUnitInfo> shmInfo)
{
    shmInfo->id = CreatePageId(shmInfo);
    pageUnit_ = std::move(shmInfo);
}
/**
 * Same as the single-argument constructor, additionally keeping the mmap table entry alive for the page's lifetime.
 */
StreamPageBase::StreamPageBase(std::shared_ptr<ShmUnitInfo> shmInfo,
                               std::shared_ptr<client::IMmapTableEntry> mmapEntry)
    : StreamPageBase(std::move(shmInfo))
{
    mmapEntry_.swap(mmapEntry);
}
/**
 * Compute the start of the page.
 * Clients add the unit's offset to the mapped pointer; the worker uses the pointer as is.
 */
void StreamPageBase::Init(bool isClient)
{
    auto *base = reinterpret_cast<uint8_t *>(pageUnit_->pointer);
    if (isClient) {
        startOfPage_ = base + pageUnit_->offset;
    } else {
        startOfPage_ = base + 0;
    }
}
/** @return A ShmView describing this page's fd, mmap size, offset and size. */
ShmView StreamPageBase::GetShmView() const
{
    const auto &unit = *pageUnit_;
    return ShmView{ .fd = unit.fd, .mmapSz = unit.mmapSize, .off = unit.offset, .sz = unit.size };
}
/** @return A fresh ShmUnitInfo copy (id, view, pointer) describing this page. */
std::shared_ptr<ShmUnitInfo> StreamPageBase::GetShmUnitInfo() const
{
    const auto &unit = *pageUnit_;
    return std::make_shared<ShmUnitInfo>(unit.id, GetShmView(), unit.pointer);
}
/**
 * Large-object page wrapping a single shared-memory unit.
 * @param isClient Whether this instance lives in a client process.
 */
StreamLobPage::StreamLobPage(std::shared_ptr<ShmUnitInfo> shmInfo, bool isClient)
    : StreamPageBase(std::move(shmInfo)),
      isClient_(isClient)
{
}
/**
 * Large-object page that also keeps the mmap table entry alive.
 */
StreamLobPage::StreamLobPage(std::shared_ptr<ShmUnitInfo> shmInfo, bool isClient,
                             std::shared_ptr<client::IMmapTableEntry> mmapEntry)
    : StreamPageBase(std::move(shmInfo), std::move(mmapEntry)),
      isClient_(isClient)
{
}
/**
 * Copy one (big) element to the start of the page.
 * @return K_NO_SPACE if the element does not fit in the whole unit; otherwise the copy status.
 */
Status StreamLobPage::Insert(const HeaderAndData &element)
{
    const size_t capacity = pageUnit_->size;
    const auto required = element.TotalSize();
    CHECK_FAIL_RETURN_STATUS(required <= capacity, K_NO_SPACE, "Not enough space");
    Status rc = element.MemoryCopyTo(startOfPage_);
    RETURN_IF_NOT_OK(rc);
    LOG(INFO) << FormatString("[%s] Big element insert successful. Size %zu", GetPageId(), element.size);
    return Status::OK();
}
/**
 * Resolve the page start. Fails if the unit has not been mapped.
 */
Status StreamLobPage::Init()
{
    auto *mapped = pageUnit_->pointer;
    RETURN_RUNTIME_ERROR_IF_NULL(mapped);
    StreamPageBase::Init(isClient_);
    return Status::OK();
}
/**
 * Data page wrapping a shared-memory unit. Init() must be called before use.
 * @param lockId Id this instance encodes into the page lock word.
 * @param isClient Whether this instance lives in a client process.
 * @param isSharedPage Initial page-type hint (clients re-read it from the page in Init()).
 */
StreamDataPage::StreamDataPage(std::shared_ptr<ShmUnitInfo> shmInfo, uint32_t lockId, bool isClient, bool isSharedPage,
                               std::shared_ptr<client::IMmapTableEntry> mmapEntry)
    : StreamPageBase(std::move(shmInfo), std::move(mmapEntry)),
      lockId_(lockId),
      isClient_(isClient),
      maxElementSize_(0),
      isSharedPage_(isSharedPage)
{
}
/**
 * Overlay the page layout onto the mapped memory and create the helpers that operate on it.
 * The page header sits at startOfPage_. The slot directory begins at pageHeader_->slot0_. The next-page view
 * (SharedMemView) occupies the last sizeof(SharedMemView) bytes of the page.
 * @return Runtime error if the unit is unmapped; K_INVALID if the page cannot hold any element;
 *         otherwise the status of nextPage_->Init().
 */
Status StreamDataPage::Init()
{
    RETURN_RUNTIME_ERROR_IF_NULL(pageUnit_->pointer);
    StreamPageBase::Init(isClient_);
    auto *data = startOfPage_;
    pageHeader_ = reinterpret_cast<decltype(pageHeader_)>(data);
    // The next-page pointer lives at the very end of the page.
    tail_ = reinterpret_cast<SharedMemView *>(data + PageSize() - sizeof(SharedMemView));
    pageLock_ = std::make_shared<PageLock>(&pageHeader_->lockArea_, &pageHeader_->lockWait_, lockId_);
    slotDir_ = reinterpret_cast<SlotFlagOffset *>(&pageHeader_->slot0_);
    if (isClient_) {
        // Clients take the page type from PAGE_SHARED_BIT in slot 0 (written by the worker in InitEmptyPage).
        auto slot0 = __atomic_load_n(slotDir_, __ATOMIC_SEQ_CST);
        isSharedPage_ = slot0 & PAGE_SHARED_BIT;
    }
    // Computed only after isSharedPage_ is final, because the per-slot overhead depends on it.
    maxElementSize_ = static_cast<int64_t>(PageSize()) - static_cast<int64_t>(PageOverhead(isSharedPage_));
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(maxElementSize_ > 0, K_INVALID,
                                         FormatString("Page size %zu too small", PageSize()));
    nextPage_ = std::make_shared<SharedMemViewImpl>(tail_, sizeof(SharedMemView), lockId_);
    RETURN_IF_NOT_OK(nextPage_->Init());
    return Status::OK();
}
/**
 * Worker-only: return a page to the empty state so it can be reused.
 * Takes the page lock with an effectively unbounded timeout. It then sleeps in 1 ms steps until refCount_ falls back
 * to 1, the baseline set by InitEmptyPage. After that it restores the full payload as free space, clears the slot
 * count, sets begCursor_ to 0 (Insert/BatchInsert treat that as "recycled"), clears the next-page view and drops the
 * big-element marker.
 * @return K_INVALID when called from a client; otherwise the lock status.
 */
Status StreamDataPage::ResetToEmpty()
{
    CHECK_FAIL_RETURN_STATUS(!isClient_, K_INVALID, "Only worker can init the page");
    StreamPageLock xlock(shared_from_this());
    RETURN_IF_NOT_OK(xlock.Lock(std::numeric_limits<uint64_t>::max()));
    // Wait for every outstanding RefPage() to be matched by a ReleasePage(); this loop has no timeout.
    do {
        uint32_t expected = __atomic_load_n(&pageHeader_->refCount_, __ATOMIC_RELAXED);
        if (expected == 1) {
            break;
        }
        VLOG(SC_NORMAL_LOG_LEVEL) << FormatString("Waiting for page<%s> to be unreferenced. Current ref count %zu",
                                                  GetPageId(), expected);
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    } while (true);
    auto totalFreeSpace = PagePayloadSize();
    __atomic_store_n(&pageHeader_->totalFreeSpace_, totalFreeSpace, __ATOMIC_RELAXED);
    __atomic_store_n(&pageHeader_->slotCount_, 0, __ATOMIC_RELAXED);
    __atomic_store_n(&pageHeader_->begCursor_, 0, __ATOMIC_RELAXED);
    nextPage_->SetView(ShmView(), false, std::numeric_limits<uint64_t>::max());
    UnsetPageHasBigElement();
    return Status::OK();
}
/**
 * Worker-only: format a freshly allocated page.
 * Zeroes the header area, initializes the next-page view and cross-checks the free-space arithmetic against
 * PagePayloadSize() and maxElementSize_. It then seeds totalFreeSpace_, begCursor_ = 1 and refCount_ = 1, and writes
 * slot 0 so it carries the page-type flag and the offset of the end of the payload area.
 * @return K_INVALID from a client; K_RUNTIME_ERROR if memset_s fails or the layout arithmetic is inconsistent.
 */
Status StreamDataPage::InitEmptyPage()
{
    CHECK_FAIL_RETURN_STATUS(!isClient_, K_INVALID, "Only worker can init the page");
    size_t freeSpace = PageSize();
    // Header area: everything before the slot directory.
    size_t destSz = reinterpret_cast<uint8_t *>(slotDir_) - startOfPage_;
    freeSpace -= destSz;
    auto rc = memset_s(startOfPage_, PageSize(), 0, destSz);
    // securec memset_s reports failure through its return code, not through errno.
    CHECK_FAIL_RETURN_STATUS(rc == 0, K_RUNTIME_ERROR, FormatString("memset_s fails. Errno = %d", rc));
    uint8_t *endOfPage = startOfPage_ + PageSize();
    auto *nextPtr = reinterpret_cast<uint8_t *>(tail_);
    // Tail area: the next-page view at the end of the page.
    destSz = endOfPage - nextPtr;
    RETURN_IF_NOT_OK(nextPage_->Init(true));
    freeSpace -= destSz;
    // Slot 0 is reserved as the directory anchor.
    freeSpace -= GetMetaSize(isSharedPage_);
    auto totalFreeSpace = PagePayloadSize();
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
        freeSpace == totalFreeSpace, K_RUNTIME_ERROR,
        FormatString("Free space mismatch. Expect %zu but get %zu", freeSpace, totalFreeSpace));
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
        static_cast<size_t>(maxElementSize_) + GetMetaSize(isSharedPage_) == freeSpace, K_RUNTIME_ERROR,
        FormatString("Expect max element size %zu + one slot == free space %zu", maxElementSize_, freeSpace));
    __atomic_store_n(&pageHeader_->totalFreeSpace_, freeSpace, __ATOMIC_SEQ_CST);
    __atomic_store_n(&pageHeader_->begCursor_, 1, __ATOMIC_SEQ_CST);
    __atomic_store_n(&pageHeader_->refCount_, 1, __ATOMIC_SEQ_CST);
    // Element payloads grow downward from the tail; slot 0's offset marks that upper boundary.
    auto offset = static_cast<SlotOffset>(nextPtr - reinterpret_cast<uint8_t *>(slotDir_));
    auto slotFlag = isSharedPage_ ? PAGE_SHARED_BIT : 0;
    auto slotAddr = GetSlotAddr(0);
    slotAddr->StoreAll(isSharedPage_, slotFlag, offset, 0);
    VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString(
        "Init page<%s> success. freeSpace = %zu, maxElementSize = %zu, slotDir_[0] = %d", GetPageId(), freeSpace,
        maxElementSize_, slotDir_[0]);
    return Status::OK();
}
/** Acquire this page's write lock; see PageLock::Lock. */
Status StreamDataPage::Lock(uint64_t timeoutMs)
{
    auto &lock = *pageLock_;
    return lock.Lock(timeoutMs);
}
/** Release this page's write lock; see PageLock::Unlock. */
void StreamDataPage::Unlock()
{
    auto &lock = *pageLock_;
    lock.Unlock();
}
/**
 * Recover the page lock if it is currently held under `lockId` (presumably an owner that can no longer release it;
 * confirm with callers).
 * When PageLock::TryUnlockByLockId succeeds, the lock word becomes WRITE_LOCK_NUM (still held, owner id 0). One page
 * reference is then dropped. slotCount_ and totalFreeSpace_ are rebuilt from the leading run of slots that have
 * ELEMENT_DATA_CONSISTENT set; slots after the first incomplete one are discarded. The lock is released at the end.
 */
void StreamDataPage::TryUnlockByLockId(uint32_t lockId)
{
    if (pageLock_->TryUnlockByLockId(lockId)) {
        // NOTE(review): assumes the lost owner held exactly one page reference — confirm.
        (void)ReleasePage(FormatString("%s:%s", __FUNCTION__, __LINE__));
        uint32_t slotCount = 0;
        auto pendingSlotCount = GetSlotCount();
        auto totalFreeSpace = PagePayloadSize();
        for (uint32_t i = 0; i < pendingSlotCount; ++i) {
            auto slot = i + 1;
            if (GetSlotFlag(slot) & ELEMENT_DATA_CONSISTENT) {
                ++slotCount;
                // A kept element costs its slot metadata plus its payload (distance to the previous slot's offset).
                totalFreeSpace -= GetMetaSize(isSharedPage_);
                totalFreeSpace -= (GetSlotOffset(slot - 1) - GetSlotOffset(slot));
            } else {
                break;
            }
        }
        __atomic_store_n(&pageHeader_->slotCount_, slotCount, __ATOMIC_SEQ_CST);
        __atomic_store_n(&pageHeader_->totalFreeSpace_, totalFreeSpace, __ATOMIC_SEQ_CST);
        auto begCursor = pageHeader_->begCursor_;
        VLOG(SC_NORMAL_LOG_LEVEL) << FormatString(
            "[Page:%s] Page recover success. begCursor = %zu, slot count = %zu, freeSpace = %zu", GetPageId(),
            begCursor, slotCount, totalFreeSpace);
        // NOTE(review): Unlock() only succeeds if the word equals (lockId_ << SHIFT) | WRITE_LOCK_NUM, so this relies
        // on this object's lockId_ being 0 to match the WRITE_LOCK_NUM written above — confirm.
        Unlock();
    }
}
/** @return The cursor of the first element on this page (0 once the page has been reset). */
uint64_t StreamDataPage::GetBegCursor() const
{
    return __atomic_load_n(&pageHeader_->begCursor_, __ATOMIC_RELAXED);
}
/** @return Number of published slots, loaded with acquire ordering. */
uint32_t StreamDataPage::GetSlotCount() const
{
    const auto *counter = &pageHeader_->slotCount_;
    return __atomic_load_n(counter, __ATOMIC_ACQUIRE);
}
/** @return Cursor of the last published element (begCursor - 1 when the page is empty). */
uint64_t StreamDataPage::GetLastCursor() const
{
    const uint64_t first = GetBegCursor();
    return first + GetSlotCount() - 1;
}
/** @return true when no slot has been published. */
bool StreamDataPage::Empty() const
{
    const uint32_t published = GetSlotCount();
    return published == 0;
}
void StreamDataPage::UpdateSlotConsistentBit(uint32_t slot)
{
auto slotAddr = GetSlotAddr(slot);
slotAddr->SetFlagBit(ELEMENT_DATA_CONSISTENT);
}
/** @return Current page reference count (relaxed load). */
uint32_t StreamDataPage::GetRefCount() const
{
    const auto *counter = &pageHeader_->refCount_;
    return __atomic_load_n(counter, __ATOMIC_RELAXED);
}
/**
 * Increment the page reference count.
 * @param logPrefix Caller context for the verbose log line.
 * @return Always OK.
 */
Status StreamDataPage::RefPage(const std::string &logPrefix)
{
    PerfPoint point(PerfKey::PAGE_REF_INC);
    const auto previous = __atomic_fetch_add(&pageHeader_->refCount_, 1, __ATOMIC_RELAXED);
    if (!VLOG_IS_ON(SC_INTERNAL_LOG_LEVEL)) {
        return Status::OK();
    }
    VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString("[%s, Page:%s] refCount after increase: %zu", logPrefix,
                                                GetPageId(), previous + 1);
    return Status::OK();
}
/**
 * Decrement the page reference count with a CAS loop.
 * The count is never dropped below 1, the baseline owned by the page itself.
 * @param logPrefix Caller context for log lines.
 * @return K_RUNTIME_ERROR if the count is already below 2; otherwise OK.
 */
Status StreamDataPage::ReleasePage(const std::string &logPrefix)
{
    PerfPoint point(PerfKey::PAGE_REF_DEC);
    constexpr static uint64_t MIN_REF_COUNT = 2;
    uint32_t observed = 0;
    bool swapped = false;
    while (!swapped) {
        observed = __atomic_load_n(&pageHeader_->refCount_, __ATOMIC_SEQ_CST);
        CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
            observed >= MIN_REF_COUNT, K_RUNTIME_ERROR,
            FormatString("[%s, Page:%s] Unexpected reference count %zu", logPrefix, GetPageId(), observed));
        swapped = __atomic_compare_exchange_n(&pageHeader_->refCount_, &observed, observed - 1, false,
                                              __ATOMIC_RELAXED, __ATOMIC_RELAXED);
    }
    const int warningVal = 1000;
    if (observed <= warningVal) {
        // Normal case: sample the log line unless verbose internal logging is on.
        const int logPerCount = VLOG_IS_ON(SC_INTERNAL_LOG_LEVEL) ? 1 : 1000;
        LOG_EVERY_N(INFO, logPerCount) << FormatString("[%s, Page:%s] refCount after decrease: %zu", logPrefix,
                                                       GetPageId(), observed - 1);
    } else {
        LOG(INFO) << FormatString("[%s, Page:%s] refCount after decrease: %zu", logPrefix, GetPageId(), observed - 1);
    }
    return Status::OK();
}
bool StreamDataPage::HasNextPage() const
{
ShmView v = GetNextPage();
return v.fd != 0 && v.fd != -1;
}
/** Store `shm` as this page's next-page view (not marked as a free page). */
void StreamDataPage::SetNextPage(const ShmView &shm)
{
    const auto noTimeout = std::numeric_limits<uint64_t>::max();
    nextPage_->SetView(shm, false, noTimeout);
}
/**
 * Read the next-page view.
 * @return The view, or a default-constructed ShmView if the view is flagged as a free page.
 */
ShmView StreamDataPage::GetNextPage() const
{
    ShmView v;
    // Initialized so a GetView that does not assign it cannot lead to reading an indeterminate bool.
    bool isFreePage = false;
    nextPage_->GetView(v, isFreePage, std::numeric_limits<uint64_t>::max());
    if (isFreePage) {
        return {};
    }
    return v;
}
/** Wake consumers sleeping on this page's slot count (see WaitForNewElement). */
Status StreamDataPage::WakeUpConsumers()
{
    auto *futexWord = &pageHeader_->slotCount_;
    auto *waiters = &pageHeader_->slotWait_;
    return PageLock::FutexWake(futexWord, waiters);
}
/**
 * OR the slot attribute bits that correspond to `flags` (remote, big element, header) into `offset`.
 */
inline void SetAttributeBits(InsertFlags flags, SlotFlag &offset)
{
    const bool remote = TESTFLAG(flags, InsertFlags::REMOTE_ELEMENT);
    const bool big = TESTFLAG(flags, InsertFlags::BIG_ELEMENT);
    const bool header = TESTFLAG(flags, InsertFlags::HEADER);
    if (remote) {
        offset |= REMOTE_ELEMENT_BIT;
    }
    if (big) {
        offset |= BIG_ELEMENT_BIT;
    }
    if (header) {
        offset |= HEADER_BIT;
    }
}
void StreamDataPage::SetPageHasBigElement()
{
SlotFlagOffset offset = __atomic_load_n(slotDir_, __ATOMIC_ACQUIRE);
offset |= BIG_ELEMENT_BIT;
__atomic_store_n(slotDir_, offset, __ATOMIC_RELEASE);
}
void StreamDataPage::UnsetPageHasBigElement()
{
SlotFlagOffset offset = __atomic_load_n(slotDir_, __ATOMIC_ACQUIRE);
offset &= ~BIG_ELEMENT_BIT;
__atomic_store_n(slotDir_, offset, __ATOMIC_RELEASE);
}
bool StreamDataPage::PageHasBigElement()
{
SlotFlagOffset offset = __atomic_load_n(slotDir_, __ATOMIC_RELAXED);
return TESTFLAG(offset, BIG_ELEMENT_BIT);
}
/** @return Per-slot metadata size: a full SlotType on shared pages, a SlotFlagOffset otherwise. */
size_t StreamDataPage::GetMetaSize(bool isSharedPage)
{
    if (isSharedPage) {
        return sizeof(SlotType);
    }
    return sizeof(SlotFlagOffset);
}
/**
 * Fixed bytes a page spends on bookkeeping before any payload can be stored.
 * This covers the header up to slot0_, the tail view, slot 0, and the slot for one element.
 */
size_t StreamDataPage::PageOverhead(bool isSharedPage)
{
    const size_t slotBytes = 2 * GetMetaSize(isSharedPage);
    return offsetof(StreamPageHeader, slot0_) + sizeof(SharedMemView) + slotBytes;
}
/** @return Bytes available for element slots and payloads on an empty page. */
size_t StreamDataPage::PagePayloadSize()
{
    const size_t reserved = offsetof(StreamPageHeader, slot0_) + sizeof(SharedMemView) + GetMetaSize(isSharedPage_);
    return PageSize() - reserved;
}
/** @return Remaining free bytes on the page (relaxed load). */
size_t StreamDataPage::GetFreeSpaceSize()
{
    const auto *freeBytes = &pageHeader_->totalFreeSpace_;
    return __atomic_load_n(freeBytes, __ATOMIC_RELAXED);
}
/** @return The flag bits of slot `index`, decoded according to the page type. */
SlotFlag StreamDataPage::GetSlotFlag(size_t index)
{
    auto *entry = GetSlotAddr(index);
    if (isSharedPage_) {
        return entry->value.flag;
    }
    return entry->flagWithOffset & ~SLOT_VALUE_MASK;
}
/** @return The payload offset of slot `index` relative to the slot directory, decoded per page type. */
SlotOffset StreamDataPage::GetSlotOffset(size_t index)
{
    auto *entry = GetSlotAddr(index);
    if (isSharedPage_) {
        return entry->value.offset;
    }
    return entry->flagWithOffset & SLOT_VALUE_MASK;
}
/**
 * Address of slot `index`.
 * Shared pages step in SlotType units; exclusive pages step in SlotFlagOffset units (matching GetMetaSize).
 */
SlotType *StreamDataPage::GetSlotAddr(size_t index)
{
    if (isSharedPage_) {
        return reinterpret_cast<SlotType *>(slotDir_) + index;
    }
    return reinterpret_cast<SlotType *>(slotDir_ + index);
}
/**
 * Collect the big elements with cursor <= ackCursor on this page.
 * A big element's payload is a serialized ShmViewPb that points at a separate page.
 * @param ackCursor Highest acknowledged cursor; later elements are left untouched.
 * @param bigId Output; (cursor, page view) pairs are appended in cursor order.
 * @param deCouple If true, clear BIG_ELEMENT_BIT on each extracted slot so it is not reported again.
 * @return OK (also when nothing applies), the page-lock error (e.g. K_TRY_AGAIN on timeout), or a parse error.
 */
Status StreamDataPage::ExtractBigElementsUpTo(uint64_t ackCursor, std::vector<std::pair<uint64_t, ShmView>> &bigId,
                                              bool deCouple)
{
    // Cheap unlocked pre-check: nothing on this page has been acknowledged yet.
    RETURN_OK_IF_TRUE(ackCursor < GetBegCursor());
    StreamPageLock pageLock(shared_from_this());
    const uint64_t DEFAULT_TIMEOUT_MS = 1000;
    RETURN_IF_NOT_OK(pageLock.Lock(DEFAULT_TIMEOUT_MS));
    // Re-read under the lock, because the page may have been reset/reused since the unlocked read. Cursors computed
    // from a stale begCursor would be attributed to the wrong elements.
    auto begCursor = GetBegCursor();
    RETURN_OK_IF_TRUE(ackCursor < begCursor);
    RETURN_OK_IF_TRUE(!PageHasBigElement());
    size_t offset1 = reinterpret_cast<uint8_t *>(slotDir_) - startOfPage_;
    auto slotCount = GetSlotCount();
    for (size_t i = 0; i < slotCount; ++i) {
        uint64_t cursor = begCursor + i;
        if (cursor > ackCursor) {
            break;
        }
        auto slotAddr = GetSlotAddr(i + 1);
        DataElement ele;
        ele.attr_ = slotAddr->LoadFlag(isSharedPage_);
        // Stop at the first element whose payload copy has not completed.
        if (!ele.DataIsReady()) {
            break;
        }
        if (!ele.IsBigElement()) {
            continue;
        }
        auto offset = slotAddr->LoadOffset(isSharedPage_);
        SlotOffset b4 = GetSlotAddr(i)->LoadOffset(isSharedPage_);
        ele.size = b4 - offset;
        ele.ptr = startOfPage_ + offset1 + offset;
        ele.id = cursor;
        ShmView pageView;
        RETURN_IF_NOT_OK_PRINT_ERROR_MSG(ParseShmViewPb(ele.ptr, ele.size, pageView), "ExtractBigElementsUpTo");
        if (deCouple) {
            slotAddr->ClearFlagBit(BIG_ELEMENT_BIT);
        }
        bigId.emplace_back(ele.id, pageView);
    }
    return Status::OK();
}
/**
 * Append one element to this page.
 * Steps:
 *  - validate the element and run cheap pre-checks without the lock;
 *  - take the page lock (unless SKIP_LOCK) and re-check under it;
 *  - reserve free space and publish the slot (offset + attribute flags) and the new slot count, then unlock;
 *  - copy the payload outside the lock, then set ELEMENT_DATA_CONSISTENT so readers accept the slot.
 * @param element Header/data to insert.
 * @param timeoutMs Page lock timeout.
 * @param flags In/out. SKIP_LOCK, DELAY_WAKE and attribute flags are read; INSERT_SUCCESS is set on success.
 * @param logPrefix Prefix for the sampled per-element log line.
 * @return K_INVALID for malformed or oversized elements, K_NO_SPACE when the page is full, K_SC_END_OF_PAGE when a next
 *         page is already linked, K_TRY_AGAIN when the page was recycled (begCursor_ == 0) or the lock timed out.
 * NOTE: if MemoryCopyTo fails after the slot is published, the slot never becomes consistent. Receive stops at the
 * first non-ready slot.
 */
Status StreamDataPage::Insert(const HeaderAndData &element, uint64_t timeoutMs, InsertFlags &flags,
                              const std::string &logPrefix)
{
    INJECT_POINT("producer_insert");
    PerfPoint point(PerfKey::PAGE_INSERT_ELEMENT);
    auto *totalFreeSpace_ = &pageHeader_->totalFreeSpace_;
    auto *slotCount_ = &pageHeader_->slotCount_;
    size_t finalElementSize = element.TotalSize();
    // Space consumed on the page: payload plus one slot-directory entry.
    size_t spaceNeeded = GetMetaSize(isSharedPage_) + finalElementSize;
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
        finalElementSize <= static_cast<size_t>(maxElementSize_), K_INVALID,
        FormatString("Element size %zu (plus internal overhead) is exceeding the maximum free space %zu",
                     finalElementSize, maxElementSize_));
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(element.size > 0, K_INVALID, "Element size should be greater than 0");
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(element.ptr != nullptr, K_INVALID, "Element ptr should not be a nullptr");
    // The size must fit in the offset bits of a slot.
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(
        (finalElementSize & ~(static_cast<uint64_t>(SLOT_VALUE_MASK))) == 0, K_INVALID,
        FormatString("Element size %zu is exceeding the maximum length", finalElementSize));
    StreamPageLock pageLock(shared_from_this());
    // Unlocked fast-path rejections; both are repeated under the lock below.
    auto totalFreeSpace = __atomic_load_n(totalFreeSpace_, __ATOMIC_RELAXED);
    CHECK_FAIL_RETURN_STATUS(spaceNeeded <= totalFreeSpace, K_NO_SPACE, "Not enough space");
    CHECK_FAIL_RETURN_STATUS(!HasNextPage(), K_SC_END_OF_PAGE, "Check next page for new elements");
    if (!TESTFLAG(flags, InsertFlags::SKIP_LOCK)) {
        RETURN_IF_NOT_OK(pageLock.Lock(timeoutMs));
    }
    INJECT_POINT("producer_obtained_lock");
    // ResetToEmpty sets begCursor_ to 0.
    auto begCursor = __atomic_load_n(&pageHeader_->begCursor_, __ATOMIC_RELAXED);
    CHECK_FAIL_RETURN_STATUS(begCursor > 0, K_TRY_AGAIN, "Page is already recycled");
    CHECK_FAIL_RETURN_STATUS(!HasNextPage(), K_SC_END_OF_PAGE, "Check next page for new elements.");
    totalFreeSpace = __atomic_load_n(totalFreeSpace_, __ATOMIC_RELAXED);
    CHECK_FAIL_RETURN_STATUS(spaceNeeded <= totalFreeSpace, K_NO_SPACE, "Not enough space");
    totalFreeSpace = __atomic_sub_fetch(totalFreeSpace_, spaceNeeded, __ATOMIC_RELAXED);
    INJECT_POINT("producer_update_free_space");
    auto numElement = __atomic_load_n(slotCount_, __ATOMIC_ACQUIRE);
    // Payloads grow downward: the new element ends where the previous slot's payload begins.
    SlotOffset offset = GetSlotOffset(numElement) - static_cast<SlotOffset>(finalElementSize);
    uint8_t *dest = reinterpret_cast<uint8_t *>(slotDir_) + offset;
    SlotFlag slotFlag = 0;
    SetAttributeBits(flags, slotFlag);
    auto slotAddr = GetSlotAddr(numElement + 1);
    slotAddr->StoreAll(isSharedPage_, slotFlag, offset, element.streamNo);
    if (TESTFLAG(flags, InsertFlags::BIG_ELEMENT)) {
        SetPageHasBigElement();
    }
    INJECT_POINT("producer_update_slot_directory");
    // Publish the slot; readers skip it until ELEMENT_DATA_CONSISTENT is set below.
    __atomic_store_n(slotCount_, 1 + numElement, __ATOMIC_RELEASE);
    INJECT_POINT("producer_update_pending_slot_count_holding_lock");
    pageLock.Unlock();
    INJECT_POINT("producer_update_pending_slot_count_without_lock");
    // The (possibly large) payload copy happens outside the lock so other producers can reserve slots concurrently.
    PerfPoint perfPoint(PerfKey::PAGE_ELEMENT_MEMORY_COPY);
    RETURN_IF_NOT_OK(element.MemoryCopyTo(dest));
    perfPoint.RecordAndReset(PerfKey::PAGE_CAS_SLOT_COUNT);
    UpdateSlotConsistentBit(numElement + 1);
    perfPoint.Record();
    if (!TESTFLAG(flags, InsertFlags::DELAY_WAKE)) {
        RETURN_IF_NOT_OK(WakeUpConsumers());
    }
    const int logPerCount = VLOG_IS_ON(SC_INTERNAL_LOG_LEVEL) ? 1 : 1000;
    LOG_EVERY_N(INFO, logPerCount) << FormatString(
        "[%sCursor %zu] Add element success. slot = %zu, offset = %zu, length = %zu, freeSpace = %zu, bigEle = %s, "
        "header = %s, sharedPage = %s, streamNo = %zu, pageId = %s",
        (!logPrefix.empty() ? logPrefix + " " : ""), pageHeader_->begCursor_ + numElement, numElement + 1, offset,
        finalElementSize, totalFreeSpace, BoolToString(TESTFLAG(flags, InsertFlags::BIG_ELEMENT)),
        BoolToString(TESTFLAG(flags, InsertFlags::HEADER)), BoolToString(isSharedPage_), element.streamNo, GetPageId());
    SETFLAG(flags, InsertFlags::INSERT_SUCCESS);
    return Status::OK();
}
/**
 * Decide how many leading entries of `sz` fit into `totalFreeSpace` for a batch insert.
 * Each element costs its size plus one slot of metadata. `buf` holds the elements back to back, with element 0 at the
 * end of the buffer. On return, `src` points at the first byte to copy for the accepted elements.
 * @param streamMetaShm Optional usage accounting; TryIncUsage is charged per accepted element, and its error is
 *        returned (elements accepted before the failure stay counted).
 * @param spaceNeeded Output: total page bytes (payload + metadata) needed for the accepted elements.
 * @param numInsert Accumulated count of accepted elements.
 * @param totalLength Accumulated payload bytes of accepted elements.
 */
Status CalcMaxAllowRows(void *buf, std::vector<size_t> &sz, const size_t totalFreeSpace, bool isSharedPage,
                        StreamMetaShm *streamMetaShm, uint8_t *&src, size_t &spaceNeeded, size_t &numInsert,
                        size_t &totalLength)
{
    const size_t bufSz = std::accumulate(sz.begin(), sz.end(), 0ul);
    const size_t slotMeta = StreamDataPage::GetMetaSize(isSharedPage);
    src = reinterpret_cast<uint8_t *>(buf) + bufSz;
    spaceNeeded = 0;
    for (const size_t eleSz : sz) {
        const size_t rowCost = eleSz + slotMeta;
        if (spaceNeeded + rowCost > totalFreeSpace) {
            break;
        }
        if (streamMetaShm != nullptr) {
            RETURN_IF_NOT_OK(streamMetaShm->TryIncUsage(eleSz));
        }
        spaceNeeded += rowCost;
        src -= eleSz;
        totalLength += eleSz;
        ++numInsert;
    }
    return Status::OK();
}
/**
 * Insert as many leading elements of `buf`/`sz` as fit, in one locked reservation (exclusive pages only).
 * Slots and the slot count are published under the lock. The payload is copied with a single memcpy outside the lock,
 * and only then are the consistent bits set.
 * @param buf Element bytes laid out so that element 0 is at the end of the buffer (see CalcMaxAllowRows).
 * @param sz Element sizes.
 * @param res Output, always written on exit: (elements inserted, payload bytes inserted).
 * @param headerBits Per-element "has header" bits. Missing entries are treated as "no header".
 * @return K_RUNTIME_ERROR on a shared page, K_TRY_AGAIN if recycled or the lock timed out, K_SC_END_OF_PAGE if a next
 *         page is linked, K_NO_SPACE (or the usage-accounting error) when nothing fits.
 */
Status StreamDataPage::BatchInsert(void *buf, std::vector<size_t> &sz, uint64_t timeoutMs,
                                   std::pair<size_t, size_t> &res, InsertFlags flags,
                                   const std::vector<bool> &headerBits, StreamMetaShm *streamMetaShm)
{
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(!isSharedPage_, K_RUNTIME_ERROR,
                                         FormatString("BatchInsert not allow apply for shared page %s", GetPageId()));
    size_t numInsert = 0;
    size_t totalLength = 0;
    // Report progress to the caller on every exit path.
    Raii raii([&res, &numInsert, &totalLength]() {
        res.first = numInsert;
        res.second = totalLength;
    });
    auto *totalFreeSpace_ = &pageHeader_->totalFreeSpace_;
    auto *slotCount_ = &pageHeader_->slotCount_;
    StreamPageLock pageLock(shared_from_this());
    if (!TESTFLAG(flags, InsertFlags::SKIP_LOCK)) {
        RETURN_IF_NOT_OK(pageLock.Lock(timeoutMs));
    }
    auto begCursor = __atomic_load_n(&pageHeader_->begCursor_, __ATOMIC_RELAXED);
    CHECK_FAIL_RETURN_STATUS(begCursor > 0, K_TRY_AGAIN, "Page is already recycled");
    CHECK_FAIL_RETURN_STATUS(!HasNextPage(), K_SC_END_OF_PAGE, "Check next page for new elements");
    auto totalFreeSpace = __atomic_load_n(totalFreeSpace_, __ATOMIC_RELAXED);
    uint8_t *src = nullptr;
    size_t spaceNeeded = 0;
    auto rc = CalcMaxAllowRows(buf, sz, totalFreeSpace, isSharedPage_, streamMetaShm, src, spaceNeeded, numInsert,
                               totalLength);
    // A partial batch is still inserted; an accounting error only surfaces when nothing could be accepted.
    if (numInsert == 0) {
        return rc.IsError() ? rc : Status(K_NO_SPACE, "Not enough space");
    }
    uint32_t numElement = __atomic_load_n(slotCount_, __ATOMIC_ACQUIRE);
    totalFreeSpace = __atomic_sub_fetch(totalFreeSpace_, spaceNeeded, __ATOMIC_RELAXED);
    for (size_t i = 0; i < numInsert; ++i) {
        auto slot = numElement + i + 1;
        // Payloads grow downward from the previous slot's offset.
        SlotFlagOffset offset = GetSlotOffset(slot - 1) - static_cast<uint32_t>(sz[i]);
        // Guard against a short headerBits vector instead of reading past its end.
        const bool hasHeader = i < headerBits.size() && headerBits[i];
        if (hasHeader) {
            SetAttributeBits(flags | InsertFlags::HEADER, offset);
        } else {
            SetAttributeBits(flags, offset);
        }
        __atomic_store_n(static_cast<SlotFlagOffset *>(slotDir_ + slot), offset, __ATOMIC_RELEASE);
    }
    if (TESTFLAG(flags, InsertFlags::BIG_ELEMENT)) {
        SetPageHasBigElement();
    }
    // The last accepted slot has the lowest offset; all payloads are contiguous from there.
    uint8_t *dest = reinterpret_cast<uint8_t *>(slotDir_) + GetSlotOffset(numElement + numInsert);
    __atomic_store_n(slotCount_, numElement + static_cast<uint32_t>(numInsert), __ATOMIC_RELEASE);
    pageLock.Unlock();
    RETURN_IF_NOT_OK(HugeMemoryCopy(dest, totalLength, src, totalLength));
    for (size_t i = 0; i < numInsert; ++i) {
        auto slot = numElement + i + 1;
        UpdateSlotConsistentBit(slot);
    }
    if (!TESTFLAG(flags, InsertFlags::DELAY_WAKE)) {
        RETURN_IF_NOT_OK(WakeUpConsumers());
    }
    VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString(
        "Batch add %zu element into %s success. "
        "begCursor = %zu, freeSpace = %zu, bigEle = %s",
        numInsert, GetPageId(), pageHeader_->begCursor_ + numElement, totalFreeSpace,
        BoolToString(TESTFLAG(flags, InsertFlags::BIG_ELEMENT)));
    return Status::OK();
}
/**
 * Wait until an element after `lastRecvCursor` is published on this page.
 * @param lastRecvCursor Last cursor the caller has consumed.
 * @param timeoutMs 0 for a non-blocking check; otherwise the futex wait timeout.
 * @return OK if a newer element exists. K_SC_END_OF_PAGE if a next page is linked and nothing newer is here.
 *         K_TRY_AGAIN for a non-blocking miss, for a wake-up without a new element, or on futex timeout.
 */
Status StreamDataPage::WaitForNewElement(uint64_t lastRecvCursor, uint64_t timeoutMs)
{
    uint32_t slotCount = GetSlotCount();
    auto endCursor = pageHeader_->begCursor_ + slotCount;
    RETURN_OK_IF_TRUE(lastRecvCursor + 1 < endCursor);
    if (HasNextPage()) {
        // Re-read: elements may have been published just before the page was sealed.
        slotCount = GetSlotCount();
        endCursor = pageHeader_->begCursor_ + slotCount;
        RETURN_OK_IF_TRUE(lastRecvCursor + 1 < endCursor);
        RETURN_STATUS(K_SC_END_OF_PAGE, "Check next page for new elements.");
    }
    if (timeoutMs == 0) {
        RETURN_STATUS(K_TRY_AGAIN, "Non-blocking call and there is no new element inserted");
    }
    INJECT_POINT("StreamDataPage.WaitOnFutexForever", [this, timeoutMs, slotCount]() {
        LOG(INFO) << "Wait for signal from producer. TimeoutMs = " << timeoutMs << ". slotCount = " << slotCount;
        while (!HasNextPage()) {
            std::this_thread::yield();
        }
        const auto sleepMs = 5'000ul;
        std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
        return Status::OK();
    });
    // Sleep only while slotCount_ still equals the value observed above; producers wake us via WakeUpConsumers.
    Status rc = PageLock::FutexWait(&pageHeader_->slotCount_, &pageHeader_->slotWait_, slotCount, timeoutMs);
    if (rc.IsOk()) {
        slotCount = GetSlotCount();
        endCursor = pageHeader_->begCursor_ + slotCount;
        RETURN_OK_IF_TRUE(lastRecvCursor + 1 < endCursor);
        RETURN_STATUS(K_TRY_AGAIN, "No new element inserted");
    }
    return rc;
}
/**
 * Decode a serialized ShmViewPb from `sz` bytes at `ptr` into `out`.
 * @return K_OUT_OF_RANGE if the bytes do not parse.
 */
Status StreamDataPage::ParseShmViewPb(const void *ptr, size_t sz, ShmView &out)
{
    ShmViewPb message;
    const bool parsed = message.ParseFromArray(ptr, sz);
    CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(parsed, K_OUT_OF_RANGE, "ShmViewPb parse error");
    out = ShmView{ .fd = message.fd(),
                   .mmapSz = message.mmap_size(),
                   .off = static_cast<ptrdiff_t>(message.offset()),
                   .sz = message.size() };
    return Status::OK();
}
/**
 * Encode `pageView` as a ShmViewPb into `out`.
 * @return K_RUNTIME_ERROR if serialization fails.
 */
Status StreamDataPage::SerializeToShmViewPb(const ShmView &pageView, std::string &out)
{
    ShmViewPb message;
    message.set_fd(pageView.fd);
    message.set_mmap_size(pageView.mmapSz);
    message.set_offset(pageView.off);
    message.set_size(pageView.sz);
    const bool serialized = message.SerializeToString(&out);
    CHECK_FAIL_RETURN_STATUS(serialized, K_RUNTIME_ERROR, "Serialization error");
    return Status::OK();
}
/**
 * Read the ready elements after `lastRecvCursor` from this page into `out`, waiting up to `timeoutMs` for one.
 * Collection stops at the first slot without ELEMENT_DATA_CONSISTENT, so `out` only ever holds fully copied elements.
 * @return K_OUT_OF_RANGE if lastRecvCursor + 1 precedes this page; otherwise the WaitForNewElement status (e.g.
 *         K_SC_END_OF_PAGE, K_TRY_AGAIN), or OK once elements were appended.
 */
Status StreamDataPage::Receive(uint64_t lastRecvCursor, uint64_t timeoutMs, std::vector<DataElement> &out,
                               const std::string &logPrefix)
{
    // NOTE(review): this is a reference into the shared page header, so every use re-reads begCursor_ rather than
    // taking one snapshot. Confirm that is intended.
    const auto &begCursor = pageHeader_->begCursor_;
    auto startCursor = lastRecvCursor + 1;
    if (startCursor < begCursor) {
        RETURN_STATUS_LOG_ERROR(
            K_OUT_OF_RANGE, FormatString("[P:%s] Starting read position %zu not on this page [%zu, %zu)", GetPageId(),
                                         startCursor, begCursor, begCursor + GetSlotCount()));
    }
    INJECT_POINT("StreamDataPage::Receive.sleep");
    RETURN_IF_NOT_OK(WaitForNewElement(lastRecvCursor, timeoutMs));
    uint32_t slotCount = GetSlotCount();
    auto endCursor = begCursor + slotCount;
    // Slot offsets are relative to the slot directory; convert to an offset from the page start.
    size_t offset1 = reinterpret_cast<uint8_t *>(slotDir_) - startOfPage_;
    for (auto i = startCursor; i < endCursor; ++i) {
        auto slot = i - begCursor;
        auto slotAddr = GetSlotAddr(slot + 1);
        DataElement ele;
        ele.attr_ = slotAddr->LoadFlag(isSharedPage_, __ATOMIC_ACQUIRE);
        INJECT_POINT("StreamDataPage::Receive.fake.BIG_ELEMENT", [startCursor, begCursor, &ele]() {
            LOG(INFO) << "startCursor = " << startCursor << " begCursor = " << begCursor;
            if (startCursor != begCursor) {
                ele.attr_ |= BIG_ELEMENT_BIT;
                ele.attr_ |= ELEMENT_DATA_CONSISTENT;
            }
            return Status::OK();
        });
        // The producer may still be copying this payload; stop here and return what is ready.
        if (!ele.DataIsReady()) {
            break;
        }
        auto offset = slotAddr->LoadOffset(isSharedPage_, __ATOMIC_ACQUIRE);
        // Element size = distance to the previous slot's offset (payloads grow downward).
        SlotOffset b4 = GetSlotAddr(slot)->LoadOffset(isSharedPage_);
        ele.size = b4 - offset;
        ele.ptr = startOfPage_ + offset1 + offset;
        ele.id = i;
        ele.streamNo_ = slotAddr->LoadStreamNo(isSharedPage_);
        const int logPerCount = VLOG_IS_ON(SC_INTERNAL_LOG_LEVEL) ? 1 : 1000;
        LOG_EVERY_N(INFO, logPerCount) << FormatString(
            "[%sCursor %zu] Fetch element success. slot = %zu, offset = %zu, length = %zu, local = %s, "
            "bigElement = %s, header = %s, sharedPage = %s, streamNo = %zu, pageId = %s",
            (!logPrefix.empty() ? logPrefix + " " : ""), ele.id, slot + 1, offset, ele.size,
            BoolToString(!ele.IsRemote()), BoolToString(ele.IsBigElement()), BoolToString(ele.HasHeader()),
            BoolToString(isSharedPage_), ele.streamNo_, GetPageId());
        out.emplace_back(ele);
    }
    return Status::OK();
}
/**
 * Link this page to `nextPage` so later inserts and reads move on to it.
 * Under the page lock, if no next page is linked yet:
 *  - resolve `nextPage` via `locatePage`;
 *  - under the next-page view's exclusive lock, set that page's begCursor_ to this page's last cursor + 1 and store
 *    the view in this page's tail;
 *  - wake the consumers waiting on this page.
 * @return K_INVALID for a non-positive fd; K_RUNTIME_ERROR if the page is empty or already sealed to another page;
 *         K_DUPLICATED if already sealed to the same page; errors from locking or `locatePage`.
 */
Status StreamDataPage::Seal(const ShmView &nextPage, uint64_t timeoutMs,
                            std::function<Status(const ShmView &, std::shared_ptr<StreamDataPage> &)> locatePage,
                            const std::string &logPrefix)
{
    CHECK_FAIL_RETURN_STATUS(nextPage.fd > 0, K_INVALID,
                             FormatString("[%s] Seal a page with invalid pointer %s", GetPageId(), nextPage.ToStr()));
    StreamPageLock pageLock(shared_from_this());
    RETURN_IF_NOT_OK(pageLock.Lock(timeoutMs));
    CHECK_FAIL_RETURN_STATUS(!Empty(), K_RUNTIME_ERROR, FormatString("[%s] Empty page", GetPageId()));
    ShmView v = GetNextPage();
    if (v.fd <= 0) {
        uint64_t lastAppendCursor = GetLastCursor();
        std::shared_ptr<StreamDataPage> page;
        RETURN_IF_NOT_OK(locatePage(nextPage, page));
        // The new page's begCursor_ is set before the link is visible in our tail.
        auto func = [this, &nextPage, lastAppendCursor, &page]() {
            __atomic_store_n(&page->pageHeader_->begCursor_, lastAppendCursor + 1, __ATOMIC_SEQ_CST);
            tail_->CopyFrom(nextPage);
        };
        nextPage_->LockExclusiveAndExec(func, std::numeric_limits<uint64_t>::max());
        VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString("[%s] Chain page<%s> [%zu, ) to page<%s> [%zu, %zu]", logPrefix,
                                                    page->GetPageId(), lastAppendCursor + 1, GetPageId(),
                                                    GetBegCursor(), lastAppendCursor);
        // Consumers blocked in WaitForNewElement re-check and see K_SC_END_OF_PAGE.
        RETURN_IF_NOT_OK(WakeUpConsumers());
        return Status::OK();
    }
    CHECK_FAIL_RETURN_STATUS(v == nextPage, K_RUNTIME_ERROR,
                             FormatString("Page<%s> is sealed already. Next page %s", GetPageId(), v.ToStr()));
    RETURN_STATUS(K_DUPLICATED, FormatString("Page<%s> is sealed already", GetPageId()));
}
size_t StreamDataPage::GetTotalEleSize()
{
if (GetSlotCount() == 0) {
return 0;
}
auto end = GetSlotAddr(0)->LoadOffset(isSharedPage_);
auto start = GetSlotAddr(GetSlotCount())->LoadOffset(isSharedPage_);
if (end <= start) {
LOG(WARNING) << FormatString("The layout of this page may be confusing, start: %zu, end: %zu", start, end);
return 0;
}
return end - start;
}
/**
 * Verify that the slot attribute word uses only bits this version understands.
 * @return OK when no unknown bits remain; otherwise K_CLIENT_WORKER_VERSION_MISMATCH with the raw value.
 */
Status DataElement::CheckAttribute() const
{
    auto unknownBits = attr_;
    // Strip every known field; whatever is left was set by an incompatible (newer) peer.
    CLEARFLAG(unknownBits, HEADER_BIT);
    CLEARFLAG(unknownBits, BIG_ELEMENT_BIT);
    CLEARFLAG(unknownBits, REMOTE_ELEMENT_BIT);
    CLEARFLAG(unknownBits, ELEMENT_DATA_CONSISTENT);
    CLEARFLAG(unknownBits, SLOT_VALUE_MASK);
    INJECT_POINT("StreamDataPage.CheckHighBits", [&unknownBits](uint32_t v) {
        unknownBits = v;
        return Status::OK();
    });
    RETURN_OK_IF_TRUE(unknownBits == 0);
    std::ostringstream msg;
    msg << "Incompatibility with up level worker detected. Slot value = 0x" << std::hex << attr_;
    RETURN_STATUS_LOG_ERROR(K_CLIENT_WORKER_VERSION_MISMATCH, msg.str());
}
/** @return true once the producer has finished copying the payload (ELEMENT_DATA_CONSISTENT). */
bool DataElement::DataIsReady() const
{
    const bool consistent = TESTFLAG(attr_, ELEMENT_DATA_CONSISTENT);
    return consistent;
}
/** @return true if REMOTE_ELEMENT_BIT is set on the element. */
bool DataElement::IsRemote() const
{
    const bool remote = TESTFLAG(attr_, REMOTE_ELEMENT_BIT);
    return remote;
}
/** @return true if BIG_ELEMENT_BIT is set, i.e. the payload references a separate page. */
bool DataElement::IsBigElement() const
{
    const bool big = TESTFLAG(attr_, BIG_ELEMENT_BIT);
    return big;
}
/** @return true if HEADER_BIT is set on the element. */
bool DataElement::HasHeader() const
{
    const bool header = TESTFLAG(attr_, HEADER_BIT);
    return header;
}
uint64_t DataElement::GetStreamNo() const
{
return streamNo_;
}
}