* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include <pthread.h>
#include <errno.h>
#include <strings.h>
#include <string.h>
#include <unistd.h>
#include <memory>
#include <mutex>
#include "gtest/gtest.h"
#include "mockcpp/mockcpp.hpp"
#include "prof_channel.h"
#include "collection_entry.h"
#include "uploader_mgr.h"
using namespace analysis::dvvp::common::error;
using namespace Analysis::Dvvp::Common::Statistics;
using namespace Analysis::Dvvp::MsprofErrMgr;
// Buffer size constant used by these tests: 1024 * 1024 * 2 = 2097152.
// NOTE(review): presumably mirrors the ChannelReader buffer capacity in bytes -- confirm against prof_channel.h.
#define MAX_BUFFER_SIZE (1024 * 1024 * 2)
// 80% of MAX_BUFFER_SIZE. The 0.8 factor makes this a double-valued expression, so
// `reader->dataSize_ = MAX_THRESHOLD_SIZE + 1` in the Execute test assigns a double;
// if dataSize_ is an integral member that is an implicit floating-to-integer conversion -- TODO confirm.
#define MAX_THRESHOLD_SIZE (MAX_BUFFER_SIZE * 0.8)
class TRANSPORT_PROF_CHANNELREADER_UTEST: public testing::Test {
protected:
virtual void SetUp() {
std::shared_ptr<analysis::dvvp::message::JobContext> jobCtx(
new analysis::dvvp::message::JobContext);
_job_ctx = jobCtx;
}
virtual void TearDown() {
}
public:
std::shared_ptr<analysis::dvvp::message::JobContext> _job_ctx;
void EXPECT_HashIdDiff(analysis::dvvp::driver::AI_DRV_CHANNEL channel1,
analysis::dvvp::driver::AI_DRV_CHANNEL channel2);
};
// Calls ChannelReader::Execute() repeatedly while DrvChannelRead is stubbed to
// return PROFILING_FAILED, then 64, then 0, and UploaderMgr::UploadData is
// stubbed to fail. Every Execute() call is expected to return PROFILING_SUCCESS.
TEST_F(TRANSPORT_PROF_CHANNELREADER_UTEST, Execute) {
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();

    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, "data/ts.12.0.0", _job_ctx));
    reader->Init();

    // Driver read results, consumed in this order by successive calls.
    MOCKER(&analysis::dvvp::driver::DrvChannelRead)
        .stubs()
        .will(returnValue(PROFILING_FAILED))
        .then(returnValue(64))
        .then(returnValue(0));
    // Uploads always report failure.
    MOCKER_CPP(&analysis::dvvp::transport::UploaderMgr::UploadData,
        int(analysis::dvvp::transport::UploaderMgr::*)(const std::string &, const void *, uint32_t))
        .stubs()
        .will(returnValue(PROFILING_FAILED));

    // Start with the buffered size one above the threshold constant.
    reader->dataSize_ = MAX_THRESHOLD_SIZE + 1;

    EXPECT_EQ(PROFILING_SUCCESS, reader->Execute());
    EXPECT_EQ(PROFILING_SUCCESS, reader->Execute());
    EXPECT_EQ(PROFILING_SUCCESS, reader->Execute());

    reader->UploadData();
    reader->SetChannelStopped();
    // Execute() is still expected to succeed after the channel is marked stopped.
    EXPECT_EQ(PROFILING_SUCCESS, reader->Execute());

    reader.reset();
}
// Checks that an initialised ChannelReader reports a non-zero HashId().
TEST_F(TRANSPORT_PROF_CHANNELREADER_UTEST, HashId) {
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();

    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, "data/ts.12.0.0", _job_ctx));
    // Do not silently ignore an initialisation failure: other tests in this file
    // assert PROFILING_SUCCESS for a reader built with the same arguments.
    EXPECT_EQ(PROFILING_SUCCESS, reader->Init());

    EXPECT_NE(static_cast<size_t>(0), reader->HashId());

    reader.reset();
}
// Builds one ChannelReader for each of the two channels and expects their
// HashId() values to land in different slots for thread counts of 4 and 8.
//
// channel1 / channel2: the driver channels whose readers are compared.
void TRANSPORT_PROF_CHANNELREADER_UTEST::EXPECT_HashIdDiff(analysis::dvvp::driver::AI_DRV_CHANNEL channel1,
    analysis::dvvp::driver::AI_DRV_CHANNEL channel2)
{
    using Reader = analysis::dvvp::transport::ChannelReader;

    std::shared_ptr<Reader> reader1(new Reader(0, channel1, "tmp", _job_ctx));
    std::shared_ptr<Reader> reader2(new Reader(0, channel2, "tmp", _job_ctx));
    reader1->Init();
    reader2->Init();

    // Thread counts to check, in the same order as before: 4 first, then 8.
    const uint32_t threadNums[] = {4, 8};
    for (uint32_t threadNum : threadNums) {
        EXPECT_NE(reader1->HashId() % threadNum, reader2->HashId() % threadNum);
    }
}
// Runs EXPECT_HashIdDiff over a fixed list of channel pairs, then verifies
// the global mock object.
TEST_F(TRANSPORT_PROF_CHANNELREADER_UTEST, HashIdDifferent) {
    namespace drv = analysis::dvvp::driver;

    // Channel pairs whose readers are expected to hash to different slots.
    const drv::AI_DRV_CHANNEL channelPairs[][2] = {
        {drv::PROF_CHANNEL_STARS_SOC_LOG, drv::PROF_CHANNEL_FFTS_PROFILE_TASK},
        {drv::PROF_CHANNEL_STARS_SOC_LOG, drv::PROF_CHANNEL_TS_FW},
        {drv::PROF_CHANNEL_STARS_SOC_LOG, drv::PROF_CHANNEL_LP},
        {drv::PROF_CHANNEL_FFTS_PROFILE_TASK, drv::PROF_CHANNEL_TS_FW},
        {drv::PROF_CHANNEL_FFTS_PROFILE_TASK, drv::PROF_CHANNEL_LP},
    };
    for (const auto &chPair : channelPairs) {
        EXPECT_HashIdDiff(chPair[0], chPair[1]);
    }

    GlobalMockObject::verify();
}
// Calls SetChannelStopped() on an initialised reader whose dataSize_ is 10,
// with ChannelReader::UploadData stubbed out.
TEST_F(TRANSPORT_PROF_CHANNELREADER_UTEST, SetChannelStopped) {
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();
    MOCKER_CPP(&analysis::dvvp::transport::ChannelReader::UploadData)
        .stubs();

    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, "data/ts.12.0.0", _job_ctx));
    EXPECT_EQ(PROFILING_SUCCESS, reader->Init());

    // Non-zero pending size before the channel is marked stopped.
    reader->dataSize_ = 10;
    reader->SetChannelStopped();

    reader.reset();
}
// Calls UploadData() with dataSize_ set to 0 and then to 10.
// NOTE(review): ChannelReader::UploadData itself is stubbed via MOCKER_CPP here,
// so these calls presumably hit the stub rather than the real method -- confirm.
TEST_F(TRANSPORT_PROF_CHANNELREADER_UTEST, UploadData) {
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();
    MOCKER_CPP(&analysis::dvvp::transport::ChannelReader::UploadData)
        .stubs();

    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, "data/ts.12.0.0", _job_ctx));
    EXPECT_EQ(PROFILING_SUCCESS, reader->Init());

    // Empty buffer case.
    reader->dataSize_ = 0;
    reader->UploadData();

    // Non-empty buffer case.
    reader->dataSize_ = 10;
    reader->UploadData();
}
// Calls FlushDrvBuff() once on the TS_CPU channel and twice after switching
// channelId_ to HWTS_LOG, with DrvProfFlush stubbed to fail first and succeed
// afterwards; finally calls CheckIfSendFlush(0) with needWait_ set.
TEST_F(TRANSPORT_PROF_CHANNELREADER_UTEST, FlushDrvBuff) {
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();
    MOCKER_CPP(&analysis::dvvp::transport::ChannelReader::UploadData)
        .stubs();
    MOCKER(&analysis::dvvp::driver::DrvProfFlush)
        .stubs()
        .will(returnValue(PROFILING_FAILED))
        .then(returnValue(PROFILING_SUCCESS));

    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, "data/ts.12.0.0", _job_ctx));
    EXPECT_EQ(PROFILING_SUCCESS, reader->Init());

    // Flush with the channel the reader was constructed with.
    reader->FlushDrvBuff();

    // Flush twice more after overriding the channel id.
    reader->channelId_ = analysis::dvvp::driver::PROF_CHANNEL_HWTS_LOG;
    reader->FlushDrvBuff();
    reader->FlushDrvBuff();

    reader->needWait_ = true;
    reader->CheckIfSendFlush(0);
}
// A reader on PROF_CHANNEL_NTS_PMU must report that driver-buffer flushing is
// unsupported, and FlushDrvBuff() must not call DrvProfFlush for that channel.
TEST_F(TRANSPORT_PROF_CHANNELREADER_UTEST, FlushDrvBuffDoesNotSupportNtsPmu) {
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();
    MOCKER_CPP(&analysis::dvvp::transport::ChannelReader::UploadData)
        .stubs();

    // Expectation: DrvProfFlush is never called with (0, PROF_CHANNEL_NTS_PMU, ...).
    uint32_t flushsize = 0;
    MOCKER(&analysis::dvvp::driver::DrvProfFlush)
        .expects(never())
        .with(eq(0U), eq(static_cast<uint32_t>(analysis::dvvp::driver::PROF_CHANNEL_NTS_PMU)), outBound(flushsize))
        .will(returnValue(PROFILING_SUCCESS));

    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_NTS_PMU, "data/nts_pmu.data", _job_ctx));
    EXPECT_EQ(PROFILING_SUCCESS, reader->Init());

    EXPECT_FALSE(reader->IsSupportFlushDrvBuff());
    reader->FlushDrvBuff();
}
// A reader on PROF_CHANNEL_NTS_TASK must report that driver-buffer flushing is
// unsupported, and FlushDrvBuff() must not call DrvProfFlush for that channel.
TEST_F(TRANSPORT_PROF_CHANNELREADER_UTEST, FlushDrvBuffDoesNotSupportNtsTask) {
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();
    MOCKER_CPP(&analysis::dvvp::transport::ChannelReader::UploadData)
        .stubs();

    // Expectation: DrvProfFlush is never called with (0, PROF_CHANNEL_NTS_TASK, ...).
    uint32_t flushsize = 0;
    MOCKER(&analysis::dvvp::driver::DrvProfFlush)
        .expects(never())
        .with(eq(0U), eq(static_cast<uint32_t>(analysis::dvvp::driver::PROF_CHANNEL_NTS_TASK)), outBound(flushsize))
        .will(returnValue(PROFILING_SUCCESS));

    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_NTS_TASK, "data/nts_task.data", _job_ctx));
    EXPECT_EQ(PROFILING_SUCCESS, reader->Init());

    EXPECT_FALSE(reader->IsSupportFlushDrvBuff());
    reader->FlushDrvBuff();
}
// Argument bundle handed to ThreadRun() through pthread_create's void* parameter.
struct FlushThreadData {
// Reader observed by the helper thread; not owned (the test passes reader.get()).
analysis::dvvp::transport::ChannelReader *reader;
// Set to true by the test, under reader->flushMutex_, to ask the helper thread to exit.
bool stop;
};
void *ThreadRun(void *data)
{
auto threadData = reinterpret_cast<FlushThreadData *>(data);
auto reader = threadData->reader;
bool stop = false;
while (!stop) {
{
std::unique_lock<std::mutex> guard(reader->flushMutex_);
if (reader->needWait_) {
reader->SendFlushFinished();
return nullptr;
}
stop = threadData->stop;
}
if (!stop) {
usleep(1000);
}
}
return nullptr;
}
// Calls FlushDrvBuff() on the HWTS_LOG channel while a helper thread
// (ThreadRun) watches the reader: the helper calls SendFlushFinished() once it
// sees needWait_ set, or exits when the test sets the stop flag.
// DrvProfFlush is stubbed to succeed and to write 100 through its third argument.
TEST_F(TRANSPORT_PROF_CHANNELREADER_UTEST, FlushDrvBuff2) {
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();
    MOCKER_CPP(&analysis::dvvp::transport::ChannelReader::UploadData)
        .stubs();
    unsigned int flushsize = 100;
    MOCKER(&analysis::dvvp::driver::DrvProfFlush)
        .stubs()
        .with(any(), any(), outBound(flushsize))
        .will(returnValue(PROFILING_SUCCESS));

    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, "data/ts.12.0.0", _job_ctx));
    EXPECT_EQ(PROFILING_SUCCESS, reader->Init());
    reader->flushBufSize_ = 0;
    reader->channelId_ = analysis::dvvp::driver::PROF_CHANNEL_HWTS_LOG;

    // Start the helper thread; an object pointer converts to void* implicitly.
    FlushThreadData threadData = {reader.get(), false};
    pthread_t threadId;
    ASSERT_EQ(0, pthread_create(&threadId, nullptr, ThreadRun, &threadData));

    reader->FlushDrvBuff();

    // Ask the helper to exit; the flag is written under the mutex it reads it with.
    {
        std::unique_lock<std::mutex> guard(reader->flushMutex_);
        threadData.stop = true;
    }
    ASSERT_EQ(0, pthread_join(threadId, nullptr));
}
class TRANSPORT_PROF_CHANNELPOLL_UTEST: public testing::Test {
protected:
virtual void SetUp() {
_job_ctx = std::make_shared<analysis::dvvp::message::JobContext>();
}
virtual void TearDown() {
}
public:
std::shared_ptr<analysis::dvvp::message::JobContext> _job_ctx;
};
// Registers a reader with a ChannelPoll and expects GetReader() to return it
// (non-null) for the same device/channel key.
TEST_F(TRANSPORT_PROF_CHANNELPOLL_UTEST, AddReader) {
    using Poll = analysis::dvvp::transport::ChannelPoll;
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();

    std::shared_ptr<Poll> poll(new Poll());
    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, "data/ts.12.0.0", _job_ctx));
    reader->Init();

    EXPECT_EQ(PROFILING_SUCCESS, poll->AddReader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, reader));
    EXPECT_NE(nullptr, poll->GetReader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU).get());

    poll.reset();
}
// Registers a reader, removes it again and expects GetReader() to return
// null for that device/channel key afterwards.
TEST_F(TRANSPORT_PROF_CHANNELPOLL_UTEST, RemoveReader) {
    using Poll = analysis::dvvp::transport::ChannelPoll;
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();

    std::shared_ptr<Poll> poll(new Poll());
    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, "data/ts.12.0.0", _job_ctx));
    reader->Init();

    // Add: the reader must be retrievable.
    EXPECT_EQ(PROFILING_SUCCESS, poll->AddReader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, reader));
    EXPECT_NE(nullptr, poll->GetReader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU).get());

    // Remove: the lookup must now come back empty.
    EXPECT_EQ(PROFILING_SUCCESS, poll->RemoveReader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU));
    EXPECT_EQ(nullptr, poll->GetReader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU).get());

    poll.reset();
}
// DispatchChannel() is expected to fail before Start() and while no reader is
// registered, and to succeed once a reader is added, for both scheduling
// status values.
TEST_F(TRANSPORT_PROF_CHANNELPOLL_UTEST, DispatchChannel) {
    using Poll = analysis::dvvp::transport::ChannelPoll;
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();
    std::shared_ptr<Poll> poll(new Poll());

    // Not started yet.
    EXPECT_EQ(PROFILING_FAILED, poll->DispatchChannel(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU));

    MOCKER(&analysis::dvvp::driver::DrvGetDevNum)
        .stubs()
        .will(returnValue(1));
    poll->Start();

    // Started, but no reader registered for this device/channel.
    EXPECT_EQ(PROFILING_FAILED, poll->DispatchChannel(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU));

    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, "data/ts.12.0.0", _job_ctx));
    reader->Init();
    poll->AddReader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, reader);

    // Reader registered: dispatch succeeds with scheduling status true ...
    reader->SetSchedulingStatus(true);
    EXPECT_EQ(PROFILING_SUCCESS, poll->DispatchChannel(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU));
    // ... and with scheduling status false.
    reader->SetSchedulingStatus(false);
    EXPECT_EQ(PROFILING_SUCCESS, poll->DispatchChannel(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU));

    poll.reset();
}
// Starts a ChannelPoll, registers one reader and checks it can be looked up.
// NOTE(review): despite the test name, this body never calls a flush method on
// the poll -- confirm whether a FlushAllChannels-style call is missing.
TEST_F(TRANSPORT_PROF_CHANNELPOLL_UTEST, FlushAllChannels) {
    using Poll = analysis::dvvp::transport::ChannelPoll;
    using Reader = analysis::dvvp::transport::ChannelReader;

    GlobalMockObject::verify();
    std::shared_ptr<Poll> poll(new Poll());

    MOCKER(&analysis::dvvp::driver::DrvGetDevNum)
        .stubs()
        .will(returnValue(1));
    poll->Start();

    std::shared_ptr<Reader> reader(
        new Reader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, "data/ts.12.0.0", _job_ctx));
    reader->Init();

    EXPECT_EQ(PROFILING_SUCCESS, poll->AddReader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU, reader));
    EXPECT_NE(nullptr, poll->GetReader(0, analysis::dvvp::driver::PROF_CHANNEL_TS_CPU).get());

    poll.reset();
}
// Start() on a fresh ChannelPoll must succeed and set isStart_.
TEST_F(TRANSPORT_PROF_CHANNELPOLL_UTEST, start) {
    using Poll = analysis::dvvp::transport::ChannelPoll;

    GlobalMockObject::verify();
    std::shared_ptr<Poll> poll(new Poll());

    EXPECT_EQ(PROFILING_SUCCESS, poll->Start());
    EXPECT_TRUE(poll->isStart_);

    poll.reset();
}
// Stop() after Start() must succeed and clear isStart_.
TEST_F(TRANSPORT_PROF_CHANNELPOLL_UTEST, stop) {
    using Poll = analysis::dvvp::transport::ChannelPoll;

    GlobalMockObject::verify();
    std::shared_ptr<Poll> poll(new Poll());
    poll->Start();

    EXPECT_EQ(PROFILING_SUCCESS, poll->Stop());
    EXPECT_FALSE(poll->isStart_);

    poll.reset();
}
// Runs ChannelPoll::Run() directly with DrvChannelPoll stubbed to return
// -4, then 6, then -1, and DispatchChannel stubbed to succeed, then checks
// the poll's internal counters.
TEST_F(TRANSPORT_PROF_CHANNELPOLL_UTEST, run) {
    using Poll = analysis::dvvp::transport::ChannelPoll;

    GlobalMockObject::verify();
    std::shared_ptr<Poll> poll(new Poll());
    // Fatal check: everything below dereferences poll, so stop here if it is null.
    ASSERT_NE(nullptr, poll);

    // Mark the poll as started without spawning its thread via Start().
    poll->isStart_ = true;

    MOCKER(&analysis::dvvp::driver::DrvChannelPoll)
        .stubs()
        .will(returnValue(-4))
        .then(returnValue(6))
        .then(returnValue(-1));
    MOCKER_CPP(&analysis::dvvp::transport::ChannelPoll::DispatchChannel)
        .stubs()
        .will(returnValue(PROFILING_SUCCESS));

    auto errorContext = MsprofErrorManager::instance()->GetErrorManagerContext();
    poll->Run(errorContext);
    poll->isStart_ = false;

    // Expected counters after the three stubbed poll results above.
    EXPECT_EQ(3, poll->pollCount_);
    EXPECT_EQ(1, poll->pollSleepCount_);
    EXPECT_EQ(1, poll->dispatchCount_);
    EXPECT_EQ(6, poll->dispatchChannelCount_);

    poll.reset();
}
// Start() followed by Stop() must both succeed while DrvChannelPoll is
// stubbed to return 0.
TEST_F(TRANSPORT_PROF_CHANNELPOLL_UTEST, run_quit) {
    using Poll = analysis::dvvp::transport::ChannelPoll;

    GlobalMockObject::verify();
    std::shared_ptr<Poll> poll(new Poll());

    MOCKER(&analysis::dvvp::driver::DrvChannelPoll)
        .stubs()
        .will(returnValue(0));

    EXPECT_EQ(PROFILING_SUCCESS, poll->Start());
    EXPECT_EQ(PROFILING_SUCCESS, poll->Stop());

    poll.reset();
}
// Swaps a buffer with a ChannelBuffer before Start(), waits for
// preBufferQueue_ to become non-empty, swaps again and stops the buffer.
TEST_F(TRANSPORT_PROF_CHANNELPOLL_UTEST, ChannelBufferBase) {
    using Buffer = analysis::dvvp::transport::ChannelBuffer;

    GlobalMockObject::verify();
    std::shared_ptr<Buffer> threadBuffer(new Buffer());

    std::string buffer;
    threadBuffer->SwapChannelBuffer(buffer);
    EXPECT_EQ(PROFILING_SUCCESS, threadBuffer->Start());

    // Poll for a queued buffer, giving up after 1000 calls to UsleepInterupt(1000).
    for (uint32_t sleepCount = 0;
         threadBuffer->preBufferQueue_.size() == 0 && sleepCount < 1000;
         ++sleepCount) {
        analysis::dvvp::common::utils::Utils::UsleepInterupt(1000);
    }

    threadBuffer->SwapChannelBuffer(buffer);
    EXPECT_EQ(PROFILING_SUCCESS, threadBuffer->Stop());

    threadBuffer.reset();
}