* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: Usage Monitor test
*/
#include <chrono>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include "ut/common.h"
#include "datasystem/common/util/random_data.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/worker/stream_cache/client_worker_sc_service_impl.h"
#include "datasystem/worker/stream_cache/usage_monitor.h"
using namespace datasystem::worker::stream_cache;
namespace datasystem {
namespace worker {
namespace stream_cache {
class ClientWorkerSCServiceImplMock : public ClientWorkerSCServiceImpl {
public:
ClientWorkerSCServiceImplMock() : ClientWorkerSCServiceImpl(HostPort(), HostPort(), nullptr, nullptr, nullptr){};
Status SendBlockProducerReq(const std::string &streamName, const std::string &remoteWorkerAddr)
{
std::string id = streamName + remoteWorkerAddr;
blockedProducers.insert(id);
return Status::OK();
}
Status SendUnBlockProducerReq(const std::string &streamName, const std::string &remoteWorkerAddr)
{
std::string id = streamName + remoteWorkerAddr;
blockedProducers.erase(id);
return Status::OK();
}
std::multiset<std::string> blockedProducers;
};
}
}
namespace ut {
static constexpr uint64_t DEFAULT_TOTAL_SIZE = 10;
class UsageMonitorTest : public CommonTest {
public:
UsageMonitorTest()
{
}
~UsageMonitorTest() override = default;
void SetUp() override
{
FLAGS_v = 2;
cliWorkerMockPtr_ = std::make_shared<ClientWorkerSCServiceImplMock>();
impl_ = cliWorkerMockPtr_.get();
usageMonitor_ = std::make_unique<UsageMonitor>(impl_, DEFAULT_TOTAL_SIZE);
DS_ASSERT_OK(usageMonitor_->Init());
CommonTest::SetUp();
}
void TearDown() override
{
if (usageMonitor_) {
usageMonitor_->Stop();
usageMonitor_.reset();
}
CommonTest::TearDown();
}
std::shared_ptr<ClientWorkerSCServiceImplMock> cliWorkerMockPtr_;
ClientWorkerSCServiceImpl *impl_;
std::unique_ptr<UsageMonitor> usageMonitor_;
};
TEST_F(UsageMonitorTest, TestNormalCase)
{
const std::string streamName(RandomData().GetRandomString(8));
const std::string remoteWorkerAddr(RandomData().GetRandomString(8));
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName, 0));
usageMonitor_->IncUsage(streamName, remoteWorkerAddr, 1);
usageMonitor_->DecUsage(streamName, remoteWorkerAddr, 1);
ASSERT_FALSE(usageMonitor_->CheckOverUsed().IsError());
usageMonitor_->IncUsage(streamName, remoteWorkerAddr, 4);
usageMonitor_->IncUsage(streamName, remoteWorkerAddr, 5);
ASSERT_TRUE(usageMonitor_->CheckOverUsed(0.8).IsError());
ASSERT_FALSE(usageMonitor_->CheckOverUsed(1).IsError());
usageMonitor_->IncUsage(streamName, remoteWorkerAddr, 11);
ASSERT_TRUE(usageMonitor_->CheckOverUsed().IsError());
}
TEST_F(UsageMonitorTest, TestNormalCaseUsagePerStream)
{
const std::string streamName(RandomData().GetRandomString(8));
const std::string remoteWorkerAddr1(RandomData().GetRandomString(8));
const std::string remoteWorkerAddr2(RandomData().GetRandomString(8));
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName, 0));
usageMonitor_->IncUsage(streamName, remoteWorkerAddr1, 1);
usageMonitor_->DecUsage(streamName, remoteWorkerAddr1, 1);
ASSERT_FALSE(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 0, 1, 0).IsError());
usageMonitor_->IncUsage(streamName, remoteWorkerAddr1, 4);
usageMonitor_->IncUsage(streamName, remoteWorkerAddr2, 5);
const double threshold = 0.8;
ASSERT_TRUE(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 0, threshold, 0).IsError());
ASSERT_FALSE(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 0, 1, 0).IsError());
usageMonitor_->IncUsage(streamName, remoteWorkerAddr1, 11);
ASSERT_TRUE(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 0, 1, 0).IsError());
}
TEST_F(UsageMonitorTest, TestBGThreadLogic)
{
const std::string streamName(RandomData().GetRandomString(8));
const std::string remoteWorkerAddr(RandomData().GetRandomString(8));
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName, 0));
usageMonitor_->IncUsage(streamName, remoteWorkerAddr, 1);
std::shared_ptr<UsageItem> usageItem;
usageMonitor_->GetMostUsed(usageItem);
ASSERT_FALSE(usageMonitor_->CheckOverUsed().IsError());
sleep(1);
ASSERT_FALSE(usageItem->usageBlocked);
usageMonitor_->IncUsage(streamName, remoteWorkerAddr, 11);
ASSERT_TRUE(usageMonitor_->CheckOverUsed().IsError());
sleep(1);
ASSERT_TRUE(usageItem->usageBlocked);
const std::string remoteWorkerAddr1(RandomData().GetRandomString(8));
usageMonitor_->IncUsage(streamName, remoteWorkerAddr1, 1);
std::shared_ptr<UsageItem> usageItem1;
usageMonitor_->GetMostUsed(usageItem1);
sleep(1);
ASSERT_TRUE(usageItem1->usageBlocked);
usageMonitor_->DecUsage(streamName, remoteWorkerAddr, 12);
sleep(1);
ASSERT_FALSE(usageItem->usageBlocked);
ASSERT_FALSE(usageItem1->usageBlocked);
}
TEST_F(UsageMonitorTest, TestGetMostUsed)
{
const std::string streamName(RandomData().GetRandomString(8));
const std::string remoteWorkerAddr1(RandomData().GetRandomString(8));
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName, 0));
ASSERT_FALSE(usageMonitor_->CheckOverUsed().IsError());
usageMonitor_->IncUsage(streamName, remoteWorkerAddr1, 1);
const std::string remoteWorkerAddr2(RandomData().GetRandomString(8));
usageMonitor_->IncUsage(streamName, remoteWorkerAddr2, 10);
const std::string remoteWorkerAddr3(RandomData().GetRandomString(8));
usageMonitor_->IncUsage(streamName, remoteWorkerAddr3, 1);
std::shared_ptr<UsageItem> usageItem;
usageMonitor_->GetMostUsed(usageItem);
ASSERT_EQ(usageItem->streamName, streamName);
ASSERT_EQ(usageItem->remoteWorkerAddr, remoteWorkerAddr2);
ASSERT_EQ(usageItem->usage, DEFAULT_TOTAL_SIZE);
}
TEST_F(UsageMonitorTest, DISABLED_TestGetMostUsedBlocked)
{
const std::string streamName(RandomData().GetRandomString(8));
const std::string remoteWorkerAddr1(RandomData().GetRandomString(8));
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName, 0));
ASSERT_FALSE(usageMonitor_->CheckOverUsed().IsError());
usageMonitor_->IncUsage(streamName, remoteWorkerAddr1, 1);
const std::string remoteWorkerAddr2(RandomData().GetRandomString(8));
usageMonitor_->IncUsage(streamName, remoteWorkerAddr2, 10);
std::shared_ptr<UsageItem> usageItem;
usageMonitor_->GetMostUsed(usageItem);
usageItem->usageBlocked = true;
const std::string remoteWorkerAddr3(RandomData().GetRandomString(8));
usageMonitor_->IncUsage(streamName, remoteWorkerAddr3, 2);
std::shared_ptr<UsageItem> usageItem1;
usageMonitor_->GetMostUsed(usageItem1);
ASSERT_EQ(usageItem1->streamName, streamName);
ASSERT_EQ(usageItem1->remoteWorkerAddr, remoteWorkerAddr3);
ASSERT_EQ(usageItem1->usage, (uint64_t)2);
}
TEST_F(UsageMonitorTest, TestBasicReserve1)
{
const int moreThanHalf = 6;
const std::string streamName(RandomData().GetRandomString(8));
const std::string streamName2(RandomData().GetRandomString(8));
const std::string remoteWorkerAddr1(RandomData().GetRandomString(8));
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName, moreThanHalf));
DS_ASSERT_NOT_OK(usageMonitor_->ReserveMemory(streamName2, moreThanHalf));
usageMonitor_->UndoReserveMemory(streamName);
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName2, moreThanHalf));
usageMonitor_->UndoReserveMemory(streamName2);
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName, 1));
DS_ASSERT_OK(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 0, 1, moreThanHalf));
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName2, moreThanHalf));
DS_ASSERT_NOT_OK(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 0, 1, moreThanHalf));
DS_ASSERT_NOT_OK(
usageMonitor_->CheckNIncOverUsedForStream(streamName2, remoteWorkerAddr1, 0, 1, DEFAULT_TOTAL_SIZE));
DS_ASSERT_OK(
usageMonitor_->CheckNIncOverUsedForStream(streamName2, remoteWorkerAddr1, 0, 1, DEFAULT_TOTAL_SIZE - 1));
}
TEST_F(UsageMonitorTest, TestBasicReserve2)
{
const int moreThanHalf = 6;
const std::string streamName(RandomData().GetRandomString(8));
const std::string streamName2(RandomData().GetRandomString(8));
const std::string remoteWorkerAddr1(RandomData().GetRandomString(8));
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName, moreThanHalf));
DS_ASSERT_NOT_OK(usageMonitor_->ReserveMemory(streamName, DEFAULT_TOTAL_SIZE + 1));
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName2, 1));
DS_ASSERT_NOT_OK(usageMonitor_->ReserveMemory(streamName, DEFAULT_TOTAL_SIZE));
}
TEST_F(UsageMonitorTest, TestLowerBound)
{
const std::string streamName(RandomData().GetRandomString(8));
const std::string remoteWorkerAddr1(RandomData().GetRandomString(8));
const std::string remoteWorkerAddr2(RandomData().GetRandomString(8));
DS_ASSERT_OK(usageMonitor_->ReserveMemory(streamName, 0));
usageMonitor_->IncUsage(streamName, remoteWorkerAddr1, 1);
const double lowerTh = 0.01;
ASSERT_TRUE(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 0, lowerTh, 0).IsError());
ASSERT_FALSE(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 1, lowerTh, 0).IsError());
LOG_IF_ERROR(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 0, lowerTh, 0), "OOM");
usageMonitor_->IncUsage(streamName, remoteWorkerAddr1, 4);
usageMonitor_->IncUsage(streamName, remoteWorkerAddr2, 5);
const double higherTh = 0.8;
ASSERT_TRUE(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 1, higherTh, 0).IsError());
ASSERT_FALSE(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 1, 1, 0).IsError());
usageMonitor_->IncUsage(streamName, remoteWorkerAddr1, 11);
ASSERT_TRUE(usageMonitor_->CheckNIncOverUsedForStream(streamName, remoteWorkerAddr1, 1, 1, 0).IsError());
}
}
}