* This file is part of the MindStudio project.
* Copyright (c) 2025 Huawei Technologies Co.,Ltd.
*
* MindStudio is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*/
#include <sys/types.h>
#include <unistd.h>
#include <semaphore.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <climits>
#include <fstream>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <map>
#include <cmath>
#include <functional>
#include <algorithm>
#include <set>
#include <cmath>
#include <forward_list>
#include <sqlite3.h>
#include <sys/stat.h>
#include "securec.h"
#include "acl/acl_prof.h"
#include "acl/acl.h"
#include "mstx/ms_tools_ext.h"
#include "msServiceProfiler/Log.h"
#include "msServiceProfiler/DbBuffer.h"
#include "msServiceProfiler/Profiler.h"
#include "msServiceProfiler/NpuMemoryUsage.h"
#include "msServiceProfiler/SecurityUtilsLog.h"
#include "msServiceProfiler/ServiceProfilerManager.h"
#include "msServiceProfiler/ServiceProfilerDbWriter.h"
namespace {
// File-local alignment granularity. NOTE(review): presumably a byte count; it is not
// referenced by any function visible in this part of the file - confirm it is still needed.
constexpr int ALIGN_SIZE{8};
}  // namespace
namespace msServiceProfiler {
bool ServiceProfilerDbWriter::StartTransAction() const
{
if (db_ == nullptr || !inited) {
return false;
}
char *errMsg = nullptr;
if (sqlite3_exec(db_, "BEGIN TRANSACTION", nullptr, nullptr, &errMsg) != SQLITE_OK) {
PROF_LOGE(" begin transaction error: %s", errMsg);
sqlite3_free(errMsg);
return false;
}
return true;
}
void ServiceProfilerDbWriter::Flash() const
{
char *errMsg = nullptr;
if (db_ == nullptr || !inited) {
return;
}
if (sqlite3_exec(db_, "COMMIT", nullptr, nullptr, &errMsg) != SQLITE_OK) {
PROF_LOGE(" commit error: %s", errMsg);
sqlite3_free(errMsg);
}
}
/**
 * @brief Open (or create) the per-process trace database and prepare it for writing.
 *
 * The database file is "<outputPath>/Trace_Service/<dbFileName_>_<hostname>-<pid>.db".
 * After a successful open the connection is tuned via ApplyOptimizations(), the built-in
 * "counter" and "flow" tables are created if missing, and every executor held in
 * cachedExecutor is run against the new connection.
 *
 * Calling this while the writer is already initialized is a no-op. On open failure the
 * error is logged, the handle is released and the writer stays uninitialized.
 *
 * @param outputPath Base output directory; a trailing '/' is optional.
 */
void ServiceProfilerDbWriter::StartDump(const std::string &outputPath)
{
    if (inited) {
        return;
    }
    const auto &hostName = MsUtils::GetHostName();
    std::string dir = outputPath;
    if (!dir.empty() && dir.back() != '/') {
        dir += '/';
    }
    dir += "Trace_Service/";
    // Best effort: the directory may already exist. Any other failure surfaces below
    // when sqlite3_open cannot create the file. Only the last path component is created.
    ::mkdir(dir.c_str(), 0755);
    std::string dbPath = dir + dbFileName_ + "_" + hostName + "-" + std::to_string(getpid()) + ".db";
    // NOTE(review): presumably tightens the process umask while the DB file is created - confirm.
    MsUtils::UmaskGuard umaskGuard;
    int rc = sqlite3_open(dbPath.c_str(), &db_);
    if (rc != SQLITE_OK) {
        // sqlite3_open may hand back a handle even on failure; it carries the error text
        // and must still be closed.
        const char *errMsg = db_ ? sqlite3_errmsg(db_) : sqlite3_errstr(rc);
        PROF_LOGE("Execution failed: %s, %s", SecurityUtils::ToSafeString(errMsg).c_str(),
                  SecurityUtils::ToSafeString(dbPath).c_str());
        if (db_) {
            sqlite3_close(db_);
            db_ = nullptr;
        }
        return;
    }
    // Execute() returns early while inited is false, so the flag has to be raised BEFORE
    // ApplyOptimizations(); otherwise every tuning PRAGMA is silently skipped.
    inited = true;
    ApplyOptimizations();
    Execute("CREATE TABLE IF NOT EXISTS counter ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "name TEXT, pid TEXT, timestamp INTEGER, cat TEXT, args TEXT);");
    Execute("CREATE TABLE IF NOT EXISTS flow ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "flow_id TEXT, name TEXT, cat TEXT, track_id INTEGER, timestamp INTEGER, type TEXT);");
    // Run the executors that were retained for replay against the fresh connection.
    for (const auto &executor : cachedExecutor) {
        executor->Execute(*this, db_);
    }
}
void ServiceProfilerDbWriter::StopDump()
{
PROF_LOGD("Service Profiler DbWriter StopDump");
for (auto &enableStmt : enableStmts_) {
auto *stmt = enableStmt;
if (stmt != nullptr) {
sqlite3_finalize(stmt);
}
enableStmt = nullptr;
}
if (db_) {
Execute("PRAGMA locking_mode = NORMAL;");
Execute("PRAGMA journal_mode = WAL;");
Execute("PRAGMA wal_checkpoint(FULL);");
sqlite3_close(db_);
db_ = nullptr;
}
inited = false;
}
/**
 * @brief Tune the open connection for high-throughput trace writing.
 *
 * All statements go through Execute(), which does nothing unless db_ is open and
 * inited is true - so this must only be called once the writer is marked initialized.
 * Individual PRAGMA failures are logged by Execute() and do not stop the sequence.
 */
void ServiceProfilerDbWriter::ApplyOptimizations() const
{
    // page_size must be requested before switching to WAL: SQLite cannot change the
    // page size of a database once it is in WAL mode.
    Execute("PRAGMA page_size = 4096;");
    Execute("PRAGMA journal_mode = WAL;");
    // No fsync on commit: faster, but an OS crash or power loss may corrupt the file.
    Execute("PRAGMA synchronous = OFF;");
    // A negative cache_size is a budget in KiB (about 1000 KiB here), not a page count.
    Execute("PRAGMA cache_size = -1000;");
    Execute("PRAGMA temp_store = MEMORY;");
    Execute("PRAGMA locking_mode = NORMAL;");
    PROF_LOGD("DB set to WAL mode.");
}
void ServiceProfilerDbWriter::Execute(const char *sql) const
{
if (db_ == nullptr || !inited) {
return;
}
char *errMsg = nullptr;
auto execRet = sqlite3_exec(db_, sql, nullptr, nullptr, &errMsg);
if (execRet != SQLITE_OK) {
PROF_LOGE(" Execution SQL error: [%d][%s], and the sql is [%s]", execRet, errMsg, sql);
sqlite3_free(errMsg);
}
}
void msServiceProfiler::ServiceProfilerDbWriter::RecvDbExecutor(std::unique_ptr<DbExecutorInterface> dbExecutor)
{
const int level = dbExecutor->Level();
if (cachePopExecutors_.find(level) == cachePopExecutors_.end()) {
cachePopExecutors_.emplace(level, std::vector<std::unique_ptr<DbExecutorInterface>>());
}
cachePopExecutors_.at(level).emplace_back(std::move(dbExecutor));
}
void msServiceProfiler::ServiceProfilerDbWriter::ExecutorDumpToDb()
{
std::vector<int> levels;
for (const auto &pair : cachePopExecutors_) {
levels.push_back(pair.first);
}
std::sort(levels.begin(), levels.end());
std::lock_guard<std::mutex> lock(mtx_);
bool started = false;
for (int level : levels) {
if (!started && level == PRIORITY_NORMAL) {
started = StartTransAction();
}
for (auto it = cachePopExecutors_[level].begin(); it != cachePopExecutors_[level].end(); ++it) {
std::unique_ptr<DbExecutorInterface> &executor = *it;
executor->Execute(*this, this->db_);
if (executor->Cache()) {
CacheExecutor(std::move(executor));
}
}
cachePopExecutors_[level].clear();
if (started && level == PRIORITY_NORMAL) {
Flash();
started = false;
}
}
}
}