* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: SharedPageQueue test
*/
#include <map>
#include "ut/common.h"
#include "../../../common/binmock/binmock.h"
#include "datasystem/common/constants.h"
#include "datasystem/stream/stream_config.h"
#include "datasystem/worker/stream_cache/client_worker_sc_service_impl.h"
#include "datasystem/common/stream_cache/stream_data_page.h"
#include "datasystem/common/stream_cache/stream_fields.h"
#include "datasystem/worker/stream_cache/page_queue/page_queue_handler.h"
#include "datasystem/worker/stream_cache/page_queue/shared_page_queue.h"
#include "datasystem/worker/stream_cache/remote_worker_manager.h"
#include "datasystem/common/util/uuid_generator.h"
DS_DECLARE_uint32(sc_shared_page_size_mb);
using namespace ::testing;
using namespace datasystem::worker::stream_cache;
namespace datasystem {
namespace ut {
constexpr int K_TWO = 2;
constexpr uint64_t SHM_CAP = 128L * 1024L * 1024L;
class SharedPageQueueTest : public CommonTest {
public:
void SetUp() override
{
datasystem::memory::Allocator::Instance()->Init(SHM_CAP);
allocate_ = std::make_shared<worker::stream_cache::WorkerSCAllocateMemory>(nullptr);
HostPort hostPort;
LOG_IF_ERROR(hostPort.ParseString("127.0.0.1:9000"), "ParseString failed");
svc_ = std::make_shared<ClientWorkerSCServiceImpl>(hostPort, hostPort, nullptr, nullptr, allocate_);
svc_->Init();
CommonTest::SetUp();
}
void TearDown() override
{
CommonTest::TearDown();
}
std::shared_ptr<SharedPageQueue> CreateSharedPageQueue()
{
HostPort hostPort;
LOG_IF_ERROR(hostPort.ParseString("127.0.0.1:9000"), "ParseString failed");
FLAGS_sc_shared_page_size_mb = 1;
return std::make_shared<SharedPageQueue>("tenant", std::move(hostPort), 0, allocate_, svc_.get());
}
protected:
std::shared_ptr<WorkerSCAllocateMemory> allocate_;
std::shared_ptr<ClientWorkerSCServiceImpl> svc_;
};
TEST_F(SharedPageQueueTest, TestAllocateSharedPage)
{
StreamFields streamFields;
streamFields.streamMode_ = StreamMode::MPSC;
PageQueueHandler handler(nullptr, Optional<StreamFields>(streamFields));
auto pageQueue = CreateSharedPageQueue();
handler.SetSharedPageQueue(pageQueue);
std::shared_ptr<Cursor> out;
ShmView view;
DS_ASSERT_OK(handler.AddCursor("id", true, out, view));
out->SetClientVersion(Cursor::K_CURSOR_SIZE_V2);
const int timeoutMs = 3000;
ShmView preLastView;
std::shared_ptr<StreamDataPage> lastPage;
DS_ASSERT_OK(pageQueue->CreateOrGetLastDataPage(timeoutMs, preLastView, lastPage, false));
ASSERT_TRUE(lastPage->IsSharedPage());
preLastView = lastPage->GetShmView();
std::shared_ptr<StreamDataPage> lastPage2;
DS_ASSERT_OK(pageQueue->CreateOrGetLastDataPage(timeoutMs, preLastView, lastPage2, false));
ASSERT_TRUE(lastPage2->IsSharedPage());
ASSERT_EQ(lastPage, lastPage2);
auto shmUnitInfo = lastPage->GetShmUnitInfo();
shmUnitInfo->pointer = reinterpret_cast<uint8_t *>(shmUnitInfo->pointer) - shmUnitInfo->offset;
int lockId = 12;
auto clientPage = std::make_shared<StreamDataPage>(shmUnitInfo, lockId, true, false);
DS_ASSERT_OK(clientPage->Init());
ASSERT_TRUE(clientPage->IsSharedPage());
int lockTimeout = 10000;
out->SetLastLockedPage(clientPage->GetShmView(), lockTimeout);
StreamPageLock pageLock(clientPage);
DS_ASSERT_OK(pageLock.Lock(lockTimeout));
pageQueue->TryUnlockByLockId(lockId);
StreamPageLock pageLock2(clientPage);
DS_ASSERT_OK(pageLock2.Lock(lockTimeout));
}
TEST_F(SharedPageQueueTest, TestReadWrite)
{
auto pageQueue = CreateSharedPageQueue();
const int timeoutMs = 3000;
const size_t eleSz1 = 102ul;
const size_t eleSz2 = 1004ul;
std::string data1(eleSz1, 'a');
std::string data2(eleSz2, 'b');
ShmView preLastView;
std::shared_ptr<StreamDataPage> lastPage;
DS_ASSERT_OK(pageQueue->CreateOrGetLastDataPage(timeoutMs, preLastView, lastPage, false));
uint64_t streamNo1 = 111;
uint64_t streamNo2 = 112;
HeaderAndData element1((uint8_t *)data1.c_str(), data1.size(), streamNo1);
HeaderAndData element2((uint8_t *)data2.c_str(), data2.size(), streamNo2);
InsertFlags flags = InsertFlags::NONE;
DS_ASSERT_OK(lastPage->Insert(element1, timeoutMs, flags));
DS_ASSERT_OK(lastPage->Insert(element2, timeoutMs, flags));
std::vector<DataElement> out;
DS_ASSERT_OK(lastPage->Receive(0, timeoutMs, out));
ASSERT_EQ(out.size(), 2ul);
std::string r1((char *)out[0].ptr, out[0].size);
std::string r2((char *)out[1].ptr, out[1].size);
ASSERT_EQ(data1, r1);
ASSERT_EQ(out[0].streamNo_, streamNo1);
ASSERT_EQ(data2, r2);
ASSERT_EQ(out[1].streamNo_, streamNo2);
}
TEST_F(SharedPageQueueTest, TestWriteMaxSize)
{
auto pageQueue = CreateSharedPageQueue();
const int timeoutMs = 3000;
InsertFlags flags = InsertFlags::NONE;
ShmView preLastView;
std::shared_ptr<StreamDataPage> lastPage;
DS_ASSERT_OK(pageQueue->CreateOrGetLastDataPage(timeoutMs, preLastView, lastPage, false));
const size_t elementMaxSize = FLAGS_sc_shared_page_size_mb * MB_TO_BYTES - StreamDataPage::PageOverhead(true);
std::string data1(elementMaxSize + 1, 'a');
uint64_t streamNo1 = 111;
HeaderAndData element1((uint8_t *)data1.c_str(), data1.size(), streamNo1);
DS_ASSERT_NOT_OK(lastPage->Insert(element1, timeoutMs, flags));
std::string data2(elementMaxSize, 'a');
uint64_t streamNo2 = 112;
HeaderAndData element2((uint8_t *)data2.c_str(), data2.size(), streamNo2);
DS_ASSERT_OK(lastPage->Insert(element2, timeoutMs, flags));
std::vector<DataElement> out;
DS_ASSERT_OK(lastPage->Receive(0, timeoutMs, out));
ASSERT_EQ(out.size(), 1ul);
std::string r2((char *)out[0].ptr, out[0].size);
ASSERT_EQ(data2, r2);
ASSERT_EQ(out[0].streamNo_, streamNo2);
}
TEST_F(SharedPageQueueTest, TestWriteFull)
{
auto pageQueue = CreateSharedPageQueue();
const int timeoutMs = 3000;
InsertFlags flags = InsertFlags::NONE;
ShmView preLastView;
std::shared_ptr<StreamDataPage> lastPage;
DS_ASSERT_OK(pageQueue->CreateOrGetLastDataPage(timeoutMs, preLastView, lastPage, false));
size_t remainingSize = lastPage->PagePayloadSize();
size_t count = 30;
size_t perSize = remainingSize % count;
uint64_t streamNo = 111;
size_t metaSize = StreamDataPage::GetMetaSize(true);
while (remainingSize > StreamDataPage::GetMetaSize(true)) {
size_t dataWithMetaSize = std::max(perSize, remainingSize);
size_t dataSize = dataWithMetaSize - metaSize;
remainingSize -= dataWithMetaSize;
std::string data(dataSize, 'a');
HeaderAndData element((uint8_t *)data.c_str(), data.size(), streamNo);
DS_ASSERT_OK(lastPage->Insert(element, timeoutMs, flags));
}
ASSERT_EQ(lastPage->GetFreeSpaceSize(), remainingSize);
std::string data(1, 'a');
uint64_t streamNo1 = 111;
HeaderAndData element((uint8_t *)data.c_str(), data.size(), streamNo1);
DS_ASSERT_NOT_OK(lastPage->Insert(element, timeoutMs, flags));
std::vector<DataElement> out;
DS_ASSERT_OK(lastPage->Receive(0, timeoutMs, out));
size_t recvSize = 0;
for (const auto &e : out) {
ASSERT_EQ(e.streamNo_, streamNo);
recvSize += e.size + metaSize;
}
ASSERT_EQ(recvSize + remainingSize, lastPage->PagePayloadSize());
}
TEST_F(SharedPageQueueTest, TestAllocMemeoryAndRelease)
{
BINEXPECT_CALL(&PageQueueHandler::CreateExclusivePageQueue, (_, _)).Times(1).WillOnce(Return(nullptr));
PageQueueHandler handler(nullptr, Optional<StreamFields>());
auto pageQueue = CreateSharedPageQueue();
handler.SetSharedPageQueue(pageQueue);
ASSERT_EQ(handler.GetPageSize(), FLAGS_sc_shared_page_size_mb * MB_TO_BYTES);
size_t pageSize = 1024 * 1024 * 5;
std::shared_ptr<ShmUnitInfo> pageUnitInfo;
DS_ASSERT_OK(handler.AllocMemory(pageSize, true, pageUnitInfo, false));
handler.DumpPoolPages(1);
ShmView shmView{
.fd = pageUnitInfo->fd, .mmapSz = pageUnitInfo->mmapSize, .off = pageUnitInfo->offset, .sz = pageUnitInfo->size
};
DS_ASSERT_OK(handler.ReleaseMemory(shmView));
handler.DumpPoolPages(1);
}
TEST_F(SharedPageQueueTest, TestRemoteAck)
{
auto pageQueue = CreateSharedPageQueue();
const int timeoutMs = 3000;
const size_t eleSz1 = 102ul;
const size_t eleSz2 = 1004ul;
std::string data1(eleSz1, 'a');
std::string data2(eleSz2, 'b');
ShmView preLastView;
std::shared_ptr<StreamDataPage> lastPage;
DS_ASSERT_OK(pageQueue->CreateOrGetLastDataPage(timeoutMs, preLastView, lastPage, false));
uint64_t streamNo1 = 111;
uint64_t streamNo2 = 112;
HeaderAndData element1((uint8_t *)data1.c_str(), data1.size(), streamNo1);
HeaderAndData element2((uint8_t *)data2.c_str(), data2.size(), streamNo2);
InsertFlags flags = InsertFlags::NONE;
DS_ASSERT_OK(lastPage->Insert(element1, timeoutMs, flags));
DS_ASSERT_OK(lastPage->Insert(element2, timeoutMs, flags));
std::vector<DataElement> out;
DS_ASSERT_OK(lastPage->Receive(0, timeoutMs, out));
ASSERT_EQ(out.size(), 2ul);
std::string r1((char *)out[0].ptr, out[0].size);
std::string r2((char *)out[1].ptr, out[1].size);
ASSERT_EQ(data1, r1);
ASSERT_EQ(out[0].streamNo_, streamNo1);
ASSERT_EQ(data2, r2);
ASSERT_EQ(out[1].streamNo_, streamNo2);
BINEXPECT_CALL(&RemoteWorkerManager::GetLastAckCursor, (_)).WillRepeatedly(Return(K_TWO));
DS_ASSERT_OK(pageQueue->RemoteAck());
}
}
}