* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: Test StreamPage StreamPageOwner classes.
*/
#include "ut/common.h"
#include "datasystem/common/constants.h"
#include "datasystem/common/shared_memory/allocator.h"
#include "datasystem/common/stream_cache/cursor.h"
#include "datasystem/common/util/thread_pool.h"
#include "datasystem/stream/stream_config.h"
#include "datasystem/worker/stream_cache/producer.h"
namespace datasystem {
namespace ut {
constexpr size_t NUM_THREADS = 4;
constexpr uint64_t SHM_CAP = 64L * 1024L * 1024L;
class StreamCursorTest : public CommonTest {
protected:
void SetUp() override;
void TearDown() override;
StreamCursorTest() : pool_(NUM_THREADS)
{
}
~StreamCursorTest() override = default;
std::shared_ptr<ShmUnit> shmUnit_;
std::shared_ptr<datasystem::Cursor> cursor_;
ThreadPool pool_;
};
void StreamCursorTest::SetUp()
{
FLAGS_v = datasystem::SC_INTERNAL_LOG_LEVEL;
datasystem::memory::Allocator::Instance()->Init(SHM_CAP);
shmUnit_ = std::make_shared<ShmUnit>();
DS_ASSERT_OK(shmUnit_->AllocateMemory("CursorTest", Cursor::K_CURSOR_SIZE_V2, false));
cursor_ = std::make_shared<datasystem::Cursor>(shmUnit_->GetPointer(), Cursor::K_CURSOR_SIZE_V2, 0);
ASSERT_EQ(cursor_->Init(), Status::OK());
}
void StreamCursorTest::TearDown()
{
}
TEST_F(StreamCursorTest, TestShmViewAndFutexArea)
{
cursor_->InitFutexArea();
const int32_t val = K_OUT_OF_MEMORY;
auto fut = pool_.Submit([this]() {
ShmView view;
Timer t(DEFAULT_TIMEOUT_MS);
while (t.GetRemainingTimeMs() > 0) {
RETURN_IF_NOT_OK(cursor_->GetLastPageView(view, DEFAULT_TIMEOUT_MS));
if (view != ShmView()) {
break;
}
}
LOG(INFO) << "LastPage view: " << view.ToStr();
CHECK_FAIL_RETURN_STATUS(view == shmUnit_->GetShmView(), K_RUNTIME_ERROR, "GetLastPageView error");
size_t numWaiter;
RETURN_IF_NOT_OK(cursor_->Wake(val, numWaiter));
LOG(INFO) << "Wake up " << numWaiter << " waiters";
return Status::OK();
});
DS_ASSERT_OK(cursor_->SetLastPage(shmUnit_->GetShmView(), DEFAULT_TIMEOUT_MS));
int32_t fetchVal;
DS_ASSERT_OK(cursor_->Wait(DEFAULT_TIMEOUT_MS, fetchVal));
ASSERT_EQ(val, fetchVal);
DS_ASSERT_OK(fut.get());
}
TEST_F(StreamCursorTest, TestForceClose)
{
auto fut = pool_.Submit([this]() {
Timer t(DEFAULT_TIMEOUT_MS);
while (t.GetRemainingTimeMs() > 0) {
if (cursor_->ForceClose()) {
return Status::OK();
}
}
RETURN_STATUS(K_RUNTIME_ERROR, "Force close error");
});
cursor_->SetForceClose();
DS_ASSERT_OK(fut.get());
}
TEST_F(StreamCursorTest, TestSetCusorToProducer)
{
auto producer = std::make_shared<datasystem::worker::stream_cache::Producer>("producerId", "StreamName", nullptr);
auto copyCursor = cursor_;
producer->SetCursor(std::move(copyCursor));
auto fut = pool_.Submit([this]() {
Timer t(DEFAULT_TIMEOUT_MS);
while (t.GetRemainingTimeMs() > 0) {
if (cursor_->ForceClose()) {
return Status::OK();
}
}
RETURN_STATUS(K_RUNTIME_ERROR, "Force close error");
});
producer->SetForceClose();
DS_ASSERT_OK(fut.get());
}
}
}