* This file is part of the MindStudio project.
* Copyright (c) 2025 Huawei Technologies Co.,Ltd.
*
* MindStudio is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*/
#ifndef SERVICEPROFILERDBWRITER_H
#define SERVICEPROFILERDBWRITER_H
#include <array>
#include <functional>
#include <string>
#include <map>
#include <sqlite3.h>
#include "Log.h"
#include "DBExecutor/DbDefines.h"
#include "DbBuffer.h"
#include "MultiThreadBufferManager.h"
namespace msServiceProfiler {
enum DBPriorityLevel : int {
PRIORITY_START_PROF = -10,
PRIORITY_NORMAL = 0,
PRIORITY_STOP_PROF = 10,
};
class ServiceProfilerDbWriter;
class DbExecutorInterface {
public:
virtual void Execute(ServiceProfilerDbWriter &writer, sqlite3 *db) = 0;
virtual bool Cache() = 0;
virtual int Level()
{
return PRIORITY_NORMAL;
};
virtual ~DbExecutorInterface() = default;
};
template <int T>
class DbExecutor final : public DbExecutorInterface {
public:
void Execute(ServiceProfilerDbWriter &, sqlite3 *) override{};
bool Cache() override
{
return false;
};
~DbExecutor() override = default;
};
class DbFuncExec final : public DbExecutorInterface {
public:
explicit DbFuncExec(std::function<void(ServiceProfilerDbWriter &, sqlite3 *)> func, int level = PRIORITY_NORMAL)
: func_(std::move(func)), level_(level)
{}
void Execute(ServiceProfilerDbWriter &writer, sqlite3 *db) override
{
if (func_) {
func_(writer, db);
}
};
bool Cache() override
{
return false;
};
int Level() override
{
return level_;
}
~DbFuncExec() override = default;
private:
std::function<void(ServiceProfilerDbWriter &, sqlite3 *)> func_{};
int level_ = PRIORITY_NORMAL;
};
using DBExecBuffer = DbBuffer<DbExecutorInterface>;
class ServiceProfilerDbWriter {
public:
explicit ServiceProfilerDbWriter(const char *fileName) : dbFileName_(fileName),
bufferManger_{
std::bind(&ServiceProfilerDbWriter::RecvDbExecutor, this, std::placeholders::_1),
std::bind(&ServiceProfilerDbWriter::ExecutorDumpToDb, this)
} {};
~ServiceProfilerDbWriter()
{
std::lock_guard<std::mutex> lock(mtx_);
inited = false;
StopDump();
};
void StartDump(const std::string &outputPath);
void StopDump();
std::shared_ptr<DBExecBuffer> Register(uintptr_t pThreadIns)
{
return bufferManger_.Register(pThreadIns);
}
void Unregister(uintptr_t pThreadIns)
{
bufferManger_.Unregister(pThreadIns);
}
public:
sqlite3_stmt *GetStmt(const size_t stmtIndex) const
{
if (stmtIndex >= enableStmts_.size()) {
return nullptr;
}
return enableStmts_[stmtIndex];
}
sqlite3_stmt *InitStmt(const size_t stmtIndex, const char *stmtStr)
{
if (stmtIndex >= enableStmts_.size() || db_ == nullptr || !inited) {
return nullptr;
}
sqlite3_stmt *stmt = nullptr;
if (sqlite3_prepare_v2(db_, stmtStr, -1, &stmt, nullptr) != SQLITE_OK) {
PROF_LOGE("sqlInsertKindMstx SQL error");
return nullptr;
}
enableStmts_[stmtIndex] = stmt;
return stmt;
}
void Execute(const char *sql) const;
void CacheExecutor(std::unique_ptr<DbExecutorInterface> pExec)
{
cachedExecutor.push_back(std::move(pExec));
}
private:
void ApplyOptimizations() const;
bool StartTransAction() const;
void Flash() const;
void RecvDbExecutor(std::unique_ptr<DbExecutorInterface> dbExecutor);
void ExecutorDumpToDb();
private:
std::mutex mtx_;
const char *dbFileName_ = nullptr;
bool inited = false;
sqlite3 *db_ = nullptr;
std::vector<std::unique_ptr<DbExecutorInterface>> cachedExecutor{};
std::array<sqlite3_stmt *, DB_STMT_CNT> enableStmts_{nullptr};
std::map<int, std::vector<std::unique_ptr<DbExecutorInterface>>> cachePopExecutors_;
MultiThreadBufferManager<DbExecutorInterface> bufferManger_;
};
template <DBFile dbFile>
class ServiceProfilerDbFileWriter : public ServiceProfilerDbWriter {
ServiceProfilerDbFileWriter() : ServiceProfilerDbWriter(DbFileName(dbFile)){};
public:
static ServiceProfilerDbFileWriter &GetDbWriter()
{
static ServiceProfilerDbFileWriter manager;
return manager;
};
};
template <DBFile dbFile>
class ServiceProfilerThreadWriter {
public:
ServiceProfilerThreadWriter()
{
pBuffer = ServiceProfilerDbFileWriter<dbFile>::GetDbWriter().Register(reinterpret_cast<uintptr_t>(this));
}
~ServiceProfilerThreadWriter()
{
ServiceProfilerDbFileWriter<dbFile>::GetDbWriter().Unregister(reinterpret_cast<uintptr_t>(this));
}
inline static ServiceProfilerThreadWriter &GetWriter()
{
thread_local ServiceProfilerThreadWriter writer;
return writer;
}
void Insert(std::unique_ptr<DbExecutorInterface> activity)
{
if (pBuffer) {
auto pRetData = pBuffer->Push(std::move(activity));
#ifdef ENABLE_SERVICE_PROF_UNIT_TEST
if (pRetData != nullptr) {
thisThreadPushFailedCnt_++;
}
thisThreadPushCnt_++;
#endif
}
}
#ifdef ENABLE_SERVICE_PROF_UNIT_TEST
void WaitForAllDump()
{
while (pBuffer->Size() > 0) {
std::this_thread::sleep_for(std::chrono::nanoseconds(100));
}
PROF_LOGI(
"buffer push: %lu, failed: %lu, pop cnt: %lu, push cnt: %lu, max cnt: %lu, diff: %lu",
thisThreadPushCnt_,
thisThreadPushFailedCnt_,
pBuffer->PopCnt(),
pBuffer->PushCnt(),
pBuffer->MaxCntInBuffer(),
pBuffer->PushCnt() - pBuffer->PopCnt());
}
#endif
private:
std::shared_ptr<DBExecBuffer> pBuffer = nullptr;
#ifdef ENABLE_SERVICE_PROF_UNIT_TEST
size_t thisThreadPushCnt_ = 0;
size_t thisThreadPushFailedCnt_ = 0;
#endif
};
template <DBFile dbFile>
void InsertExecutor2Writer(std::unique_ptr<DbExecutorInterface> activity)
{
ServiceProfilerThreadWriter<dbFile>::GetWriter().Insert(std::move(activity));
}
}
#endif